OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6409 → Rev 6410

/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/UserAgent.java
242,26 → 242,31
this.period = period;
}
}
 
public static final String REDELIVERY_DELAY = "redeliveryDelay";
 
/**
* The re-delivery delay use to wait before re-delivering
* messages after a deny.
* The redelivery delay use to wait before re-delivering messages after a deny.
*/
private int reDeliveryDelay = 0;
private int redeliveryDelay = 0;
/**
* @return the reDeliveryDelay
* Gets the redelivery delay.
* @return the redeliveryDelay
*/
public int getReDeliveryDelay() {
return reDeliveryDelay/1000;
public int getRedeliveryDelay() {
if (redeliveryDelay == 0)
return Queue.getDefaultRedeliveryDelay();
return redeliveryDelay;
}
 
/**
* @param reDeliveryDelay the reDeliveryDelay to set
* Sets the redelivery delay.
* @param redeliveryDelay the redeliveryDelay to set
*/
public void setReDeliveryDelay(int reDeliveryDelay) {
logger.log(BasicLevel.ERROR, this + " === setReDeliveryDelay " + reDeliveryDelay);//NTA tmp
this.reDeliveryDelay = reDeliveryDelay*1000;
public void setRedeliveryDelay(int redeliveryDelay) {
// logger.log(BasicLevel.FATAL, "UserAgent.setRedeliveryDelay: " + REDELIVERY_DELAY + "=" + redeliveryDelay);
this.redeliveryDelay = redeliveryDelay;
}
 
/**
283,8 → 288,8
/**
* Threshold above which messages are considered as undeliverable because
* constantly denied.
* This value is used as default value at subscription creation.
* 0 stands for no threshold, -1 for value not set (use server' default value).
* This value is used as default value at subscription creation.
* 0 stands for no threshold, -1 for value not set (use default value).
*/
private int threshold = -1;
 
1135,10 → 1140,7
} else {
arrivalState = UserAgentArrivalState.load(ARRIVAL_STATE_PREFIX + getId().toString());
}
if (reDeliveryDelay == 0)
reDeliveryDelay = AgentServer.getInteger(AdminCommandConstant.RE_DELIVERY_DELAY, 0) * 1000;
MessageTableFactory messageTableFactory = MessageTableFactory.newFactory();
messagesTable = messageTableFactory.createMessageTable(MESSAGE_TABLE_PREFIX + getId().toString());
 
2151,21 → 2153,20
if (sub == null)
return;
 
long currentTime = System.currentTimeMillis();
Iterator<String> denies = new Vector<String>(req.getIds()).iterator();
while (denies.hasNext()) {
String msgId = (String) denies.next();
Message msg = sub.getSubMessage(msgId);
 
if (msg != null && reDeliveryDelay > 0 && req.isRedelivered()) {
msg.setDeliveryTime(currentTime + reDeliveryDelay);
if (msg != null && getRedeliveryDelay() > 0 && req.isRedelivered()) {
msg.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
List<String> ids = new ArrayList<>();
ids.add(req.getTarget());
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), ids);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact SessDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
logger.log(BasicLevel.DEBUG, "UserAgent.doReact SessDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
req.getIds().remove(msgId);
}
}
2236,18 → 2237,17
Vector<String> ids = new Vector<String>();
ids.add(msgId);
 
if (reDeliveryDelay > 0 && req.isRedelivered()) {
if (getRedeliveryDelay() > 0 && req.isRedelivered()) {
Message msg = sub.getSubMessage(msgId);
if (msg != null) {
List<String> traget = new ArrayList<>();
traget.add(req.getTarget());
long currentTime = System.currentTimeMillis();
msg.setDeliveryTime(currentTime + reDeliveryDelay);
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), traget);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact ConsumerDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
List<String> target = new ArrayList<>();
target.add(req.getTarget());
msg.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), target);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact ConsumerDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
} else {
sub.deny(ids.iterator(), req.isRedelivered());
}
4149,6 → 4149,8
}
 
res += INT_ENCODED_SIZE;
res += INT_ENCODED_SIZE;
res += EncodableHelper.getEncodedSize(subsClientIDs);
return res;
}
4203,7 → 4205,9
}
}
 
encoder.encodeUnsignedInt(threshold);
encoder.encodeSignedInt(threshold);
encoder.encodeSignedInt(redeliveryDelay);
EncodableHelper.encodeProperties(subsClientIDs, encoder);
}
 
4261,7 → 4265,9
}
}
 
threshold = decoder.decodeUnsignedInt();
threshold = decoder.decodeSignedInt();
redeliveryDelay = decoder.decodeSignedInt();
subsClientIDs= EncodableHelper.decodeProperties(decoder);
}