OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6511 → Rev 6512

/trunk/joram/joram/tools/rest/jms/src/main/java/org/objectweb/joram/tools/rest/jms/Helper.java
1,6 → 1,6
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
* Copyright (C) 2016 - 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
64,6 → 64,7
public static final int DFLT_CLEANER_PERIOD = 15;
public static Logger logger = Debug.getLogger(Helper.class.getName());
private static final AtomicLong counter = new AtomicLong(1);
private static Helper helper = null;
private InitialContext ictx;
79,7 → 80,7
sessionCtxs = new HashMap<String, SessionContext>();
}
static public Helper getInstance() {
static public synchronized Helper getInstance() {
if (helper == null)
helper = new Helper();
return helper;
104,9 → 105,11
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16400");
}
// TODO: use the JNDI ObjectFactory registered as an OSGi service?
// ServiceReference<ObjectFactory> ref = context.getServiceReference(javax.naming.spi.ObjectFactory.class);
// ObjectFactory jndiFactory = bundleContext.getService(ref);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "jndiProperties = " + jndiProps);
210,7 → 213,7
return null;
}
public Object lookup(String name) throws NamingException {
public synchronized Object lookup(String name) throws NamingException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Helper.lookup " + name);
if (ictx == null)
238,7 → 241,7
* @return the producer name
* @throws Exception
*/
public String createProducer(
public synchronized String createProducer(
String userName,
String password,
String clientId,
289,18 → 292,19
logger.log(BasicLevel.DEBUG, "Helper.createProducer jmsContext = " + restClientCtx.getJmsContext());
}
SessionContext prodContext = sessionCtxs.get(prodId);
if ( prodContext == null) {
ProducerContext prodContext = (ProducerContext) sessionCtxs.get(prodId);
if (prodContext == null) {
// create a new producer context
prodContext = new ProducerContext(restClientCtx);
prodContext.setJmsContext(restClientCtx.getJmsContext().createContext(sessionMode));
JMSProducer producer = prodContext.getJmsContext().createProducer();
producer.setDeliveryMode(deliveryMode);
if (correlationID != null)
producer.setJMSCorrelationID(correlationID);
producer.setPriority(priority);
producer.setTimeToLive(timeToLive);
producer.setDeliveryDelay(deliveryDelay);
prodContext.setDefaultDeliveryMode(deliveryMode);
prodContext.setDefaultJMSCorrelationID(correlationID);
prodContext.setDefaultPriority(priority);
prodContext.setDefaultTimeToLive(timeToLive);
prodContext.setDefaultDeliveryDelay(deliveryDelay);
JMSProducer producer = prodContext.getJmsContext().createProducer();
((ProducerContext) prodContext).setProducer(producer);
sessionCtxs.put(prodId, prodContext);
restClientCtx.addSessionCtxNames(prodId);
316,10 → 320,11
}
prodContext.setDest(destination);
}
return prodId;
}
public String createConsumer(
public synchronized String createConsumer(
String userName,
String password,
String clientId,
426,7 → 431,7
return consId;
}
 
private void setMapMessage(Map<String, Object> jsonMap, MapMessage msg) throws Exception {
static final void setMapMessage(Map<String, Object> jsonMap, MapMessage msg) throws Exception {
if (jsonMap == null)
return;
 
500,7 → 505,7
}
}
private Object getValue(Map map, String key) throws Exception {
static final Object getValue(Map map, String key) throws Exception {
Object value = map.get(key);
if (value instanceof ArrayList) {
ArrayList<String> array =(ArrayList<String>) value;
526,7 → 531,7
* @param jmsProps
* @param jmsBody
* @param deliveryMode
* @param deliveryTime
* @param deliveryDelay
* @param priority
* @param timeToLive
* @param correlationID
540,7 → 545,7
Map<String, Object> jmsProps,
Object jmsBody,
int deliveryMode,
long deliveryTime,
long deliveryDelay,
int priority,
long timeToLive,
String correlationID) throws Exception {
550,152 → 555,10
if (producerCtx == null)
throw new Exception(prodName + " not found.");
Message msg = null;
if (type.equals(TextMessage.class.getSimpleName())) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "send text message = " + jmsBody);
// create the text message
msg = producerCtx.getJmsContext().createTextMessage((String) jmsBody);
} else if(type.equals(BytesMessage.class.getSimpleName())) {
// create the byte message
if (jmsBody instanceof ArrayList) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "send bytes message");
msg = producerCtx.getJmsContext().createBytesMessage();
byte[] bytes = new byte[((ArrayList) jmsBody).size()];
for (int i = 0; i < ((ArrayList) jmsBody).size(); i++) {
Object value = ((ArrayList) jmsBody).get(i);
bytes[i] = ((Number) value).byteValue();
}
((BytesMessage) msg).writeBytes(bytes);
((BytesMessage) msg).reset();
} else {
throw new Exception("BytesMessage: invalid jmsBody = " + jmsBody.getClass().getName());
}
} else if(type.equals(MapMessage.class.getSimpleName())) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "send map message");
 
// create the map message
if (jmsBody instanceof Map) {
msg = producerCtx.getJmsContext().createMapMessage();
setMapMessage((Map) jmsBody, (MapMessage) msg);
} else {
throw new Exception("MapMessage: invalid jmsBody = " + jmsBody.getClass().getName());
}
} else if(type.equals(ObjectMessage.class.getSimpleName())) {
throw new Exception("type: " + type + ", not yet implemented");
} else if(type.equals(StreamMessage.class.getSimpleName())) {
throw new Exception("type: " + type + ", not yet implemented");
} else {
throw new Exception("Unknown message type: " + type);
}
if (jmsHeaders != null) {
// Header
if (deliveryMode == -1) {
Integer value = (Integer) getValue(jmsHeaders, "DeliveryMode");
if (value != null)
msg.setJMSDeliveryMode(value);
}
if (deliveryTime == -1) {
Long value = (Long) getValue(jmsHeaders, "DeliveryTime");
if (value != null)
msg.setJMSDeliveryTime(value);
}
if (priority == -1) {
Integer value = (Integer) getValue(jmsHeaders, "Priority");
if (value != null)
msg.setJMSPriority(value);
}
if (timeToLive == -1) {
Long value = (Long) getValue(jmsHeaders, "Expiration");
if (value != null)
msg.setJMSExpiration(value);
}
if (correlationID == null) {
String value = (String) getValue(jmsHeaders, "CorrelationID");
if (value != null)
msg.setJMSCorrelationID(value);
}
}
if (deliveryMode > -1)
msg.setJMSDeliveryMode(deliveryMode);
if (deliveryTime > -1)
msg.setJMSDeliveryTime(deliveryTime);
if (priority > -1)
msg.setJMSPriority(priority);
if (timeToLive > -1)
msg.setJMSExpiration(timeToLive);
if (correlationID != null)
msg.setJMSCorrelationID(correlationID);
if (jmsProps != null) {
// Properties
for (String key : jmsProps.keySet()) {
Object value = null;
try {
value = getValue(jmsProps, key);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "ignore set jms properties(" + key + ", " + value + ") : " + e.getMessage());
continue;
}
if (value == null)
continue;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "set jms properties: " + key + ", value = " + value + ", " + value.getClass().getSimpleName());
switch (value.getClass().getSimpleName()) {
case "String":
msg.setStringProperty(key, (String) value);
break;
case "Boolean":
msg.setBooleanProperty(key, (Boolean)value);
break;
case "Integer":
msg.setIntProperty(key, (Integer)value);
break;
case "Long":
msg.setLongProperty(key, (Long)value);
break;
case "Double":
msg.setDoubleProperty(key, (Double)value);
break;
case "Float":
msg.setFloatProperty(key, (Float)value);
break;
case "Short":
msg.setShortProperty(key, (Short)value);
break;
case "Byte":
msg.setByteProperty(key, (Byte)value);
break;
 
default:
try {
msg.setObjectProperty(key, value);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "ignore jms setObjectProperties(" + key + ", " + value + ") : " + e.getMessage());
}
break;
}
}
}
// send the message
producerCtx.getProducer().send(producerCtx.getDest(), msg);
// Increment the last id
producerCtx.incLastId();
//update activity
producerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
return producerCtx.getLastId();
return producerCtx.send(type,
jmsHeaders, jmsProps,
jmsBody, deliveryMode, deliveryDelay, priority, timeToLive,
correlationID);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, e);
715,39 → 578,12
ConsumerContext consumerCtx = (ConsumerContext) sessionCtxs.get(consName);
if (consumerCtx == null)
throw new Exception(consName + " not found.");
//update activity
consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
Message message = consumerCtx.getMessage(msgId);
if (message != null)
return message;
if (timeout > 0)
message = consumerCtx.getConsumer().receive(timeout);
else if (timeout == 0)
message = consumerCtx.getConsumer().receiveNoWait();
else {
message = consumerCtx.getConsumer().receive();
if (message == null) {
throw new JMSException("The consumer expire (timeout)");
}
}
//update activity
consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
if (message != null) {
if (consumerCtx.getJmsContext().getSessionMode() == JMSContext.CLIENT_ACKNOWLEDGE) {
long id = msgId;
if (id == -1)
id = consumerCtx.incLastId();
consumerCtx.put(id, message);
} else {
consumerCtx.incLastId();
}
}
message = consumerCtx.receive(timeout, msgId);
return message;
}