OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6474 → Rev 6475

/trunk/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/DistributionQueue.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
29,6 → 29,7
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.excepts.RequestException;
46,16 → 47,11
* behavior, delivering messages via the {@link DistributionModule}.
*/
public class DistributionQueue extends Queue {
 
public static Logger logger = Debug.getLogger(DistributionQueue.class.getName());
 
/** Default period used to clean queue and re-distribute failing messages. */
public static final long DEFAULT_PERIOD = 1000;
 
public static final String BATCH_DISTRIBUTION_OPTION = "distribution.batch";
public static final String ASYNC_DISTRIBUTION_OPTION = "distribution.async";
 
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
 
106,9 → 102,9
isAsyncDistribution = false;
 
if (properties != null) {
if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.BATCH_DISTRIBUTION_OPTION)) {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
batchDistribution = ConversionHelper.toBoolean(properties.get(DestinationConstants.BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", exc);
}
120,11 → 116,11
 
if (firstTime) {
if (properties != null) {
distributionClassName = properties.getProperty(DistributionModule.CLASS_NAME);
properties.remove(DistributionModule.CLASS_NAME);
distributionClassName = properties.getProperty(DestinationConstants.DISTRIBUTION_CLASS_NAME);
properties.remove(DestinationConstants.DISTRIBUTION_CLASS_NAME);
}
if (distributionClassName == null) {
throw new RequestException("Distribution class name not found: " + DistributionModule.CLASS_NAME
throw new RequestException("Distribution class name not found: " + DestinationConstants.DISTRIBUTION_CLASS_NAME
+ " property must be set on queue creation.");
}
 
146,9 → 142,9
// stop distributionDaemon
distributionDaemon.close();
distributionDaemon = null;
if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.BATCH_DISTRIBUTION_OPTION)) {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
batchDistribution = ConversionHelper.toBoolean(properties.get(DestinationConstants.BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse batch option.", exc);
}
161,9 → 157,9
}
 
private boolean isAsyncDistribution(Properties properties) {
if (properties.containsKey(ASYNC_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.ASYNC_DISTRIBUTION_OPTION)) {
try {
return ConversionHelper.toBoolean(properties.get(ASYNC_DISTRIBUTION_OPTION));
return ConversionHelper.toBoolean(properties.get(DestinationConstants.ASYNC_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse DaemonDistribution option.", exc);
}
416,27 → 412,29
// after a distribution exception
distributionDaemon.notify();
}
}
 
if (logger.isLoggable(BasicLevel.DEBUG) && distributionDaemon != null) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + distributionDaemon +
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + distributionDaemon +
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
 
if (distributionDaemon != null) {
if (!distributionDaemon.isEmpty()) {
// needless to push an other message
// because the distributionDaemon can't distribute message now.
break;
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot " + msg.getId());
// TODO (AF): verify with JMS and AMQP bridge tests
// if (!distributionDaemon.isEmpty()) {
// // needless to push an other message
// // because the distributionDaemon can't distribute message now.
// break;
// } else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot " + msg.getId());
 
// Bug fix (JORAM-200): Avoid to duplicate a message already known by the daemon (either in
// the distributeQueue or the ackedQueue).
if (! distributionDaemon.isHandling(msg.getId()))
distributionDaemon.push(msg.getFullMessage());
// Bug fix (JORAM-200): Avoid to duplicate a message already known by the daemon (either in
// the distributeQueue or the ackedQueue).
if (! distributionDaemon.isHandling(msg.getId())) {
distributionDaemon.push(msg.getFullMessage());
} else {
// The current message is already in daemon queue, no need to parse the end of list.
break;
}
// }
}
 
if (!batchDistribution) {