OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6482 → Rev 6483

/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTAcquisitionDaemon.java
44,8 → 44,8
 
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.mom.dest.AcquisitionDaemon;
import org.objectweb.joram.mom.dest.AcquisitionModule;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
59,7 → 59,6
import fr.dyade.aaa.common.Debug;
 
public class RESTAcquisitionDaemon implements AcquisitionDaemon {
 
private static final Logger logger = Debug.getLogger(RESTAcquisitionDaemon.class.getName());
private static final String HOST_PROP = "rest.hostName";
179,9 → 178,9
mediaTypeJson = Boolean.parseBoolean(properties.getProperty(MEDIA_TYPE_JSON_PROP));
}
 
if (properties.containsKey(AcquisitionModule.PERSISTENT_PROPERTY)) {
if (properties.containsKey(DestinationConstants.ACQUISITION_PERSISTENT)) {
try {
persistent = ConversionHelper.toBoolean(properties.get(AcquisitionModule.PERSISTENT_PROPERTY));
persistent = ConversionHelper.toBoolean(properties.get(DestinationConstants.ACQUISITION_PERSISTENT));
} catch (MessageValueException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "", e);
/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTDistribution.java
26,7 → 26,6
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
34,22 → 33,20
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
 
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Link;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.client.WebTarget;import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
 
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
57,160 → 54,191
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
 
/**
* Distribution handler for the REST distribution bridge.
*/
public class RESTDistribution implements DistributionHandler {
 
private static final Logger logger = Debug.getLogger(RESTDistribution.class.getName());
 
private static final String HOST_PROP = "rest.hostName";
private static final String PORT_PROP = "rest.port";
private static final String USER_NAME_PROP = "rest.userName";
private static final String PASSWORD_PROP = "rest.password";
private static final String DESTINATION_NAME_PROP = "jms.destination";
 
private String hostName = "localhost";
private int port = 8989;
private Client client;
private WebTarget target;
 
private int connectTimeout = 10000;
private int readTimeout = 10000;
private String userName = null;
private String password = null;
 
private String destName;
private URI uriCreateProducer;
private URI uriSendNextMsg;
private URI uriSendMsg;
private URI uriCloseProducer;
private ReconnectDaemon reconnectDaemon;
private int reconnectSleep = 1000;
private String idleTimeout = "60"; // TODO (AF): default value (Be careful in seconds)
 
private String prodName = null;
private String clientId = null;
 
private Client client;
private URI uriSendNextMsg = null;
private URI uriCloseProducer = null;
public void init(Properties properties, boolean firstTime) {
destName = properties.getProperty(DESTINATION_NAME_PROP);
if (properties.containsKey(DestinationConstants.REST_HOST_PROP)) {
hostName = properties.getProperty(DestinationConstants.REST_HOST_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_HOST_PROP + ", use default value: " + hostName);
}
if (properties.containsKey(DestinationConstants.REST_PORT_PROP)) {
try {
port = Integer.parseInt(properties.getProperty(DestinationConstants.REST_PORT_PROP));
} catch (NumberFormatException exc) {
logger.log(BasicLevel.ERROR,
"Property " + DestinationConstants.REST_PORT_PROP + " could not be parsed properly, use default value: " + port, exc);
}
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_PORT_PROP + ", use default value: " + port);
}
if (properties.containsKey(DestinationConstants.REST_USERNAME_PROP)) {
userName = properties.getProperty(DestinationConstants.REST_USERNAME_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_USERNAME_PROP + ", use default value: " + userName);
}
if (properties.containsKey(DestinationConstants.REST_PASSWORD_PROP)) {
password = properties.getProperty(DestinationConstants.REST_PASSWORD_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_PASSWORD_PROP + ", use default value: " + password);
}
 
if (properties.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) {
try {
idleTimeout = properties.getProperty(DestinationConstants.IDLETIMEOUT_PROP);
} catch (NumberFormatException exc) { }
}
 
destName = properties.getProperty(DestinationConstants.DESTINATION_NAME_PROP);
if (destName == null) {
throw new IllegalArgumentException("Missing Destination JNDI name.");
}
if (properties.containsKey(HOST_PROP)) {
hostName = properties.getProperty(HOST_PROP);
}
private void createProducer(String userName, String password) throws Exception {
if (uriCloseProducer != null)
return;
 
URI base = UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.createProducer(), use base URI " + base);
 
// Initializes URI
uriSendNextMsg = null;
 
// Initializes Rest client and target
try {
// TODO (AF): It seems that there is no exceptions thrown by these methods.
client = ClientBuilder.newClient(new ClientConfig() // TODO (AF): Fix these properties with configuration values
.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout)
.property(ClientProperties.READ_TIMEOUT, readTimeout));
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.createProducer(): cannot initialize Rest client", exc);
throw exc;
}
// Get the destination
Response response = null;
try {
if (properties.containsKey(PORT_PROP)) {
port = Integer.parseInt(properties.getProperty(PORT_PROP));
}
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + PORT_PROP
+ "could not be parsed properly, use default value.", nfe);
response = client.target(base).path("jndi").path(destName)
.request().accept(MediaType.TEXT_PLAIN).head();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.createProducer(): cannot get destination " + destName, exc);
throw exc;
}
if (properties.containsKey(USER_NAME_PROP)) {
userName = properties.getProperty(USER_NAME_PROP);
}
if (properties.containsKey(PASSWORD_PROP)) {
password = properties.getProperty(PASSWORD_PROP);
}
uriCreateProducer = null;
if (response.getStatus() == 201) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.createProducer(): get destination -> " + response.getStatusInfo());
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.createProducer(): cannot get destination " + destName + " -> " + response.getStatusInfo());
throw new Exception("Cannot get destination " + destName);
}
 
try {
ClientConfig config = new ClientConfig();
client = ClientBuilder.newClient(config);
target = client.target(UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "RESTDistribution.init Target : " + target.getUri());
}
} catch (Exception e) {
URI uriCreateProducer = response.getLink("create-producer").getUri();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.createProducer(): create-producer = " + uriCreateProducer);
 
// TODO (AF): We should fix name and client-id to be unique.
WebTarget target = client.target(uriCreateProducer);
if (prodName != null) target = target.queryParam("name", prodName);
if (clientId != null) target = target.queryParam("client-id", clientId);
if (userName != null) target = target.queryParam("user", userName);
if (password != null) target = target.queryParam("password", password);
if (idleTimeout != null) target = target.queryParam("idle-timeout", idleTimeout);
 
response = target.request().accept(MediaType.TEXT_PLAIN).post(null);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Exception:: RESTDistribution.init " + e.getMessage());
logger.log(BasicLevel.ERROR,
"RestDistribution.createProducer(): cannot create producer", exc);
throw exc;
}
if (response.getStatus() == 201) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.createProducer(): create producer -> " + response.getStatusInfo());
 
uriCloseProducer = response.getLink("close-context").getUri();
uriSendNextMsg = response.getLink("send-next-message").getUri();
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.createProducer(): cannot create producer -> " + response.getStatusInfo());
throw new Exception("Cannot create producer");
}
}
 
public void distribute(Message message) throws Exception {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTDistribution.distribute(" + message + ')');
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.distribute(" + message + ')');
try {
if (uriCreateProducer == null)
createProducer(userName, password);
 
int ret = sendMessage(message);
if (ret > 300) {
createProducer(userName, password);
ret = sendMessage(message);
if (ret > 300) {
throw new Exception("can't distribute the message: " + message + ", response status: " + ret);
}
}
try {
createProducer(userName, password);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestDistribution.distribute(): Cannot create producer", exc);
throw exc;
}
try {
sendNextMessage(message);
} catch (Exception e) {
Throwable th = e.getCause();
if ( th instanceof ConnectException) {
// reconnect
if (reconnectDaemon == null) {
reconnectDaemon = new ReconnectDaemon();
reconnectDaemon.start();
}
} else {
logger.log(BasicLevel.ERROR, "RESTDistribution.distribute:" + message, e);
}
closeProducer();
throw e;
}
}
 
public void close() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.close()");
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "RestDistribution.close()");
closeProducer();
}
/**
 * Looks up the destination on the REST JNDI resource and, if not already known,
 * records the "create-producer" link returned by the server.
 *
 * @return the HTTP status of the lookup request.
 */
private int lookup() {
  Builder builder = target.path("jndi").path(destName).request();
  Response response = builder.accept(MediaType.TEXT_PLAIN).get();
  if (logger.isLoggable(BasicLevel.DEBUG))
    logger.log(BasicLevel.DEBUG, "RESTDistribution.lookup \"" + destName +"\" = " + response.getStatus());
  print(response.getLinks());
  // Response.getLink() returns null when the relation is absent (typically when
  // the lookup failed). Only dereference the link when the server actually sent
  // it, so that the caller gets the status code instead of a NullPointerException.
  if (uriCreateProducer == null && response.hasLink("create-producer")) {
    uriCreateProducer = response.getLink("create-producer").getUri();
  }
  return response.getStatus();
}
private int createProducer(String userName, String password) {
if (uriCreateProducer == null) {
WebTarget wTarget = target.path("jndi").path(destName).path("create-producer")
.queryParam("name", "prod-" + destName)
.queryParam("client-id", "id-" + destName);
if (userName != null)
wTarget = wTarget.queryParam("user", userName);
if (password != null)
wTarget = wTarget.queryParam("password", password);
uriCreateProducer = wTarget.getUri();
}
 
// Create the producer prod-"destName"
WebTarget wTarget = client.target(uriCreateProducer)
.queryParam("name", "prod-" + destName)
.queryParam("client-id", "id-" + destName);
if (userName != null)
wTarget = wTarget.queryParam("user", userName);
if (password != null)
wTarget = wTarget.queryParam("password", password);
Response response = wTarget.request().accept(MediaType.TEXT_PLAIN).post(null);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer (" + destName + ") = " + response.getStatus());
if (response.getStatus() > 300) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer" + destName + " ERROR:: " + response.readEntity(String.class));
return response.getStatus();
}
print(response.getLinks());
uriCloseProducer = response.getLink("close-context").getUri();
uriSendNextMsg = response.getLink("send-next-message").getUri();
uriSendMsg = response.getLink("send-message").getUri();
return response.getStatus();
}
private Map getMapMessageToJsonBodyMap(Message message) throws JMSException {
Map msgMap = null;
ByteArrayInputStream bais = null;
263,56 → 291,75
return jsonBodyMap;
}
private int sendMessage(Message message) throws Exception {
if (uriSendMsg == null) {
return 400;
public static final String BytesMessage = "BytesMessage";
public static final String MapMessage = "MapMessage";
public static final String TextMessage = "TextMessage";
 
private void sendNextMessage(Message message) throws Exception {
Response response = null;
 
if (uriSendNextMsg == null) {
logger.log(BasicLevel.ERROR,
"RestDistribution.sendNextMessage(): URI not initialized.");
throw new Exception("URI not initialized");
}
Response response;
if (message.properties == null && message.type == Message.TEXT) {
// no properties, only send a text message
WebTarget wt = client.target(uriSendMsg)
.queryParam("persistent", (message.persistent?Message.PERSISTENT:Message.NON_PERSISTENT))
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.sendNextMessage: Send simple message");
 
// If the message contains only text and defines no properties, we can send an
// optimized message without using JSON.
WebTarget target = client.target(uriSendNextMsg)
.queryParam("delivery-mode", (message.persistent?Message.PERSISTENT:Message.NON_PERSISTENT))
.queryParam("priority", message.priority);
if (message.deliveryTime > 0)
wt = wt.queryParam("delivery-time", message.deliveryTime);
target = target.queryParam("delivery-time", message.deliveryTime);
if (message.expiration > 0)
wt = wt.queryParam("time-to-live", message.expiration);
target = target.queryParam("time-to-live", (message.expiration - System.currentTimeMillis()));
if (message.correlationId != null && !message.correlationId.isEmpty())
wt = wt.queryParam("correlation-id", message.correlationId);
response = wt.request()
target = target.queryParam("correlation-id", message.correlationId);
response = target.request()
.accept(MediaType.TEXT_PLAIN).post(Entity.entity(message.getText(), MediaType.TEXT_PLAIN));
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: " + response.getStatus() + ", not send: " + message);
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.sendNextMessage: cannot send message -> " + response.getStatus());
throw new Exception("Cannot send message: " + response.getStatus());
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: " + message);
logger.log(BasicLevel.DEBUG,
"RestDistribution.sendNextMessage: message sent -> " + response.getStatus());
}
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.sendNextMessage: Send JSON message");
// Use JSON to encode the message
HashMap<String, Object> maps = new HashMap<>();
// message body
// Encode message body
if (message.type == Message.TEXT) {
message.jmsType = "TextMessage";
maps.put("type", TextMessage);
maps.put("body", message.getText());
} else if (message.type == Message.BYTES) {
message.jmsType = "BytesMessage";
maps.put("type", BytesMessage);
maps.put("body", message.body);
} else if (message.type == Message.MAP) {
message.jmsType = "MapMessage";
maps.put("type", MapMessage);
maps.put("body", getMapMessageToJsonBodyMap(message));
 
} else {
logger.log(BasicLevel.ERROR, "RESTDistribution.sendMessage: type " + message.jmsType + " not yet supported.");
return 400;
logger.log(BasicLevel.ERROR,
"RestDistribution.sendNextMessage: type " + message.type + " not yet supported.");
throw new Exception("Message type " + message.type + " not yet supported.");
}
// message type
maps.put("type", message.jmsType);
// message properties
// Transform JMS message properties
if (message.properties != null) {
HashMap<String, Object> props = new HashMap<>();
Enumeration e = message.properties.keys();
330,8 → 377,10
header.put("CorrelationID", new String[]{message.correlationId, String.class.getName()});
if (message.priority > 0)
header.put("Priority", new String[]{""+message.priority, Integer.class.getName()});
header.put("Type", message.jmsType);
//TODO: other headers prop
// TODO (AF): not correct?
// header.put("Type", message.jmsType);
//TODO (AF): other headers prop
maps.put("header", header);
 
Gson gson = new GsonBuilder().create();
341,92 → 390,33
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: json " + json);
// Send next message
response = client.target(uriSendMsg).request().accept(MediaType.TEXT_PLAIN).post(
response = client.target(uriSendNextMsg).request().accept(MediaType.TEXT_PLAIN).post(
Entity.entity(json, MediaType.APPLICATION_JSON));
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestDistribution.sendNextMessage: cannot send message -> " + response.getStatus());
throw new Exception("Cannot send message: " + response.getStatus());
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestDistribution.sendNextMessage: message sent -> " + response.getStatus());
}
}
return response.getStatus();
uriSendNextMsg = response.getLink("send-next-message").getUri();
}
/**
 * Closes the producer by issuing a DELETE on the recorded "close-context" URI.
 *
 * @return the HTTP status of the DELETE request, or 400 if no close URI is known.
 */
private int closeProducer() {
  if (uriCloseProducer != null) {
    Response deleted = client.target(uriCloseProducer)
        .request()
        .accept(MediaType.TEXT_PLAIN)
        .delete();
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG, "RESTDistribution:: close-producer = " + deleted.getStatus());
    }
    return deleted.getStatus();
  }
  // Nothing to close: the producer was never created.
  return 400;
}
 
/**
 * Traces the given links (relation and URI) at DEBUG level; does nothing when
 * the set is empty.
 *
 * @param links the links to trace.
 */
private void print(Set<Link> links) {
  if (!links.isEmpty()) {
    if (logger.isLoggable(BasicLevel.DEBUG)) {
      logger.log(BasicLevel.DEBUG, " links :");
    }
    for (Link current : links) {
      if (logger.isLoggable(BasicLevel.DEBUG)) {
        logger.log(BasicLevel.DEBUG, "\t" + current.getRel() + " : " + current.getUri());
      }
    }
  }
}
/**
* Daemon used to reconnect.
* Close the producer.
*/
private class ReconnectDaemon extends Daemon {
 
/** Constructs a <code>ReconnectDaemon</code> thread. */
protected ReconnectDaemon() {
super("RESTDistribution_ReconnectDaemon", logger);
setDaemon(true);
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon<init>");
}
private void closeProducer() {
if (uriCloseProducer != null) {
Response response = client.target(uriCloseProducer).request().accept(MediaType.TEXT_PLAIN).delete();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestDistribution.closeProducer(): -> " + response.getStatus());
uriCloseProducer = null;
}
 
/** The daemon's loop. */
public void run() {
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon.run()");
}
 
try {
int retry = 0;
while (running) {
 
canStop = true;
 
try {
retry++;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon: reconnect retry = " + retry);
createProducer(userName, password);
break;
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution_ReconnectDaemon.run():: " + e.getMessage());
try {
Thread.sleep(reconnectSleep);
} catch (InterruptedException e1) { }
}
 
//canStop = false;
}
reconnectDaemon = null;
} finally {
finish();
}
}
 
@Override
protected void close() {
// TODO Auto-generated method stub
}
 
@Override
protected void shutdown() {
interrupt();
}
}
}
/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RestAcquisitionAsync.java
New file
0,0 → 1,865
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.rest;
 
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
 
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
 
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.objectweb.joram.mom.dest.AcquisitionDaemon;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
 
public class RestAcquisitionAsync implements AcquisitionDaemon {
private static final Logger logger = Debug.getLogger(RestAcquisitionAsync.class.getName());
// Configuration handed over by start() and read by initFromProperties().
private Properties properties = null;
// Transmitter handed over by start().
private ReliableTransmitter transmitter = null;

// Host and port of the REST server; used to build the base URI "http://host:port/joram/".
private String hostName = "localhost";
private int port = 8989;

// Passed to the Jersey client as CONNECT_TIMEOUT.
private int connectTimeout = 10000;
// Added to the receive timeout to compute the Jersey client READ_TIMEOUT.
private int readTimeout = 10000;

// Credentials sent as "user" / "password" query parameters when creating the consumer.
private String userName = null;
private String password = null;
// JNDI name of the destination, looked up under the "jndi" resource.
private String destName = null;
// Sent as "name" / "client-id" query parameters when non-null.
private String consName = null;
private String clientId = null;
// Sent as the "idle-timeout" query parameter when creating the consumer.
private String idleTimeout = "60"; // TODO (AF): default value (Be careful in seconds)
// Sent as the "timeout" query parameter of each receive request.
private String timeout = "30000"; // TODO (AF): default value (Be careful in milliseconds)
private boolean mediaTypeJson = true; //default true, use "application/json"
// Read from the acquisition "persistent" property by initFromProperties().
private boolean persistent = true;

// Daemon created and started by start().
private XDaemon daemon = null;
 
@Override
public void start(Properties properties, ReliableTransmitter transmitter) {
this.properties = properties;
this.transmitter = transmitter;
initFromProperties();
if (destName == null) {
logger.log(BasicLevel.ERROR,
"Missing Destination JNDI name, should fixed property " + DestinationConstants.DESTINATION_NAME_PROP);
return;
}
 
daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger);
daemon.start();
}
 
/**
* Initializes fields from properties.
*/
private void initFromProperties() {
if (properties.containsKey(DestinationConstants.REST_HOST_PROP)) {
hostName = properties.getProperty(DestinationConstants.REST_HOST_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_HOST_PROP + ", use default value: " + hostName);
}
if (properties.containsKey(DestinationConstants.REST_PORT_PROP)) {
try {
port = Integer.parseInt(properties.getProperty(DestinationConstants.REST_PORT_PROP));
} catch (NumberFormatException exc) {
logger.log(BasicLevel.ERROR,
"Property " + DestinationConstants.REST_PORT_PROP + " could not be parsed properly, use default value: " + port, exc);
}
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_PORT_PROP + ", use default value: " + port);
}
if (properties.containsKey(DestinationConstants.REST_USERNAME_PROP)) {
userName = properties.getProperty(DestinationConstants.REST_USERNAME_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_USERNAME_PROP + ", use default value: " + userName);
}
if (properties.containsKey(DestinationConstants.REST_PASSWORD_PROP)) {
password = properties.getProperty(DestinationConstants.REST_PASSWORD_PROP);
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.REST_PASSWORD_PROP + ", use default value: " + password);
}
 
destName = properties.getProperty(DestinationConstants.DESTINATION_NAME_PROP);
if (destName == null) {
logger.log(BasicLevel.ERROR,
"Missing property Destination JNDI name.");
// TODO (AF):
// throw new IllegalArgumentException("Missing Destination JNDI name.");
}
 
if (properties.containsKey(DestinationConstants.MEDIA_TYPE_JSON_PROP)) {
mediaTypeJson = Boolean.parseBoolean(properties.getProperty(DestinationConstants.MEDIA_TYPE_JSON_PROP));
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.MEDIA_TYPE_JSON_PROP + ", use default value: " + mediaTypeJson);
}
if (properties.containsKey(DestinationConstants.ACQUISITION_PERSISTENT)) {
persistent = Boolean.parseBoolean(properties.getProperty(DestinationConstants.ACQUISITION_PERSISTENT));
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.ACQUISITION_PERSISTENT + ", use default value: " + persistent);
}
if (properties.containsKey(DestinationConstants.TIMEOUT_PROP)) {
try {
timeout = properties.getProperty(DestinationConstants.TIMEOUT_PROP);
} catch (NumberFormatException exc) { }
}
 
if (properties.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) {
try {
idleTimeout = properties.getProperty(DestinationConstants.IDLETIMEOUT_PROP);
} catch (NumberFormatException exc) { }
}
}
// "close-context" link returned when the consumer is created; a non-null value
// makes createConsumer() return immediately.
private URI uriCloseConsumer = null;
// "receive-next-message" link returned when the consumer is created; target of
// the next receive request.
private URI uriReceiveNextMsg = null;
// Reset to null by createConsumer().
private URI uriAcknowledgeMsg = null;
// REST client built by createConsumer().
Client client = null;
 
/**
* Gets destination and initializes the Rest/JMS consumer.
*/
private void createConsumer() throws Exception {
if (uriCloseConsumer != null)
return;
 
URI base = UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestAcquisitionAsync.createConsumer(), use base URI " + base);
// Initializes URI
uriAcknowledgeMsg = null;
uriReceiveNextMsg = null;
// Initializes Rest client and target
try {
// TODO (AF): It seems that there is no exceptions thrown by these methods.
client = ClientBuilder.newClient(
new ClientConfig() // TODO (AF): Fix these properties with configuration values
.property(ClientProperties.CONNECT_TIMEOUT, connectTimeout)
.property(ClientProperties.READ_TIMEOUT, Integer.parseInt(timeout) + readTimeout));
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.createConsumer(): cannot initialize Rest client", exc);
throw exc;
}
// Get the destination
Response response = null;
try {
response = client.target(base).path("jndi").path(destName)
.request().accept(MediaType.TEXT_PLAIN).head();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.createConsumer(): cannot get destination " + destName, exc);
throw exc;
}
if (response.getStatus() == 201) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestAcquisitionAsync.createConsumer(): get destination -> " + response.getStatusInfo());
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.createConsumer(): cannot get destination " + destName + " -> " + response.getStatusInfo());
throw new Exception("Cannot get destination " + destName);
}
try {
// Get URI to create a consumer in ClientAcknowledge mode.
URI uriCreateConsumer = response.getLink("create-consumer-client-ack").getUri();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.createConsumer(): create-consumer = " + uriCreateConsumer);
 
// TODO (AF): We should fix name and client-id to be unique.
WebTarget target = client.target(uriCreateConsumer);
if (consName != null) target = target.queryParam("name", consName);
if (clientId != null) target = target.queryParam("client-id", clientId);
if (idleTimeout != null) target = target.queryParam("idle-timeout", idleTimeout);
if (userName != null) target = target.queryParam("user", userName);
if (password != null) target = target.queryParam("password", password);
 
response = target.request().accept(MediaType.TEXT_PLAIN).post(null);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.createConsumer(): cannot create consumer", exc);
throw exc;
}
 
if (response.getStatus() == 201) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestAcquisitionAsync.createConsumer(): create consumer -> " + response.getStatusInfo());
 
uriCloseConsumer = response.getLink("close-context").getUri();
uriAcknowledgeMsg = null;
uriReceiveNextMsg = response.getLink("receive-next-message").getUri();
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.createConsumer(): cannot create consumer -> " + response.getStatusInfo());
throw new Exception("Cannot create consumer");
}
}
 
private Message recvNextMessage() throws Exception {
Response response = null;
Message msg = null;
if (uriReceiveNextMsg == null) {
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.recvNextMessage(): URI not initialized.");
throw new Exception("URI not initialized");
}
 
if (mediaTypeJson) {
response = client.target(uriReceiveNextMsg)
.queryParam("timeout", timeout)
.request()
.accept(MediaType.APPLICATION_JSON)
.get();
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: cannot receive next message ->" + response.getStatusInfo());
return null;
}
 
String json = response.readEntity(String.class);
if ((json == null) || json.isEmpty() || "null".equals(json)){
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: receive empty message.");
return null;
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestAcquisitionAsync.receiveNextMessage: receive " + json);
Gson gson = new GsonBuilder().create();
HashMap<String, Object> map = gson.fromJson(json, HashMap.class);
if (map != null) {
try {
msg = createJSonMessage(map);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.receiveNextMessage: cannot create message", exc);
 
return null;
}
} else {
String text = gson.fromJson(json, String.class);
if (text != null) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: not receive a JSon message -> " + json);
try {
msg = createTextMessage(text);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.receiveNextMessage: cannot create message", exc);
 
return null;
}
} else {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: receive bad message -> " + json);
return null;
}
}
} else {
response = client.target(uriReceiveNextMsg)
.queryParam("timeout", timeout)
.request()
.accept(MediaType.TEXT_PLAIN)
.get();
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: cannot receive next message ->" + response.getStatusInfo());
return null;
}
 
String text = response.readEntity(String.class);
if ((text == null) || text.isEmpty()) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.receiveNextMessage: receive empty message.");
return null;
}
try {
msg = createTextMessage(text);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RestAcquisitionAsync.receiveNextMessage: cannot create message", exc);
 
return null;
}
}
uriReceiveNextMsg = response.getLink("receive-next-message").getUri();
uriAcknowledgeMsg = response.getLink("acknowledge-message").getUri();
 
return msg;
}
/**
 * Converts the textual form of a property value into an instance of the named wrapper type.
 *
 * @param type  fully qualified class name of the expected type ({@code java.lang.Boolean},
 *              {@code Byte}, {@code Short}, {@code Integer}, {@code Long}, {@code Float},
 *              {@code Double} or {@code String}).
 * @param value textual representation of the value.
 * @return the converted value, or {@code null} (after a WARN log) when the type is missing or
 *         unknown, or when the value cannot be parsed as the requested numeric type.
 */
private Object getPropertyValue(String type, String value) {
  // A string switch throws NullPointerException on null, so a missing type is handled as unknown.
  if (type != null) {
    try {
      switch (type) {
      case "java.lang.Boolean":
        return Boolean.valueOf(value);
      case "java.lang.Byte":
        return Byte.valueOf(value);
      case "java.lang.Short":
        return Short.valueOf(value);
      case "java.lang.Integer":
        return Integer.valueOf(value);
      case "java.lang.Long":
        return Long.valueOf(value);
      case "java.lang.Float":
        return Float.valueOf(value);
      case "java.lang.Double":
        return Double.valueOf(value);
      case "java.lang.String":
        return value;
      default:
        break;
      }
    } catch (NumberFormatException exc) {
      // A malformed value is reported like an unknown type: the caller skips the property
      // instead of losing the whole message.
      if (logger.isLoggable(BasicLevel.WARN))
        logger.log(BasicLevel.WARN,
                   "RESTAcquisitionAsync.getPropertyValue: bad value for property type " + type + ": " + value, exc);
      return null;
    }
  }

  if (logger.isLoggable(BasicLevel.WARN))
    logger.log(BasicLevel.WARN,
               "RESTAcquisitionAsync.getPropertyValue: unknown property type: " + type);
  return null;
}
private void setJMSMessageHeader(Message msg, Map header) {
if (header == null || (header.size() == 0)) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader: empty JMS header map");
return;
}
 
if (header.containsKey("DeliveryMode")) {
try {
msg.persistent = "PERSISTENT".equals(header.get("DeliveryMode"));
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- DeliveryMode = " + header.get("DeliveryMode"));
}
}
 
if (header.containsKey("Priority")) {
try {
msg.priority = ((Double) header.get("Priority")).intValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- Priority = " + header.get("Priority"));
}
}
 
if (header.containsKey("Redelivered")) {
try {
msg.redelivered = (boolean) header.get("Redelivered");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader-- Redelivered = " + header.get("Redelivered"));
}
}
 
if (header.containsKey("Timestamp")) {
try {
msg.timestamp = ((Double) header.get("Timestamp")).longValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader-- Timestamp = " + header.get("Timestamp"));
}
}
 
if (header.containsKey("Expiration")) {
try {
msg.expiration = ((Double) header.get("Expiration")).longValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- Expiration = " + header.get("Expiration"));
}
}
 
if (header.containsKey("CorrelationID")) {
try {
msg.correlationId = (String) header.get("CorrelationID");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- CorrelationID = " + header.get("CorrelationID"));
}
}
 
if (header.containsKey("CorrelationIDAsBytes")) {
// TODO "CorrelationIDAsBytes"
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- TODO CorrelationIDAsBytes");
}
 
// TODO (AF): Is it correct? The destination should correspond to the current destination.
if (header.containsKey("Destination")) {
try {
Map dest = (Map) header.get("Destination");
String id = (String) dest.get("agentId");
String name = (String) dest.get("adminName");
byte type = ((Double) dest.get("type")).byteValue();
msg.setDestination(id, name, type);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Destination = " + header.get("Destination"));
}
}
 
if (header.containsKey("MessageID")) {
try {
msg.id = (String) header.get("MessageID");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- MessageID = " + header.get("MessageID"));
}
}
 
if (header.containsKey("ReplyTo")) {
try {
msg.replyToId = (String) header.get("ReplyTo");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- ReplyTo = " + header.get("ReplyTo"));
}
}
 
if (header.containsKey("Type")) {
try {
msg.jmsType = (String) header.get("Type");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setJMSMessageHeader -- Type = " + header.get("Type"));
}
}
}
 
private void setJMSProperties(Message msg, Map props) {
if (props != null && props.size() > 0) {
Set<Map.Entry> entrySet = props.entrySet();
for (Map.Entry entry : entrySet) {
String key = (String) entry.getKey();
ArrayList pair = (ArrayList) entry.getValue();
Object value = getPropertyValue((String) pair.get(1), (String)pair.get(0));
 
if (key != null && value != null) {
msg.setProperty(key, value);
} else {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.setProperties: bad property <" + key + ", " + value + ">");
}
}
}
}
// TODO (AF): To verify
private Map<String, Object> getMapMessage(Map<String, Object> jsonMap) {
Map<String, Object> map = new HashMap<String, Object>();
 
// parse the json map
for (String key : jsonMap.keySet()) {
ArrayList<String> array = null;
try {
array = (ArrayList<String>) jsonMap.get(key);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.getMapMessage: bad element, ignore map entry " + key, exc);
continue;
}
 
if (array.size() != 2) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.getMapMessage: bad element, ignore map entry " + key);
continue;
}
String classname = array.get(1);
try {
Object value = null;
if (Character.class.getName().equals(classname)) {
value = array.get(0).charAt(0);
} else if (byte[].class.getName().equals(classname)) {
value = array.get(0).getBytes("UTF-8");
} else {
Constructor<?> constructor = Class.forName(classname).getConstructor(String.class);
value = constructor.newInstance(array.get(0));
}
map.put(key, value);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.getMapMessage: ignore map entry " + key + ", " + array.get(0) + " / " + classname, exc);
continue;
}
 
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RESTAcquisitionAsync.getMapMessage: get key=" + key + ", value = " + array.get(0) + " / " + classname);
}
return map;
}
 
private Message createJSonMessage(HashMap<String, Object> map) throws Exception {
Message msg = new Message();
 
// Get JMS header
setJMSMessageHeader(msg, (Map) map.get("header"));
 
// Get JMS properties
setJMSProperties(msg, (Map) map.get("properties"));
 
if (msg.id == null) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RESTAcquisitionAsync.createJSonMessage: message unique identifier not set");
// TODO (AF): Should set a unique message identifier
msg.id = uriReceiveNextMsg.getPath();
}
// Get message body
String type = (String) map.get("type");
Object body = map.get("body");
if (body == null ) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: no body definition for the message");
throw new Exception("Cannot convert message");
}
switch (type) {
case "TextMessage": {
msg.type = Message.TEXT;
if (body instanceof String) {
try {
msg.setText((String) body);
return msg;
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: error setting message body", exc);
throw new Exception("Cannot convert message", exc);
}
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: body of TextMessage should not be " + body.getClass());
throw new Exception("Cannot convert message");
}
}
case "MapMessage": {
if (body instanceof Map) {
msg.type = Message.MAP;
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(getMapMessage((Map) body));
oos.flush();
msg.setBody(baos.toByteArray());
oos.close();
baos.close();
return msg;
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: error serializing message body", exc);
throw new Exception("Cannot convert message", exc);
}
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: body of MapMessage should not be " + body.getClass());
throw new Exception("Cannot convert message");
}
}
case "BytesMessage": {
if (body instanceof ArrayList) {
ArrayList jmsBody = (ArrayList) body;
msg.type = Message.BYTES;
byte[] bytes = new byte[jmsBody.size()];
try {
for (int i = 0; i < bytes.length; i++) {
bytes[i] = ((Number) jmsBody.get(i)).byteValue();
}
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: error converting BytesMessage", exc);
throw new Exception("Cannot convert message", exc);
}
msg.body = bytes;
return msg;
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: body of BytesMessage should not be " + body.getClass());
throw new Exception("Cannot convert message");
}
}
default:
logger.log(BasicLevel.ERROR,
"RESTAcquisitionAsync.createJSonMessage: type " + type + " not supported.");
throw new Exception("Cannot convert message");
}
}
 
/**
 * Wraps a raw text into a new JORAM text message.
 * <p>
 * The message takes the path of {@code uriReceiveNextMsg} as identifier and the
 * {@code persistent} setting of this acquisition as delivery mode.
 *
 * @param text content of the message.
 * @return the newly built text message.
 * @throws Exception if the message cannot be built or the text cannot be set.
 */
private Message createTextMessage(String text) throws Exception {
  final Message textMsg = new Message();
  textMsg.persistent = persistent;
  textMsg.id = uriReceiveNextMsg.getPath();
  textMsg.type = Message.TEXT;
  textMsg.setText(text);
  return textMsg;
}
void acknowledgeMessage() throws Exception {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"RestAcquisitionAsync.acknowledgeMessage: " + uriAcknowledgeMsg);
if (uriAcknowledgeMsg != null) {
Response response = client.target(uriAcknowledgeMsg)
.request()
.accept(MediaType.TEXT_PLAIN)
.delete();
 
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.acknowledgeMessage: error during acknowledge ->" + response.getStatusInfo());
throw new Exception("Error during acknowledge: "+ response.getStatusInfo());
}
} else {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"RestAcquisitionAsync.acknowledgeMessage: Cannot acknowledge message, empty URI.");
throw new Exception("Error during acknowledge: empty URI");
}
uriAcknowledgeMsg = null;
}
 
/**
 * Closes the remote consumer by sending a DELETE request to {@code uriCloseConsumer}.
 * <p>
 * Nothing is done when no close URI is known. Whatever the outcome of the request, the close
 * URI is reset to {@code null}; a failure is logged and never propagated.
 */
private void closeConsumer() {
  if (uriCloseConsumer == null)
    return;

  try {
    Response response = client.target(uriCloseConsumer).request().accept(MediaType.TEXT_PLAIN).delete();
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.closeConsumer(): -> " + response.getStatus());
    // The entity is never read, so the response has to be released explicitly.
    response.close();
  } catch (Exception exc) {
    logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.closeConsumer()", exc);
  } finally {
    uriCloseConsumer = null;
  }
}
/**
 * Stops the acquisition: asks the daemon to stop when there is one and it is running, then
 * drops the reference to it.
 */
@Override
public void stop() {
  if (daemon != null) {
    if (daemon.isRunning()) {
      daemon.stop();
    }
  }
  daemon = null;
}
 
/**
 * Daemon running the acquisition loop: it creates the remote consumer, receives one message,
 * hands it to the transmitter and finally acknowledges it.
 */
private class XDaemon extends Daemon {
 
/**
 * Builds the daemon.
 *
 * @param name   name handed to the {@code Daemon} constructor.
 * @param logmon logger handed to the {@code Daemon} constructor.
 */
protected XDaemon(String name, Logger logmon) {
super(name, logmon);
// NOTE(review): presumably flags the underlying thread as a JVM daemon thread -- confirm against Daemon.
setDaemon(true);
}
 
/**
 * Acquisition loop, repeated while {@code running} is true.
 * <p>
 * Any failure while receiving, transmitting or acknowledging a message closes the consumer
 * and starts a new iteration; a failure while creating the consumer is followed by a one
 * second pause. {@code finish()} is always called when the loop ends.
 */
@Override
public void run() {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "RestAcquisitionAsync.Daemon.run(): starting");
try {
while (running) {
canStop = false;
// createConsumer() is called at the top of every iteration.
// NOTE(review): presumably it returns early when a consumer already exists; its beginning
// is not visible from here -- confirm.
try {
createConsumer();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.run()", exc);
 
// canStop is only raised for the duration of the pause.
// NOTE(review): presumably this tells Daemon the thread may be interrupted while sleeping -- confirm.
try {
canStop = true;
// TODO (AF): Handles this pause smartly.
Thread.sleep(1000L);
} catch (InterruptedException ie) {}
canStop = false;
continue;
}
 
Message msg = null;
try {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): waits for new message.");
 
msg = recvNextMessage();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.run()", exc);
 
closeConsumer();
continue;
}
 
// recvNextMessage() returns null when nothing usable has been received: just try again.
if (msg == null) continue;
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): receives " + msg.id);
 
// The message is handed to the transmitter before being acknowledged, so a failure here
// leaves it unacknowledged.
try {
transmitter.transmit(msg, msg.id);
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): " + msg.id + " transmitted");
 
} catch (Exception exc) {
// TODO (AF): Error during transmit ??
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.run()", exc);
 
closeConsumer();
continue;
}
 
try {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): acknowledges message.");
 
acknowledgeMessage();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.run()", exc);
 
closeConsumer();
continue;
}
}
} catch (Throwable exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.run()", exc);
} finally {
// The exception created here is only used to print the current stack trace in the log.
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "RestAcquisitionAsync.Daemon.run(): finishing", new Exception());
finish();
}
}
/**
 * Logs the call, with the current stack trace, at ERROR level.
 * <p>
 * NOTE(review): this override does nothing else -- it neither delegates to the superclass nor
 * changes {@code running} -- so calling it does not by itself end the loop of {@code run()}.
 * Confirm that this is intended and not leftover debugging code.
 */
@Override
public synchronized void stop() {
logger.log(BasicLevel.ERROR, "RestAcquisitionAsync.Daemon.stop()", new Exception());
}
 
/**
 * Closes the remote consumer.
 */
@Override
protected void close() {
closeConsumer();
}
 
/**
 * Does nothing for now.
 */
@Override
protected void shutdown() {
// TODO (AF): force jersey to close
}
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTAcquisition.java
44,8 → 44,8
 
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.mom.dest.AcquisitionHandler;
import org.objectweb.joram.mom.dest.AcquisitionModule;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
64,15 → 64,6
 
private static final Logger logger = Debug.getLogger(RESTAcquisition.class.getName());
 
private static final String HOST_PROP = "rest.hostName";
private static final String PORT_PROP = "rest.port";
private static final String USER_NAME_PROP = "rest.userName";
private static final String PASSWORD_PROP = "rest.password";
private static final String TIMEOUT_PROP = "rest.timeout";
private static final String NB_MAX_MSG_PROP = "rest.nbMaxMsgByPeriode";
private static final String MEDIA_TYPE_JSON_PROP = "rest.mediaTypeJson";
private static final String DESTINATION_NAME_PROP = "jms.destination";
 
private String hostName = "localhost";
private int port = 8989;
private Client client;
90,38 → 81,39
private URI uriCloseConsumer;
public void init(Properties properties) {
destName = properties.getProperty(DESTINATION_NAME_PROP);
destName = properties.getProperty(DestinationConstants.DESTINATION_NAME_PROP);
if (destName == null) {
throw new IllegalArgumentException("Missing Destination JNDI name.");
}
if (properties.containsKey(HOST_PROP)) {
hostName = properties.getProperty(HOST_PROP);
if (properties.containsKey(DestinationConstants.REST_HOST_PROP)) {
hostName = properties.getProperty(DestinationConstants.REST_HOST_PROP);
}
 
if (properties.containsKey(PORT_PROP)) {
if (properties.containsKey(DestinationConstants.REST_PORT_PROP)) {
try {
port = Integer.parseInt(properties.getProperty(PORT_PROP));
port = Integer.parseInt(properties.getProperty(DestinationConstants.REST_PORT_PROP));
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + PORT_PROP
logger.log(BasicLevel.ERROR, "Property " + DestinationConstants.REST_PORT_PROP
+ " could not be parsed properly, use default value.", nfe);
}
}
if (properties.containsKey(USER_NAME_PROP)) {
userName = properties.getProperty(USER_NAME_PROP);
if (properties.containsKey(DestinationConstants.REST_USERNAME_PROP)) {
userName = properties.getProperty(DestinationConstants.REST_USERNAME_PROP);
}
if (properties.containsKey(PASSWORD_PROP)) {
password = properties.getProperty(PASSWORD_PROP);
if (properties.containsKey(DestinationConstants.REST_PASSWORD_PROP)) {
password = properties.getProperty(DestinationConstants.REST_PASSWORD_PROP);
}
if (properties.containsKey(TIMEOUT_PROP)) {
if (properties.containsKey(DestinationConstants.TIMEOUT_PROP)) {
try {
timeout = Long.parseLong(properties.getProperty(TIMEOUT_PROP));
timeout = Long.parseLong(properties.getProperty(DestinationConstants.TIMEOUT_PROP));
} catch (NumberFormatException exc) { }
}
if (properties.containsKey(NB_MAX_MSG_PROP)) {
if (properties.containsKey(DestinationConstants.NB_MAX_MSG_PROP)) {
try {
nbMaxMsg = Integer.parseInt(properties.getProperty(NB_MAX_MSG_PROP));
nbMaxMsg = Integer.parseInt(properties.getProperty(DestinationConstants.NB_MAX_MSG_PROP));
} catch (NumberFormatException exc) { }
}
// init client and target
if (client == null) {
ClientConfig config = new ClientConfig();
136,8 → 128,8
client.target(uriCloseConsumer).request().accept(MediaType.TEXT_PLAIN).delete();
uriCreateConsumer = null;
}
if (properties.containsKey(MEDIA_TYPE_JSON_PROP)) {
mediaTypeJson = Boolean.parseBoolean(properties.getProperty(MEDIA_TYPE_JSON_PROP));
if (properties.containsKey(DestinationConstants.MEDIA_TYPE_JSON_PROP)) {
mediaTypeJson = Boolean.parseBoolean(properties.getProperty(DestinationConstants.MEDIA_TYPE_JSON_PROP));
}
createConsumer();
}
227,7 → 219,7
}
if (jsonMessageHeader.containsKey("CorrelationIDAsBytes")) {
// TODO "CorrelationIDAsBytes"
// TODO "CorrelationIDAsBytes"
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- TODO CorrelationIDAsBytes");
}
483,9 → 475,9
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition.setProperties properties = " + properties);
if (properties.containsKey(AcquisitionModule.PERSISTENT_PROPERTY)) {
if (properties.containsKey(DestinationConstants.ACQUISITION_PERSISTENT)) {
try {
persistent = ConversionHelper.toBoolean(properties.get(AcquisitionModule.PERSISTENT_PROPERTY));
persistent = ConversionHelper.toBoolean(properties.get(DestinationConstants.ACQUISITION_PERSISTENT));
} catch (MessageValueException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "", e);