OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6442 → Rev 6443

/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTDistribution.java
22,12 → 22,22
*/
package com.scalagent.joram.mom.dest.rest;
 
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
 
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
47,6 → 57,7
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
 
/**
75,6 → 86,9
private URI uriSendMsg;
private URI uriCloseProducer;
private ReconnectDaemon reconnectDaemon;
private int reconnectSleep = 1000;
public void init(Properties properties, boolean firstTime) {
destName = properties.getProperty(DESTINATION_NAME_PROP);
if (destName == null) {
98,22 → 112,49
password = properties.getProperty(PASSWORD_PROP);
}
ClientConfig config = new ClientConfig();
client = ClientBuilder.newClient(config);
target = client.target(UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "RESTDistribution.init Target : " + target.getUri());
uriCreateProducer = null;
try {
ClientConfig config = new ClientConfig();
client = ClientBuilder.newClient(config);
target = client.target(UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "RESTDistribution.init Target : " + target.getUri());
}
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Exception:: RESTDistribution.init " + e.getMessage());
}
}
 
public void distribute(Message message) throws Exception {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTDistribution.distribute(" + message + ')');
if (uriCreateProducer == null)
createProducer(userName, password);
try {
if (uriCreateProducer == null)
createProducer(userName, password);
 
sendMessage(message);
int ret = sendMessage(message);
if (ret > 300) {
createProducer(userName, password);
ret = sendMessage(message);
if (ret > 300) {
throw new Exception("can't distribute the message: " + message + ", response status: " + ret);
}
}
} catch (Exception e) {
Throwable th = e.getCause();
if ( th instanceof ConnectException) {
// reconnect
if (reconnectDaemon == null) {
reconnectDaemon = new ReconnectDaemon();
reconnectDaemon.start();
}
} else {
logger.log(BasicLevel.ERROR, "RESTDistribution.distribute:" + message, e);
}
throw e;
}
}
 
public void close() {
160,7 → 201,7
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer (" + destName + ") = " + response.getStatus());
if (response.getStatus() > 300) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer" + destName + " ERROR ==== " + response.readEntity(String.class));
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer" + destName + " ERROR:: " + response.readEntity(String.class));
return response.getStatus();
}
print(response.getLinks());
170,6 → 211,58
return response.getStatus();
}
/**
 * Converts the serialized body of a map message into a map that can be
 * encoded as the JSON "body" element.
 *
 * <p>The body bytes are deserialized into a {@link Map}. Each entry with a
 * non-null value becomes a two-element {@code String[]}: index 0 holds the
 * value as text (byte arrays are decoded as UTF-8, everything else uses its
 * string conversion) and index 1 holds the value's class name. Entries with
 * a null value are omitted.
 *
 * @param message the message whose body holds the serialized map
 * @return the converted map, or {@code null} if the deserialized object is null
 * @throws JMSException a {@link MessageFormatException} linked to the
 *         underlying error if the body cannot be read or deserialized
 */
private Map getMapMessageToJsonBodyMap(Message message) throws JMSException {
  Map msgMap = null;
  // NOTE(review): this uses native Java deserialization on the message body;
  // confirm the body can only originate from trusted producers.
  // try-with-resources closes the streams only if they were actually opened,
  // so a failure while opening them is reported as MessageFormatException
  // instead of being masked by a NullPointerException during cleanup.
  try (ByteArrayInputStream bais = new ByteArrayInputStream(message.getBody());
       ObjectInputStream ois = new ObjectInputStream(bais)) {
    msgMap = (Map) ois.readObject();
  } catch (Exception exc) {
    MessageFormatException jE =
        new MessageFormatException("Error while getting the body.");
    jE.setLinkedException(exc);
    throw jE;
  }

  if (msgMap == null)
    return null;

  Map<String, Object> jsonBodyMap = new HashMap<>();
  for (Object element : msgMap.entrySet()) {
    Map.Entry entry = (Map.Entry) element;
    String key = (String) entry.getKey();
    Object v = entry.getValue();
    if (v == null)
      continue;

    String[] value = new String[2];
    if (v instanceof byte[]) {
      // The Charset overload cannot fail for UTF-8, so no checked exception to handle.
      value[0] = new String((byte[]) v, java.nio.charset.StandardCharsets.UTF_8);
    } else {
      value[0] = "" + v;
    }
    value[1] = v.getClass().getName();
    jsonBodyMap.put(key, value);
  }
  return jsonBodyMap;
}
private int sendMessage(Message message) throws Exception {
if (uriSendMsg == null) {
return 400;
177,9 → 270,18
Response response;
if (message.properties == null && message.type == Message.TEXT) {
// no properties, only senda text message
response = client.target(uriSendMsg).request().
accept(MediaType.TEXT_PLAIN).post(Entity.entity(message.getText(), MediaType.TEXT_PLAIN));
// no properties, only send a text message
WebTarget wt = client.target(uriSendMsg)
.queryParam("persistent", (message.persistent?Message.PERSISTENT:Message.NON_PERSISTENT))
.queryParam("priority", message.priority);
if (message.deliveryTime > 0)
wt = wt.queryParam("delivery-time", message.deliveryTime);
if (message.expiration > 0)
wt = wt.queryParam("time-to-live", message.expiration);
if (message.correlationId != null && !message.correlationId.isEmpty())
wt = wt.queryParam("correlation-id", message.correlationId);
response = wt.request()
.accept(MediaType.TEXT_PLAIN).post(Entity.entity(message.getText(), MediaType.TEXT_PLAIN));
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: " + response.getStatus() + ", not send: " + message);
194,12 → 296,15
if (message.type == Message.TEXT) {
message.jmsType = "TextMessage";
maps.put("body", message.getText());
} else if (message.type == Message.BYTES) {
message.jmsType = "BytesMessage";
maps.put("body", message.body);
} else if (message.type == Message.MAP) {
message.jmsType = "MapMessage";
maps.put("body", message.getObject());
maps.put("body", getMapMessageToJsonBodyMap(message));
 
} else {
logger.log(BasicLevel.ERROR, "RESTDistribution.sendMessage: type " + message.jmsType + " not yet supported.");
return 400;
208,14 → 313,16
maps.put("type", message.jmsType);
// message properties
HashMap<String, Object> props = new HashMap<>();
Enumeration e = message.properties.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
Object value = message.properties.get(key);
props.put(key, new String[]{value.toString(), value.getClass().getName()});
if (message.properties != null) {
HashMap<String, Object> props = new HashMap<>();
Enumeration e = message.properties.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
Object value = message.properties.get(key);
props.put(key, new String[]{value.toString(), value.getClass().getName()});
}
maps.put("properties", props);
}
maps.put("properties", props);
 
// message header
HashMap<String, Object> header = new HashMap<>();
237,6 → 344,7
response = client.target(uriSendMsg).request().accept(MediaType.TEXT_PLAIN).post(
Entity.entity(json, MediaType.APPLICATION_JSON));
}
return response.getStatus();
}
261,4 → 369,64
logger.log(BasicLevel.DEBUG, "\t" + link.getRel() + " : " + link.getUri());
}
/**
 * Daemon used to reconnect.
 *
 * <p>Its loop calls <code>createProducer(userName, password)</code> repeatedly
 * until a call returns without throwing, sleeping <code>reconnectSleep</code>
 * milliseconds after each failed attempt. When the loop ends, the enclosing
 * object's <code>reconnectDaemon</code> field is reset to <code>null</code>.
 *
 * <p>NOTE(review): <code>running</code>, <code>canStop</code>, <code>logmon</code>
 * and <code>finish()</code> are not declared in this class; presumably they are
 * inherited from <code>Daemon</code> - confirm their contract there.
 */
private class ReconnectDaemon extends Daemon {

/** Constructs a <code>ReconnectDaemon</code> thread, flagged as a daemon thread. */
protected ReconnectDaemon() {
super("RESTDistribution_ReconnectDaemon", logger);
setDaemon(true);
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon<init>");
}
}

/**
 * The daemon's loop: retries the producer creation until it succeeds or
 * <code>running</code> becomes false, then calls <code>finish()</code>.
 */
public void run() {
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon.run()");
}

try {
// Number of attempts made so far; only used in the debug trace.
int retry = 0;
while (running) {

// NOTE(review): presumably tells Daemon that this thread may be stopped at this point - verify.
canStop = true;

try {
retry++;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon: reconnect retry = " + retry);
// NOTE(review): the value returned by createProducer is ignored here, so only
// an exception triggers another attempt - confirm this is intended.
createProducer(userName, password);
break;
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon.run():: " + e.getMessage());
try {
// Wait before the next attempt.
Thread.sleep(reconnectSleep);
} catch (InterruptedException e1) { } // interruption is ignored; the loop condition is re-evaluated next
}

//canStop = false;
}
// Clear the reference held by the enclosing object once the loop has ended.
reconnectDaemon = null;
} finally {
finish();
}
}

/** Intentionally empty: this class performs no work on close. */
@Override
protected void close() {
// TODO Auto-generated method stub
}

/** Interrupts this thread, which ends a pending sleep in the retry loop. */
@Override
protected void shutdown() {
interrupt();
}
}
}