OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6415 → Rev 6416

/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/notifications/TopicDeliveryTimeNot.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 ScalAgent Distributed Technologies
* Copyright (C) 2013 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
22,10 → 22,8
*/
package org.objectweb.joram.mom.notifications;
 
import java.util.List;
import org.objectweb.joram.mom.messages.Message;
 
import org.objectweb.joram.shared.messages.Message;
 
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Notification;
 
36,13 → 34,13
private static final long serialVersionUID = 1L;
public Message msg;
public List<String> subNames;
public AgentId topic;
public String subName;
public TopicDeliveryTimeNot(Message msg, List<String> subNames, AgentId topic) {
public TopicDeliveryTimeNot(Message msg, AgentId topic, String subName) {
this.msg = msg;
this.subNames = subNames;
this.topic = topic;
this.subName = subName;
}
}
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/notifications/QueueDeliveryTimeNot.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 ScalAgent Distributed Technologies
* Copyright (C) 2013 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
22,7 → 22,7
*/
package org.objectweb.joram.mom.notifications;
 
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.mom.messages.Message;
 
import fr.dyade.aaa.agent.Notification;
 
33,7 → 33,7
private static final long serialVersionUID = 1L;
public Message msg;
public boolean throwsExceptionOnFullDest;
public boolean throwsExceptionOnFullDest = false;
public QueueDeliveryTimeNot(Message msg, boolean throwsExceptionOnFullDest) {
this.msg = msg;
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/UserAgent.java
34,7 → 34,6
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
83,7 → 82,6
import org.objectweb.joram.mom.util.MessageInterceptor;
import org.objectweb.joram.mom.util.MessageTable;
import org.objectweb.joram.mom.util.MessageTableFactory;
import org.objectweb.joram.mom.util.TopicDeliveryTimeTask;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
157,9 → 155,6
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
 
import com.scalagent.scheduler.ScheduleEvent;
import com.scalagent.scheduler.Scheduler;
 
import fr.dyade.aaa.agent.Agent;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
430,8 → 425,6
 
private transient WakeUpTask cleaningTask;
 
protected transient Scheduler deliveryScheduler = null;
 
/**
* Used by the Encodable framework
*/
1254,7 → 1247,7
 
// Retrieving the subscriptions' messages.
List messages = Message.loadAll(getMsgTxname());
 
if (subsTable.isEmpty()) {
// it is possible because we always save MessageSoftRef
// so we must delete all message.
2162,9 → 2155,7
msg.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
List<String> ids = new ArrayList<>();
ids.add(req.getTarget());
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), ids);
sub.scheduleDeliveryTimeMessage(msg);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact SessDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
req.getIds().remove(msgId);
2240,12 → 2231,10
if (getRedeliveryDelay() > 0 && req.isRedelivered()) {
Message msg = sub.getSubMessage(msgId);
if (msg != null) {
List<String> target = new ArrayList<>();
target.add(req.getTarget());
msg.setDeliveryTime(System.currentTimeMillis() + (getRedeliveryDelay() *1000L));
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), target);
sub.scheduleDeliveryTimeMessage(msg);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact ConsumerDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
} else {
2929,7 → 2918,6
 
String subName;
ClientSubscription sub;
List<String> delaySubNames = null;
long currentTime = System.currentTimeMillis();
// AF: TODO we should parse each message for each subscription
// see ClientSubscription.browseNewMessages
2939,13 → 2927,19
(org.objectweb.joram.shared.messages.Message) msgs.next();
Message message = new Message(sharedMsg);
if (sharedMsg.deliveryTime > currentTime) {
if (delaySubNames == null) {
delaySubNames = new ArrayList<String>();
for (Iterator subNames = tSub.getNames(); subNames.hasNext();) {
delaySubNames.add((String) subNames.next());
boolean isDurable = false;
for (Iterator subNames = tSub.getNames(); subNames.hasNext();) {
subName = (String) subNames.next();
sub = (ClientSubscription) subsTable.get(subName);
isDurable |= sub.getDurable();
if (isDurable) {
sub.scheduleDeliveryTimeMessage(message);
}
}
scheduleDeliveryTimeMessage(from, sharedMsg, delaySubNames);
// Setting the arrival order of the messages
// message.order = arrivalState.getAndIncrementArrivalCount();
if (isDurable)
persistDeliveryTimeMessage(message);
} else {
// Setting the arrival order of the messages
message.order = arrivalState.getAndIncrementArrivalCount();
2961,7 → 2955,7
// Browsing the delivered messages.
sub.browseNewMessages(messages);
}
 
// Save message if it is delivered to a durable subscription.
for (Iterator msgs = messages.iterator(); msgs.hasNext();) {
Message message = (Message) msgs.next();
3029,93 → 3023,80
messagesTable.checkConsumedMemory();
}
 
void scheduleDeliveryTimeMessage(AgentId from, org.objectweb.joram.shared.messages.Message msg, List<String> subNames) {
void persistDeliveryTimeMessage(Message msg) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.scheduleDeliveryTimeMessage(" + msg + ", " + subNames + ')');
 
if (deliveryScheduler == null) {
try {
deliveryScheduler = new Scheduler(AgentServer.getTimer());
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "UserAgent.scheduleDeliveryTimeMessage", exc);
}
logger.log(BasicLevel.DEBUG, "UserAgent.persistDeliveryTimeMessage(" + msg + ')');
// Save message if it is delivered to a durable subscription.
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> save message " + msg);
if (msg.isPersistent()) {
// Persisting the message.
setMsgTxName(msg);
msg.save();
msg.releaseFullMessage();
}
// schedule a task
try {
deliveryScheduler.scheduleEvent(
new ScheduleEvent(msg.id,
new Date(msg.deliveryTime)),
new TopicDeliveryTimeTask(getId(),
from,
msg,
subNames));
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "UserAgent.scheduleDeliveryTimeMessage(" + msg + ')', e);
}
}
 
private void doReact(TopicDeliveryTimeNot not) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact(" + not + ')');
 
// Browsing the target subscriptions:
TopicSubscription tSub = (TopicSubscription) topicsTable.get(not.topic);
if (tSub == null || tSub.isEmpty()) return;
String subName = not.subName;
ClientSubscription sub = (ClientSubscription) subsTable.get(subName);
if (sub == null) return;
 
Message momMsg = new Message(not.msg);
Message momMsg = not.msg;
List<Message> messages = new ArrayList<Message>();
messages.add(momMsg);
// Remove the message id from the subscription's delivery-time id list.
sub.removeMessagesTimeIds(momMsg.getId());
 
String subName;
ClientSubscription sub;
for (Iterator names = not.subNames.iterator(); names.hasNext();) {
subName = (String) names.next();
sub = (ClientSubscription) subsTable.get(subName);
if (sub == null) continue;
// Browsing the delivered messages.
sub.browseNewMessages(messages);
 
// Browsing the delivered messages.
sub.browseNewMessages(messages);
}
// If the subscription is active, launching a delivery sequence.
if (sub.getActive() > 0) {
ConsumerMessages consM = sub.deliver();
 
// Save message if it is delivered to a durable subscription.
Message message = momMsg;
if (message.durableAcksCounter > 0) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> save message " + message);
// TODO (AF): The message saving does it need the proxy saving ?
if (message.isPersistent()) {
setSave();
// Persisting the message.
setMsgTxName(message);
message.save();
message.releaseFullMessage();
}
}
 
for (Iterator names = not.subNames.iterator(); names.hasNext();) {
subName = (String) names.next();
sub = (ClientSubscription) subsTable.get(subName);
if (sub == null) continue;
 
// If the subscription is active, launching a delivery sequence.
if (sub.getActive() > 0) {
ConsumerMessages consM = sub.deliver();
 
if (consM != null) {
try {
setCtx(sub.getContextId());
if (activeCtx.getActivated())
doReply(consM);
else
activeCtx.addPendingDelivery(consM);
} catch (StateException pE) {
// The context is lost: nothing to do.
if (consM != null) {
try {
int ctxId = sub.getContextId();
SharedCtx sharedCtx = sharedSubs.get(subName);
if (sharedCtx != null) {
if(logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Subscription "+ subName + ", sharedCtx = " + sharedCtx);
int i = 0;
do {
// if shared, used the next contextId
Iterator<Entry<Integer, Integer>> it = sharedCtx.entrySet().iterator();
Entry<Integer, Integer> entry = it.next();
ClientContext ctx = (ClientContext) contexts.get(new Integer(entry.getKey()));
if (ctx.getActivated()) {
ctxId = ctx.getId();
if(logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Subscription "+ subName + ", ctxId = " + ctxId);
sharedCtx.get(ctxId);//update LRU
break;
}
i++;
} while (sharedCtx.size() < i);
}
setCtx(ctxId);
if (activeCtx.getActivated())
doReply(consM);
else
activeCtx.addPendingDelivery(consM);
} catch (StateException pE) {
// The context is lost: nothing to do.
}
}
}
} else if(logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Subscription " + sub + " is not active");
 
messagesTable.checkConsumedMemory();
}
 
/**
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/ClientSubscription.java
24,6 → 24,7
 
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
43,8 → 44,8
import org.objectweb.joram.mom.util.MessageIdList;
import org.objectweb.joram.mom.util.MessageIdListFactory;
import org.objectweb.joram.mom.util.MessageTable;
import org.objectweb.joram.mom.util.TopicDeliveryTimeTask;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.selectors.Selector;
72,6 → 73,7
public static Logger logger = Debug.getLogger(ClientSubscription.class.getName());
public static final String MESSAGE_ID_LIST_PREFIX = "MIL_";
public static final String MESSAGE_TIME_ID_LIST_PREFIX = "MTIL_";
/** The proxy's agent identifier. */
private AgentId proxyId;
135,8 → 137,10
this.nbMaxMsg = nbMaxMsg;
}
/** Vector of identifiers of the messages to deliver. */
/** List of identifiers of the messages to deliver. */
private transient MessageIdList messageIds;
/** List of identifiers of the messages waiting for their delivery time. */
private transient MessageIdList messageTimeIds;
/** Table of delivered messages identifiers. */
private Map<String, String> deliveredIds;
/** Table keeping the denied messages identifiers. */
240,6 → 244,7
 
// initialized in a separate method
messageIds = null;
messageTimeIds = null;
deliveredIds = new Hashtable<String, String>();
deniedMsgs = new Hashtable();
344,6 → 349,15
}
/**
 * Returns the number of pending delivery time messages for the subscription.
 * <p>
 * The delivery-time identifier list is only created by
 * {@code initMessageIds()} or {@code loadMessageIds()}; until one of them
 * has run the list is {@code null} and this method reports 0 instead of
 * failing, mirroring the guard in {@code Queue.getDeliveryTimeMessageCount()}.
 *
 * @return The number of pending delivery time messages for the subscription,
 *         or 0 if the delivery-time list has not been initialized yet.
 */
public int getPendingDeliveryTimeMessageCount() {
  if (messageTimeIds == null) {
    // List not created/loaded yet: nothing can be pending.
    return 0;
  }
  return messageTimeIds.size();
}
/**
* Returns the number of messages delivered and waiting for acknowledge.
*
* @return The number of messages delivered and waiting for acknowledge.
379,7 → 393,7
logger.log(BasicLevel.DEBUG, "ClientSubscription[" + this + "].reinitialize()");
this.messagesTable = messagesTable;
 
// Browsing the persisted messages.
Message message;
String msgId;
387,6 → 401,18
message = (Message) e.next();
msgId = message.getId();
 
// delivery time message
if (messageTimeIds.contains(msgId)) {
if (message.getDeliveryTime() < System.currentTimeMillis()) {
// delivery time expire
messageIds.add(msgId, message.isPersistent());
messageTimeIds.remove(msgId);
} else {
// schedule a task
AgentServer.getTimer().schedule(new TopicDeliveryTimeTask(proxyId, topicId, name, message), new Date(message.getDeliveryTime()));
}
}
if (messageIds.contains(msgId) || deliveredIds.containsKey(msgId)) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> contains message " + msgId);
540,6 → 566,37
}
/**
 * Registers a delayed message for this subscription and arms a timer for it.
 * <p>
 * The message identifier is added to the delivery-time identifier list, a
 * {@code TopicDeliveryTimeTask} is scheduled on the agent server timer at the
 * message's delivery time, and the delivery-time list is then saved. A
 * failure while saving is logged at ERROR level and not propagated.
 *
 * @param message the message to hand over once its delivery time is reached
 */
void scheduleDeliveryTimeMessage(Message message) {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, this + ".scheduleDeliveryTimeMessage(" + message + ')');
  }

  // NOTE(review): the second argument is always true here; elsewhere the
  // same call receives message.isPersistent() -- confirm this is intended.
  messageTimeIds.add(message.getId(), true);

  // Arm the timer: the task notifies the proxy when the delivery time comes.
  TopicDeliveryTimeTask deliveryTask = new TopicDeliveryTimeTask(proxyId, topicId, name, message);
  Date deliveryDate = new Date(message.getDeliveryTime());
  AgentServer.getTimer().schedule(deliveryTask, deliveryDate);

  try {
    messageTimeIds.save();
  } catch (Exception saveFailure) {
    if (logger.isLoggable(BasicLevel.ERROR)) {
      logger.log(BasicLevel.ERROR, "ERROR::scheduleDeliveryTimeMessage", saveFailure);
    }
  }
}
/**
 * Drops a message identifier from the delivery-time identifier list and
 * saves the list. A failure while saving is logged at ERROR level and not
 * propagated.
 *
 * @param msgId identifier of the message to remove from the list
 */
void removeMessagesTimeIds(String msgId) {
  final boolean debugEnabled = logger.isLoggable(BasicLevel.DEBUG);
  if (debugEnabled) {
    logger.log(BasicLevel.DEBUG, this + ".removeMessagesTimeIds(" + msgId + ')');
  }

  messageTimeIds.remove(msgId);

  try {
    messageTimeIds.save();
  } catch (Exception saveFailure) {
    if (logger.isLoggable(BasicLevel.ERROR)) {
      logger.log(BasicLevel.ERROR, "ERROR::removeMessagesTimeIds", saveFailure);
    }
  }
}
/**
* Browses messages and keeps those which will have to be delivered
* to the subscriber.
*/
557,14 → 614,14
msgId = message.getId();
 
// test nbMaxMsg
if (nbMaxMsg > 0 && nbMaxMsg <= messageIds.size()) {
if (nbMaxMsg > 0 && nbMaxMsg <= (messageIds.size() + messageTimeIds.size())) {
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, null);
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.QUEUE_FULL);
continue;
}
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, this + ".browseNewMessages message.getClientID() = " + message.getClientID() + ", clientID = " + clientID + ", noLocal = " + noLocal);
1236,6 → 1293,7
// it would encode the message id list several times.
messageIds.save();
messageTimeIds.save();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "ClientSubscription named [" + txName
+ "] could not be saved", exc);
1247,16 → 1305,19
AgentServer.getTransaction().delete(getTxName());
messageIds.delete();
messageTimeIds.delete();
}
public void initMessageIds() throws Exception {
MessageIdListFactory messageIdListFactory = MessageIdListFactory.newFactory();
messageIds = messageIdListFactory.createMessageIdList(MESSAGE_ID_LIST_PREFIX + getTxName());
messageTimeIds = messageIdListFactory.createMessageIdList(MESSAGE_TIME_ID_LIST_PREFIX + getTxName());
}
public void loadMessageIds() throws Exception {
MessageIdListFactory messageIdListFactory = MessageIdListFactory.newFactory();
messageIds = messageIdListFactory.loadMessageIdList(MESSAGE_ID_LIST_PREFIX + getTxName());
messageTimeIds = messageIdListFactory.loadMessageIdList(MESSAGE_TIME_ID_LIST_PREFIX + getTxName());
}
private void setModified() {
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/util/QueueDeliveryTimeTask.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 ScalAgent Distributed Technologies
* Copyright (C) 2013 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
22,18 → 22,18
*/
package org.objectweb.joram.mom.util;
 
import java.util.TimerTask;
 
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.QueueDeliveryTimeNot;
import org.objectweb.joram.shared.messages.Message;
 
import com.scalagent.scheduler.ScheduleTask;
 
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
 
/**
* Task sending a QueueDeliveryTimeNot to a Queue destination.
* Timer Task sending a QueueDeliveryTimeNot to a Queue destination.
*/
public class QueueDeliveryTimeTask implements ScheduleTask {
public class QueueDeliveryTimeTask extends TimerTask {
private static final long serialVersionUID = 1L;
private AgentId destId = null;
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/util/TopicDeliveryTimeTask.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 ScalAgent Distributed Technologies
* Copyright (C) 2013 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
22,41 → 22,37
*/
package org.objectweb.joram.mom.util;
 
import java.util.List;
import java.util.TimerTask;
 
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.TopicDeliveryTimeNot;
import org.objectweb.joram.shared.messages.Message;
 
import com.scalagent.scheduler.ScheduleTask;
 
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
 
/**
* Task sending a TopicDeliveryTimeNot to a UserAgent.
*/
public class TopicDeliveryTimeTask implements ScheduleTask {
public class TopicDeliveryTimeTask extends TimerTask {
private static final long serialVersionUID = 1L;
private AgentId destId = null;
private Message msg = null;
private List<String> subNames = null;
private AgentId topic = null;
private String subName = null;
public TopicDeliveryTimeTask(AgentId destId, AgentId topic, Message msg, List<String> subNames) {
public TopicDeliveryTimeTask(AgentId destId, AgentId topic, String subName, Message msg) {
this.destId = destId;
this.msg = msg;
this.subNames = subNames;
this.topic= topic;
this.subName = subName;
}
/**
* Task to execute: send a TopicDeliveryTimeNot to the related UserAgent.
*
* @see java.util.TimerTask#run()
*/
public void run() {
Channel.sendTo(destId, new TopicDeliveryTimeNot(msg, subNames, topic));
Channel.sendTo(destId, new TopicDeliveryTimeNot(msg, topic, subName));
}
 
}
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java
29,8 → 29,8
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Map.Entry;
import java.util.Vector;
 
import javax.management.openmbean.CompositeData;
83,9 → 83,6
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
 
import com.scalagent.scheduler.ScheduleEvent;
import com.scalagent.scheduler.Scheduler;
 
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.Channel;
111,6 → 108,7
public static Logger logmsg = Debug.getLogger(Queue.class.getName() + ".Message");
public static final String DELIVERY_TABLE_PREFIX = "DT_";
public static final String DELIVERY_TIME_TABLE_PREFIX = "DTT_";
public static final String ARRIVAL_STATE_PREFIX = "AS_";
 
/** Static value holding the default DMQ identifier for a server. */
203,7 → 201,8
/** List holding the requests before reply or expiry. */
protected List<ReceiveRequest> requests = new Vector();
protected transient Scheduler deliveryScheduler = null;
/** Table keeping the messages waiting for their delivery time */
protected transient QueueDeliveryTable deliveryTimeTable;
public Queue() {
super();
259,7 → 258,7
else if (not instanceof ExpiredNot)
handleExpiredNot(from, (ExpiredNot) not);
else if (not instanceof QueueDeliveryTimeNot)
doStoreMessageAfterDeliveryTime(from, (QueueDeliveryTimeNot) not);
processDeliveryTime(from, (QueueDeliveryTimeNot) not);
else
super.react(from, not);
 
279,6 → 278,7
super.agentSave();
arrivalState.save();
deliveryTable.save();
deliveryTimeTable.save();
}
 
/**
447,6 → 447,18
return 0;
}
/**
 * Returns the number of messages currently held in the delivery time table.
 *
 * @return The size of the delivery time table, or 0 when the table has not
 *         been created yet.
 */
public final int getDeliveryTimeMessageCount() {
  return (deliveryTimeTable == null) ? 0 : deliveryTimeTable.size();
}
protected long nbMsgsDeniedSinceCreation = 0;
/**
507,13 → 519,16
 
String arrivalStateTxName = ARRIVAL_STATE_PREFIX + getId().toString();
String deliveryTableTxName = DELIVERY_TABLE_PREFIX + getId().toString();
String deliveryTimeTableTxName = DELIVERY_TIME_TABLE_PREFIX + getId().toString();
if (firstTime) {
arrivalState = new QueueArrivalState(arrivalStateTxName);
deliveryTable = new QueueDeliveryTable(deliveryTableTxName);
deliveryTimeTable = new QueueDeliveryTable(deliveryTimeTableTxName);
return;
} else {
arrivalState = QueueArrivalState.load(arrivalStateTxName);
deliveryTable = QueueDeliveryTable.load(deliveryTableTxName);
deliveryTimeTable = new QueueDeliveryTable(deliveryTimeTableTxName);
}
 
// Retrieving the persisted messages, if any.
530,6 → 545,25
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId());
 
QueueDelivery queueDeliveryTime = deliveryTimeTable.get(persistedMsg.getId());
if (queueDelivery == null && queueDeliveryTime != null) {
if (persistedMsg.getDeliveryTime() > System.currentTimeMillis()) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule " + persistedMsg.getClientID());
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false),
new Date(persistedMsg.getDeliveryTime()));
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": delay expire, Adds a message " +
persistedMsg.getClientID() + " in the list of messages to deliver.");
// Adds a message in the list of messages to deliver.
addMessage(persistedMsg, false);
//TODO: delete persistedMsg if the queue is full ?
//persistedMsg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
}
continue;
}
try {
if (queueDelivery == null) {
if (!addMessage(persistedMsg, false)) {
539,8 → 573,7
queueDelivery.setMessage(persistedMsg);
if (isLocal(queueDelivery.getConsumerId())) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger
.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
deliveryTable.remove(persistedMsg.getId());
if (!addMessage(persistedMsg, false)) {
persistedMsg.delete();
890,8 → 923,7
if (message.getDeliveryTime() > 0) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: scheduleDeliveryTimeMessage " + message.getId() + ", reDeliveryDelay = " + getRedeliveryDelay());
scheduleDeliveryTimeMessage(message.getMsg(), false);
deliveryTable.remove(message.getId());
addDeliveryTimeMessage(message, not.getClientContext(), false, true);
} else {
// Else, putting the message back into the deliverables list:
storeMessageHeader(message, false);
1179,14 → 1211,13
 
org.objectweb.joram.shared.messages.Message sharedMsg = (org.objectweb.joram.shared.messages.Message) msgs.next();
msg = new Message(sharedMsg);
 
msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
if (sharedMsg.deliveryTime > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Queue.doClientMessages: scheduleDeliveryTimeMessage " + sharedMsg.id + ", redeliveryDelay = " + getRedeliveryDelay());
scheduleDeliveryTimeMessage(sharedMsg, throwsExceptionOnFullDest);
logger.log(BasicLevel.DEBUG, "Queue.doClientMessages: deliveryTimeTable.put " + msg + ')');
addDeliveryTimeMessage(msg, not.getClientContext(), throwsExceptionOnFullDest, false);
} else {
msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
storeMessage(msg, throwsExceptionOnFullDest);
// ArrivalState is saved outside the Queue agent
1204,46 → 1235,59
receiving = false;
}
 
void scheduleDeliveryTimeMessage(org.objectweb.joram.shared.messages.Message msg, boolean throwsExceptionOnFullDest) {
void addDeliveryTimeMessage(Message msg, int clientCtx, boolean throwsExceptionOnFullDest, boolean isHeader) throws AccessException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.scheduleDeliveryTimeMessage(" + msg + ", " + throwsExceptionOnFullDest + ')');
 
if (deliveryScheduler == null) {
try {
deliveryScheduler = new Scheduler(AgentServer.getTimer());
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Queue.scheduleDeliveryTimeMessage", exc);
logger.log(BasicLevel.DEBUG, "Queue.addDeliveryTimeMessage(" + msg + ", " + clientCtx + ')');
// queue is full
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + deliveryTimeTable.size())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "addDeliveryTimeMessage " + msg.getId() + " throws Exception: The queue \"" + getName() + "\" is full (syncExceptionOnFullDest).");
throw new AccessException("The queue \"" + getName() + "\" is full.");
}
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.QUEUE_FULL);
dmqManager.sendToDMQ();
return;
}
// schedule a task
try {
deliveryScheduler.scheduleEvent(new ScheduleEvent(msg.id, new Date(msg.deliveryTime)),
new QueueDeliveryTimeTask(getId(), msg, throwsExceptionOnFullDest));
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Queue.scheduleDeliveryTimeMessage(" + msg + ')', e);
// add to the delivery time table
deliveryTimeTable.put(msg.getId(), new QueueDelivery(getId(), clientCtx, msg));
if (isHeader) {
msg.saveHeader();
msg.releaseFullMessage();
} else {
if (msg.isPersistent()) {
// Persisting the message.
setMsgTxName(msg);
msg.save();
msg.releaseFullMessage();
}
}
//schedule
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), msg, throwsExceptionOnFullDest),new Date(msg.getDeliveryTime()));
}
void doStoreMessageAfterDeliveryTime(AgentId from, QueueDeliveryTimeNot not) throws AccessException {
void processDeliveryTime(AgentId from, QueueDeliveryTimeNot not) throws AccessException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.doStoreMessageAfterDeliveryTime(" + from + ", " + not + ')');
logger.log(BasicLevel.DEBUG, "Queue.processDeliveryTime(" + from + ", " + not + ')');
 
if (not.msg == null) return;
org.objectweb.joram.shared.messages.Message sharedMsg = not.msg;
Message msg = new Message(sharedMsg);
msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
storeMessage(msg, not.throwsExceptionOnFullDest);
// ArrivalState is saved outside the Queue agent
// if (msg.isPersistent()) setSave();
// Adds a message in the list of messages to deliver.
addMessage(not.msg, not.throwsExceptionOnFullDest);
//msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
// Remove msgId from the deliveryTimeTable
deliveryTimeTable.remove(not.msg.getId());
 
// Launching a delivery sequence:
deliverMessages(0);
 
ClientMessages clientMsgs = new ClientMessages();
clientMsgs.addMessage(sharedMsg);
clientMsgs.addMessage(not.msg.getMsg());
if (clientMsgs != null)
postProcess(clientMsgs);
}
1446,7 → 1490,7
*/
protected final boolean addMessage(Message message, boolean throwsExceptionOnFullDest) throws AccessException {
 
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size())) {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + deliveryTimeTable.size())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))