OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6408 → Rev 6409

/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java
29,6 → 29,7
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Map.Entry;
import java.util.Vector;
 
56,7 → 57,6
import org.objectweb.joram.mom.util.QueueDeliveryTimeTask;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.ClearQueue;
78,6 → 78,7
import org.objectweb.joram.shared.excepts.AccessException;
import org.objectweb.joram.shared.excepts.DestinationException;
import org.objectweb.joram.shared.excepts.MomException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.selectors.Selector;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
115,27 → 116,7
/** Static value holding the default DMQ identifier for a server. */
static AgentId defaultDMQId = null;
 
/**
* The re-delivery delay use to wait before re-delivering
* messages after a deny.
*/
private int reDeliveryDelay = 0;
/**
* @return the reDeliveryDelay
*/
public int getReDeliveryDelay() {
return reDeliveryDelay/1000;
}
 
/**
* @param reDeliveryDelay the reDeliveryDelay to set
*/
public void setReDeliveryDelay(int reDeliveryDelay) {
this.reDeliveryDelay = reDeliveryDelay*1000;
}
 
/**
* Threshold above which messages are considered as undeliverable because
* constantly denied; 0 stands for no threshold, -1 for value not set.
*/
172,6 → 153,41
return defaultDMQId;
}
 
/**
* The re-delivery delay use to wait before re-delivering
* messages after a deny.
*/
private int redeliveryDelay = 0;
 
/** Static value holding the default redelivery delay for a server. */
static int defaultRedeliveryDelay = -1;
 
/**
* @return the reDeliveryDelay
*/
public final int getRedeliveryDelay() {
if (redeliveryDelay == 0)
return getDefaultRedeliveryDelay();
return redeliveryDelay;
}
 
/**
* @param reDeliveryDelay the reDeliveryDelay to set
*/
public final void setRedeliveryDelay(int redeliveryDelay) {
// logger.log(BasicLevel.FATAL, "Queue.setRedeliveryDelay: " + REDELIVERY_DELAY + "=" + redeliveryDelay);
this.redeliveryDelay = redeliveryDelay;
}
 
/** Static method returning the default redelivery delay for a server. */
public static final int getDefaultRedeliveryDelay() {
return defaultRedeliveryDelay;
}
public static final void setDefaultRedeliveryDelay(int reDeliveryDelay) {
defaultRedeliveryDelay = reDeliveryDelay;
}
 
/** <code>true</code> if all the stored messages have the same priority. */
private boolean samePriorities;
 
200,7 → 216,24
super(name, fixed, stamp);
}
 
public static final String REDELIVERY_DELAY = "redeliveryDelay";
/**
* Configures an {@link Queue} instance.
*
* @param properties
* The initial set of properties.
*/
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
// Set redeliveryDelay if defined.
if (properties != null && properties.containsKey(REDELIVERY_DELAY)) {
setRedeliveryDelay(ConversionHelper.toInt(properties.get(REDELIVERY_DELAY)));
}
}
/**
* Distributes the received notifications to the appropriate reactions.
*
* @throws Exception
466,9 → 499,6
*/
protected void initialize(boolean firstTime) throws Exception {
cleanWaitingRequest(System.currentTimeMillis());
 
if (reDeliveryDelay == 0)
reDeliveryDelay = AgentServer.getInteger(AdminCommandConstant.RE_DELIVERY_DELAY, 0) * 1000;
receiving = false;
messages = new Vector();
777,8 → 807,8
else
message.setDeliveryCount(message.getDeliveryCount()-1);
if (reDeliveryDelay > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + reDeliveryDelay);
if (getRedeliveryDelay() > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: setDeliveryTime " + message.getDeliveryTime());
}
833,8 → 863,8
else
message.setDeliveryCount(message.getDeliveryCount()-1);
if (reDeliveryDelay > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + reDeliveryDelay);
if (getRedeliveryDelay() > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: setDeliveryTime " + message.getDeliveryTime());
}
859,7 → 889,7
try {
if (message.getDeliveryTime() > 0) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: scheduleDeliveryTimeMessage " + message.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: scheduleDeliveryTimeMessage " + message.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
scheduleDeliveryTimeMessage(message.getMsg(), false);
deliveryTable.remove(message.getId());
} else {
1152,7 → 1182,8
 
if (sharedMsg.deliveryTime > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.doClientMessages: scheduleDeliveryTimeMessage " + sharedMsg.id + ", reDeliveryDelay = " + reDeliveryDelay);
logger.log(BasicLevel.DEBUG,
"Queue.doClientMessages: scheduleDeliveryTimeMessage " + sharedMsg.id + ", redeliveryDelay = " + getRedeliveryDelay());
scheduleDeliveryTimeMessage(sharedMsg, throwsExceptionOnFullDest);
} else {
msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
1864,7 → 1895,7
public int getEncodedSize() throws Exception {
int encodedSize = super.getEncodedSize();
encodedSize += INT_ENCODED_SIZE * 3;
encodedSize += INT_ENCODED_SIZE * 4;
encodedSize += LONG_ENCODED_SIZE;
for (ReceiveRequest request : requests) {
encodedSize += request.getEncodedSize();
1877,6 → 1908,8
encoder.encodeUnsignedInt(nbMaxMsg);
encoder.encodeUnsignedLong(nbMsgsDeniedSinceCreation);
encoder.encodeUnsignedInt(priority);
encoder.encodeSignedInt(threshold);
encoder.encodeSignedInt(redeliveryDelay);
encoder.encodeUnsignedInt(requests.size());
for (ReceiveRequest request : requests) {
request.encode(encoder);
1888,6 → 1921,8
nbMaxMsg = decoder.decodeUnsignedInt();
nbMsgsDeniedSinceCreation = decoder.decodeUnsignedLong();
priority = decoder.decodeUnsignedInt();
threshold = decoder.decodeSignedInt();
redeliveryDelay = decoder.decodeSignedInt();
int requestsSize = decoder.decodeUnsignedInt();
requests = new Vector<ReceiveRequest>(requestsSize);
for (int i = 0; i < requestsSize; i++) {