OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6394 → Rev 6395

/trunk/joram/joram/shared/src/main/java/org/objectweb/joram/shared/admin/AdminCommandConstant.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
80,4 → 80,6
 
/** Used by AdminTopic to invoke a static method */
public static final String INVOKE_METHOD_RESULT = "jms_joram_invoke_result";
/**
 * Property name carrying the re-delivery delay. In this change it is read
 * from the creation properties of users and queues and is also looked up
 * through AgentServer.getInteger when no delay has been set; the value is
 * expressed in seconds (callers multiply it by 1000 before use).
 */
public static final String RE_DELIVERY_DELAY = "jms_joram_re_delivery_delay";
}
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/messages/Message.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2006 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2006 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
276,6 → 276,16
msg.redelivered = true;
}
/**
 * Sets the message delivery time.
 *
 * @param deliveryTime the delivery time to store on the wrapped message;
 *          the callers visible in this change pass
 *          {@code System.currentTimeMillis()} plus a delay in milliseconds
 */
public void setDeliveryTime(long deliveryTime) {
  this.msg.deliveryTime = deliveryTime;
}
/**
 * Returns the message delivery time.
 *
 * @return the delivery time currently stored on the wrapped message
 */
public long getDeliveryTime() {
  return this.msg.deliveryTime;
}
/**
 * Returns the order of this message.
 *
 * @return the current value of the {@code order} field
 */
public long getOrder() {
  return this.order;
}
299,7 → 309,7
public void incDurableAcksCounter() {
  // Bump the durable acks counter by exactly one.
  this.durableAcksCounter += 1;
}
 
/**
 * Gives access to the shared message held by this object.
 *
 * @return the underlying {@link org.objectweb.joram.shared.messages.Message}
 */
public org.objectweb.joram.shared.messages.Message getMsg() {
  return this.msg;
}
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/UserAgentMBean.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2010 ScalAgent Distributed Technologies
* Copyright (C) 2004 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
115,4 → 115,14
*/
int getMessageTableConsumedMemory();
/**
 * Returns the re-delivery delay, i.e. the time to wait before
 * re-delivering messages after a deny.
 *
 * @return the re-delivery delay (unit: second)
 */
int getReDeliveryDelay();
 
/**
 * Sets the re-delivery delay, i.e. the time to wait before
 * re-delivering messages after a deny.
 *
 * @param reDeliveryDelay the re-delivery delay to set (unit: second)
 */
void setReDeliveryDelay(int reDeliveryDelay);
}
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/ClientSubscription.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2003 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2003 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
22,7 → 22,6
*/
package org.objectweb.joram.mom.proxies;
 
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
45,6 → 44,7
import org.objectweb.joram.mom.util.MessageIdListFactory;
import org.objectweb.joram.mom.util.MessageTable;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.selectors.Selector;
134,7 → 134,7
public void setNbMaxMsg(int nbMaxMsg) {
// Plain assignment to the nbMaxMsg field; no validation is applied here.
this.nbMaxMsg = nbMaxMsg;
}
 
/** Vector of identifiers of the messages to deliver. */
private transient MessageIdList messageIds;
/** Table of delivered messages identifiers. */
609,8 → 609,10
if (requestId == -1)
return null;
 
long currentTime = System.currentTimeMillis();
// Returning null if a "receive" request has expired:
if (!toListener && requestExpTime > 0 && System.currentTimeMillis() >= requestExpTime) {
if (!toListener && requestExpTime > 0 && currentTime >= requestExpTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, this + ": receive request " + requestId + " expired.");
requestId = -1;
650,7 → 652,7
 
if (message != null) {
// Message still exists.
if (message.isValid(System.currentTimeMillis())) {
if (message.isValid(currentTime)) {
// Delivering it if valid.
deliveredIds.put(id, id);
 
662,7 → 664,7
message.setDeliveryCount(deliveryAttempts.intValue() +1);
message.setRedelivered();
}
 
// Inserting it according to its priority.
if (lastPrior == -1 || message.getPriority() == lastPrior)
insertionIndex++;
782,6 → 784,7
keptMsg.setDeliveryCount(deliveryAttempts.intValue() +1);
keptMsg.setRedelivered();
}
deliverables.add(keptMsg.getFullMessage().clone());
 
if (logger.isLoggable(BasicLevel.DEBUG))
860,7 → 863,7
*/
private void deny(Iterator<String> denies, boolean remove, boolean redelivered) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, this + ".deny(" + denies + ')');
logger.log(BasicLevel.DEBUG, this + ".deny(" + denies + ')');
String id;
Message message;
int deliveryAttempts = 1;
916,7 → 919,7
while (i < messageIds.size()) {
currentId = (String) messageIds.get(i);
Message currentMessage = (Message) messagesTable.get(currentId);
 
// Message may be null if it is not valid anymore
if (currentMessage != null) {
currentO = currentMessage.order;
994,6 → 997,12
return (Message) messagesTable.get(msgId);
}
/**
 * Looks up a message in the messages table of this subscription.
 * The callers visible in this change check the result against
 * <code>null</code> before using it.
 *
 * @param msgId identifier of the message to look up
 * @return the entry of {@code messagesTable} registered under that identifier
 */
Message getSubMessage(String msgId) {
  if (logger.isLoggable(BasicLevel.DEBUG)) {
    logger.log(BasicLevel.DEBUG, "ClientSubscription.getSubMessage(" + msgId + ')');
  }
  final Object entry = messagesTable.get(msgId);
  return (Message) entry;
}
/**
* Returns the description of a particular pending message. The message is
* pointed out through its unique identifier.
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/proxies/UserAgent.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2016 ScalAgent Distributed Technologies
* Copyright (C) 2004 France Telecom R&D
* Copyright (C) 2003 - 2004 Bull SA
* Copyright (C) 1996 - 2000 Dyade
63,11 → 63,11
import org.objectweb.joram.mom.notifications.BrowseReply;
import org.objectweb.joram.mom.notifications.BrowseRequest;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.ClientSubscriptionNot;
import org.objectweb.joram.mom.notifications.DenyRequest;
import org.objectweb.joram.mom.notifications.ExceptionReply;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.GetClientSubscriptions;
import org.objectweb.joram.mom.notifications.ClientSubscriptionNot;
import org.objectweb.joram.mom.notifications.QueueMsgReply;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.mom.notifications.ReconnectSubscribersNot;
165,7 → 165,6
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.CallbackNotification;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.CountDownCallback;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownAgent;
202,7 → 201,7
/** Map contains the clientID */
private transient Map<Integer, String> clientIDs = new HashMap<Integer, String>();
 
/** period to run the cleaning task, by default 60s. */
private long period = 60000L;
 
243,8 → 242,29
this.period = period;
}
}
/**
* The re-delivery delay use to wait before re-delivering
* messages after a deny.
*/
private int reDeliveryDelay = 0;
/**
* @return the reDeliveryDelay
*/
public int getReDeliveryDelay() {
return reDeliveryDelay/1000;
}
 
/**
* @param reDeliveryDelay the reDeliveryDelay to set
*/
public void setReDeliveryDelay(int reDeliveryDelay) {
logger.log(BasicLevel.ERROR, this + " === setReDeliveryDelay " + reDeliveryDelay);//NTA tmp
this.reDeliveryDelay = reDeliveryDelay*1000;
}
 
/**
* Identifier of this proxy dead message queue, <code>null</code> for DMQ
* not set.
*/
1116,6 → 1136,9
arrivalState = UserAgentArrivalState.load(ARRIVAL_STATE_PREFIX + getId().toString());
}
if (reDeliveryDelay == 0)
reDeliveryDelay = AgentServer.getInteger(AdminCommandConstant.RE_DELIVERY_DELAY, 0) * 1000;
MessageTableFactory messageTableFactory = MessageTableFactory.newFactory();
messagesTable = messageTableFactory.createMessageTable(MESSAGE_TABLE_PREFIX + getId().toString());
 
2128,6 → 2151,25
if (sub == null)
return;
 
long currentTime = System.currentTimeMillis();
Iterator<String> denies = new Vector<String>(req.getIds()).iterator();
while (denies.hasNext()) {
String msgId = (String) denies.next();
Message msg = sub.getSubMessage(msgId);
 
if (msg != null && reDeliveryDelay > 0 && req.isRedelivered()) {
msg.setDeliveryTime(currentTime + reDeliveryDelay);
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
List<String> ids = new ArrayList<>();
ids.add(req.getTarget());
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), ids);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact SessDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
req.getIds().remove(msgId);
}
}
sub.deny(req.getIds().iterator(), req.isRedelivered());
 
// Launching a delivery sequence:
2190,10 → 2232,29
if (sub == null)
return;
 
String msgId = req.getId();
Vector<String> ids = new Vector<String>();
ids.add(req.getId());
sub.deny(ids.iterator(), req.isRedelivered());
ids.add(msgId);
 
if (reDeliveryDelay > 0 && req.isRedelivered()) {
Message msg = sub.getSubMessage(msgId);
if (msg != null) {
List<String> traget = new ArrayList<>();
traget.add(req.getTarget());
long currentTime = System.currentTimeMillis();
msg.setDeliveryTime(currentTime + reDeliveryDelay);
msg.setRedelivered();
//TODO: msg.setDeliveryCount(deliveryCount);
scheduleDeliveryTimeMessage(sub.getTopicId(), msg.getMsg(), traget);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "UserAgent.doReact ConsumerDenyRequest: scheduleDeliveryTimeMessage " + msg.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
} else {
sub.deny(ids.iterator(), req.isRedelivered());
}
} else {
sub.deny(ids.iterator(), req.isRedelivered());
}
 
// Launching a delivery sequence:
ConsumerMessages consM = sub.deliver();
// Delivering.
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Destination.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2014 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2016 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
53,6 → 53,7
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.mom.util.InterceptorsHelper;
import org.objectweb.joram.mom.util.MessageInterceptor;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.admin.AdminCommandReply;
396,6 → 397,14
interceptors = null;
}
}
// reDeliveryDelay
if (prop != null
&& prop.containsKey(AdminCommandConstant.RE_DELIVERY_DELAY)
&& getType() == DestinationConstants.QUEUE_TYPE) {
int reDeliveryDelay = ConversionHelper.toInt(prop.get(AdminCommandConstant.RE_DELIVERY_DELAY));
((Queue) this).setReDeliveryDelay(reDeliveryDelay);
}
}
 
protected boolean isLocal(AgentId id) {
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/AdminTopic.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2016 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
96,8 → 96,10
import org.objectweb.joram.shared.admin.StopServerRequest;
import org.objectweb.joram.shared.admin.UpdateUser;
import org.objectweb.joram.shared.admin.UserAdminRequest;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.excepts.MomException;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.joram.shared.messages.MessageHelper;
import org.objectweb.joram.shared.security.Identity;
877,10 → 879,23
UserAgent proxy = new UserAgent();
proxy.setName(name);
proxId = proxy.getId();
Properties props = request.getProperties();
try {
// set interceptors.
proxy.setInterceptors(request.getProperties());
proxy.setInterceptors(props);
// set reDeliveryDelay
if (props != null && props.containsKey(AdminCommandConstant.RE_DELIVERY_DELAY)) {
int reDeliveryDelay;
try {
reDeliveryDelay = ConversionHelper.toInt(props.get(AdminCommandConstant.RE_DELIVERY_DELAY));
proxy.setReDeliveryDelay(reDeliveryDelay);
} catch (MessageValueException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "EXCEPTION:: createUser [" + name + "] set the redelivery delay", e);
}
}
 
// deploy UserAgent
proxy.deploy();
901,7 → 916,7
throw new RequestException("User proxy not deployed: " + exc);
}
}
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, info);
 
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2016 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
56,6 → 56,7
import org.objectweb.joram.mom.util.QueueDeliveryTimeTask;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.ClearQueue;
114,7 → 115,27
/** Static value holding the default DMQ identifier for a server. */
static AgentId defaultDMQId = null;
 
/**
* The re-delivery delay use to wait before re-delivering
* messages after a deny.
*/
private int reDeliveryDelay = 0;
/**
* @return the reDeliveryDelay
*/
public int getReDeliveryDelay() {
return reDeliveryDelay/1000;
}
 
/**
* @param reDeliveryDelay the reDeliveryDelay to set
*/
public void setReDeliveryDelay(int reDeliveryDelay) {
this.reDeliveryDelay = reDeliveryDelay*1000;
}
 
/**
* Threshold above which messages are considered as undeliverable because
* constantly denied; 0 stands for no threshold, -1 for value not set.
*/
446,6 → 467,9
protected void initialize(boolean firstTime) throws Exception {
cleanWaitingRequest(System.currentTimeMillis());
 
if (reDeliveryDelay == 0)
reDeliveryDelay = AgentServer.getInteger(AdminCommandConstant.RE_DELIVERY_DELAY, 0) * 1000;
receiving = false;
messages = new Vector();
712,7 → 736,7
protected void denyRequest(AgentId from, DenyRequest not) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest(" + from + ',' + not + ')');
 
Enumeration ids = not.getIds();
 
String msgId;
752,6 → 776,12
message.setRedelivered();
else
message.setDeliveryCount(message.getDeliveryCount()-1);
if (reDeliveryDelay > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + reDeliveryDelay);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: setDeliveryTime " + message.getDeliveryTime());
}
 
// If message considered as undeliverable, adding
// it to the list of dead messages:
802,8 → 832,13
message.setRedelivered();
else
message.setDeliveryCount(message.getDeliveryCount()-1);
if (reDeliveryDelay > 0 && (message.isRedelivered() || message.getDeliveryCount() > 0)) {
message.setDeliveryTime(System.currentTimeMillis() + reDeliveryDelay);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: setDeliveryTime " + message.getDeliveryTime());
}
 
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + msgId);
if (logmsg.isLoggable(BasicLevel.INFO))
822,8 → 857,15
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.UNDELIVERABLE);
} else {
try {
// Else, putting the message back into the deliverables list:
storeMessageHeader(message, false);
if (message.getDeliveryTime() > 0) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.DenyRequest: scheduleDeliveryTimeMessage " + message.getId() + ", reDeliveryDelay = " + reDeliveryDelay);
scheduleDeliveryTimeMessage(message.getMsg(), false);
deliveryTable.remove(message.getId());
} else {
// Else, putting the message back into the deliverables list:
storeMessageHeader(message, false);
}
} catch (AccessException e) {/* never happens */}
}
 
1109,6 → 1151,8
msg = new Message(sharedMsg);
 
if (sharedMsg.deliveryTime > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.doClientMessages: scheduleDeliveryTimeMessage " + sharedMsg.id + ", reDeliveryDelay = " + reDeliveryDelay);
scheduleDeliveryTimeMessage(sharedMsg, throwsExceptionOnFullDest);
} else {
msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/QueueMBean.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2004 - 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
154,4 → 154,14
public List getMessagesView();
 
// public CompositeData[] getMessages() throws Exception;
/**
 * Returns the re-delivery delay, i.e. the time to wait before
 * re-delivering messages after a deny.
 *
 * @return the re-delivery delay (unit: second)
 */
int getReDeliveryDelay();
 
/**
 * Sets the re-delivery delay, i.e. the time to wait before
 * re-delivering messages after a deny.
 *
 * @param reDeliveryDelay the re-delivery delay to set (unit: second)
 */
void setReDeliveryDelay(int reDeliveryDelay);
}