OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6436 → Rev 6437

/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTAcquisition.java
New file
0,0 → 1,458
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.rest;
 
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
 
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
 
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.mom.dest.AcquisitionHandler;
import org.objectweb.joram.mom.dest.AcquisitionModule;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import fr.dyade.aaa.common.Debug;
 
/**
* Acquisition handler for the REST acquisition bridge.
*/
public class RESTAcquisition implements AcquisitionHandler {
 
private static final Logger logger = Debug.getLogger(RESTAcquisition.class.getName());
 
private static final String HOST_PROP = "rest.hostName";
private static final String PORT_PROP = "rest.port";
private static final String USER_NAME_PROP = "rest.userName";
private static final String PASSWORD_PROP = "rest.password";
private static final String TIMEOUT_PROP = "rest.timeout";
private static final String NB_MAX_MSG_PROP = "rest.nbMaxMsgByPeriode";
private static final String MEDIA_TYPE_JSON_PROP = "rest.mediaTypeJson";
private static final String DESTINATION_NAME_PROP = "jms.destination";
 
private String hostName = "localhost";
private int port = 8989;
private Client client;
private WebTarget target;
private String userName = null;
private String password = null;
private long timeout = 0; //timeout = 0 => NoWait
private boolean persistent = false;
private int nbMaxMsg = 100;
private boolean mediaTypeJson = true;//default true "application/json"
private String destName;
private URI uriCreateConsumer;
private URI uriConsume;
private URI uriCloseConsumer;
public void init(Properties properties) {
destName = properties.getProperty(DESTINATION_NAME_PROP);
if (destName == null) {
throw new IllegalArgumentException("Missing Destination JNDI name.");
}
if (properties.containsKey(HOST_PROP)) {
hostName = properties.getProperty(HOST_PROP);
}
 
if (properties.containsKey(PORT_PROP)) {
try {
port = Integer.parseInt(properties.getProperty(PORT_PROP));
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + PORT_PROP
+ " could not be parsed properly, use default value.", nfe);
}
}
if (properties.containsKey(USER_NAME_PROP)) {
userName = properties.getProperty(USER_NAME_PROP);
}
if (properties.containsKey(PASSWORD_PROP)) {
password = properties.getProperty(PASSWORD_PROP);
}
if (properties.containsKey(TIMEOUT_PROP)) {
try {
timeout = Long.parseLong(properties.getProperty(TIMEOUT_PROP));
} catch (NumberFormatException exc) { }
}
if (properties.containsKey(NB_MAX_MSG_PROP)) {
try {
nbMaxMsg = Integer.parseInt(properties.getProperty(NB_MAX_MSG_PROP));
} catch (NumberFormatException exc) { }
}
// init client and target
if (client == null) {
ClientConfig config = new ClientConfig();
client = ClientBuilder.newClient(config);
target = client.target(UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build());
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "RESTAcquisition.init Target : " + target.getUri());
}
// create the consumer
if (uriCreateConsumer != null) {
client.target(uriCloseConsumer).request().accept(MediaType.TEXT_PLAIN).delete();
uriCreateConsumer = null;
}
if (properties.containsKey(MEDIA_TYPE_JSON_PROP)) {
mediaTypeJson = Boolean.parseBoolean(properties.getProperty(MEDIA_TYPE_JSON_PROP));
}
createConsumer();
}
 
public void createConsumer() {
Builder builder = target.path("jndi").path(destName).request();
Response response = builder.accept(MediaType.TEXT_PLAIN).head();
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTAcquisition.createConsumer: response = " + response);
if (201 != response.getStatus()) {
return;
}
uriCreateConsumer = response.getLink("create-consumer").getUri();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition.createConsumer: uriCreateConsumer = " + uriCreateConsumer);
 
WebTarget wTarget = client.target(uriCreateConsumer)
.queryParam("name", "cons-" + destName)
.queryParam("client-id", "id-" + destName);
if (userName != null)
wTarget = wTarget.queryParam("user", userName);
if (password != null)
wTarget = wTarget.queryParam("password", password);
response = wTarget.request().accept(MediaType.TEXT_PLAIN).post(null);
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTAcquisition.createConsumer: response = " + response);
if (201 == response.getStatus()) {
uriCloseConsumer = response.getLink("close-context").getUri();
uriConsume = response.getLink("receive-message").getUri();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition.createConsumer: uriCloseConsumer = " + uriCloseConsumer + ", uriConsume = " + uriConsume);
}
}
private void setMessageHeader(Map jsonMessageHeader, Message message) {
if (jsonMessageHeader.containsKey("DeliveryMode"))
try {
message.persistent = "PERSISTENT".equals(jsonMessageHeader.get("DeliveryMode"));
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- DeliveryMode = " + jsonMessageHeader.get("DeliveryMode"));
}
if (jsonMessageHeader.containsKey("DeliveryMode")) {
try {
message.priority = ((Double) jsonMessageHeader.get("Priority")).intValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Priority = " + jsonMessageHeader.get("Priority"));
}
}
if (jsonMessageHeader.containsKey("Redelivered")) {
try {
message.redelivered = (boolean) jsonMessageHeader.get("Redelivered");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Redelivered = " + jsonMessageHeader.get("Redelivered"));
}
}
 
if (jsonMessageHeader.containsKey("Timestamp")) {
try {
message.timestamp = ((Double) jsonMessageHeader.get("Timestamp")).longValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Timestamp = " + jsonMessageHeader.get("Timestamp"));
}
}
if (jsonMessageHeader.containsKey("Expiration")) {
try {
message.expiration = ((Double) jsonMessageHeader.get("Expiration")).longValue();
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Expiration = " + jsonMessageHeader.get("Expiration"));
}
}
if (jsonMessageHeader.containsKey("CorrelationID"))
try {
message.correlationId = (String) jsonMessageHeader.get("CorrelationID");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- CorrelationID = " + jsonMessageHeader.get("CorrelationID"));
}
if (jsonMessageHeader.containsKey("CorrelationIDAsBytes")) {
// TODO "CorrelationIDAsBytes"
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- TODO CorrelationIDAsBytes");
}
if (jsonMessageHeader.containsKey("Destination")) {
try {
Map dest = (Map) jsonMessageHeader.get("Destination");
String id = (String) dest.get("agentId");
String name = (String) dest.get("adminName");
byte type = ((Double) dest.get("type")).byteValue();
message.setDestination(id, name, type);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Destination = " + jsonMessageHeader.get("Destination"));
}
}
if (jsonMessageHeader.containsKey("MessageID"))
try {
message.id = (String) jsonMessageHeader.get("MessageID");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- MessageID = " + jsonMessageHeader.get("MessageID"));
}
if (jsonMessageHeader.containsKey("ReplyTo"))
try {
message.replyToId = (String) jsonMessageHeader.get("ReplyTo");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- ReplyTo = " + jsonMessageHeader.get("ReplyTo"));
}
if (jsonMessageHeader.containsKey("Type")) {
try {
message.jmsType = (String) jsonMessageHeader.get("Type");
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "-- Type = " + jsonMessageHeader.get("Type"));
}
}
}
@Override
public void retrieve(ReliableTransmitter transmitter) throws Exception {
if (uriConsume == null)
return;
try {
ArrayList<Message> messages = new ArrayList<>();
while (messages.size() < nbMaxMsg) {
//timeout = 0 => NoWait
Builder builder = null;
Message message = null;
if (!mediaTypeJson) {
//TEXT
builder = client.target(uriConsume)
.queryParam("timeout", timeout)
.request().accept(MediaType.TEXT_PLAIN);
Response response = builder.get();
String msg = response.readEntity(String.class);
if (200 != response.getStatus() || msg == null || msg.isEmpty())
break;
message = new Message();
message.type = Message.TEXT;
message.setText(msg);
} else {
//JSON
builder = client.target(uriConsume)
.queryParam("timeout", timeout)
.request().accept(MediaType.APPLICATION_JSON);
 
Response response = builder.get();
 
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTAcquisition.retrieve response = " + response);
String json = response.readEntity(String.class);
 
if (200 != response.getStatus() || json == null || json.isEmpty() || "null".equals(json))
break;
 
try {
Gson gson = new GsonBuilder().create();
HashMap<String, Object> msg = gson.fromJson(json, HashMap.class);
if (msg == null) {
String text = gson.fromJson(json, String.class);
if (text != null) {
message = new Message();
message.type = Message.TEXT;
message.setText(text);
}
} else {
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition.retrieve msg = " + msg);
 
message = new Message();
//get Properties
Map jmsProperties = (Map) msg.get("properties");
if (jmsProperties != null && jmsProperties.size() > 0) {
Set<Map.Entry> entrySet = jmsProperties.entrySet();
for (Map.Entry entry : entrySet) {
String key = (String) entry.getKey();
ArrayList value = (ArrayList) entry.getValue();
String propType = (String) value.get(1);
Object propValue = null;
switch (propType) {
case "java.lang.Boolean":
propValue = Boolean.parseBoolean((String)value.get(0));
break;
case "java.lang.Byte":
propValue = Byte.parseByte((String)value.get(0));
break;
case "java.lang.Short":
propValue = Short.parseShort((String)value.get(0));
break;
case "java.lang.Integer":
propValue = Integer.parseInt((String)value.get(0));
break;
case "java.lang.Long":
propValue = Long.parseLong((String)value.get(0));
break;
case "java.lang.Float":
propValue = Float.parseFloat((String)value.get(0));
break;
case "java.lang.Double":
propValue = Double.parseDouble((String)value.get(0));
break;
case "java.lang.String":
propValue = (String)value.get(0);
break;
 
default:
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "RESTAcquisition.retrieve property type not supported: " + propType);
break;
}
if (key != null && propValue != null)
message.setProperty(key, propValue);
}
}
 
//Get Header
Map jmsHeader = (Map) msg.get("header");
if (jmsHeader != null && jmsHeader.size() > 0) {
setMessageHeader(jmsHeader, message);
}
 
//Get body
String type = (String) msg.get("type");
message.jmsType = type;
switch (type) {
case "TextMessage": {
String jmsBody = (String) msg.get("body");
message.type = Message.TEXT;
message.setText(jmsBody);
} break;
 
case "MapMessage": {
Map jmsBody = (Map) msg.get("body");
message.type = Message.MAP;
message.setObject((Serializable) jmsBody);
} break;
 
case "BytesMessage": {
ArrayList jmsBody = (ArrayList) msg.get("body");
message.type = Message.BYTES;
byte[] bytes = new byte[((ArrayList) jmsBody).size()];
for (int i = 0; i < ((ArrayList) jmsBody).size(); i++) {
Object value = ((ArrayList) jmsBody).get(i);
bytes[i] = ((Number) value).byteValue();
}
message.body = bytes;
} break;
 
default:
logger.log(BasicLevel.ERROR, "TODO::: RESTAcquisition.retrieve type = " + type + " not supported.");//NTA tmp
break;
}
}
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "RESTAcquisition.retrieve json = " + json, e);
message = null;
}
}
if (message != null)
messages.add(message);
}
 
if (messages.size() > 0)
transmitter.transmit(messages, persistent);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "RESTAcquisition.retrieve", e);
}
}
 
@Override
public void setProperties(Properties properties) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition.setProperties properties = " + properties);
if (properties.containsKey(AcquisitionModule.PERSISTENT_PROPERTY)) {
try {
persistent = ConversionHelper.toBoolean(properties.get(AcquisitionModule.PERSISTENT_PROPERTY));
} catch (MessageValueException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "", e);
}
}
init(properties);
}
 
@Override
public void close() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Close JMSAcquisition.");
 
if (uriCloseConsumer == null) {
return;
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTAcquisition:: close-consumer = " + uriCloseConsumer);
client.target(uriCloseConsumer).request().accept(MediaType.TEXT_PLAIN).delete();
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RESTDistribution.java
New file
0,0 → 1,264
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.rest;
 
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
 
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Link;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
 
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
 
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
 
import fr.dyade.aaa.common.Debug;
 
/**
* Distribution handler for the REST distribution bridge.
*/
public class RESTDistribution implements DistributionHandler {
 
private static final Logger logger = Debug.getLogger(RESTDistribution.class.getName());
 
private static final String HOST_PROP = "rest.hostName";
private static final String PORT_PROP = "rest.port";
private static final String USER_NAME_PROP = "rest.userName";
private static final String PASSWORD_PROP = "rest.password";
private static final String DESTINATION_NAME_PROP = "jms.destination";
 
private String hostName = "localhost";
private int port = 8989;
private Client client;
private WebTarget target;
private String userName = null;
private String password = null;
 
private String destName;
private URI uriCreateProducer;
private URI uriSendNextMsg;
private URI uriSendMsg;
private URI uriCloseProducer;
public void init(Properties properties, boolean firstTime) {
destName = properties.getProperty(DESTINATION_NAME_PROP);
if (destName == null) {
throw new IllegalArgumentException("Missing Destination JNDI name.");
}
if (properties.containsKey(HOST_PROP)) {
hostName = properties.getProperty(HOST_PROP);
}
try {
if (properties.containsKey(PORT_PROP)) {
port = Integer.parseInt(properties.getProperty(PORT_PROP));
}
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + PORT_PROP
+ "could not be parsed properly, use default value.", nfe);
}
if (properties.containsKey(USER_NAME_PROP)) {
userName = properties.getProperty(USER_NAME_PROP);
}
if (properties.containsKey(PASSWORD_PROP)) {
password = properties.getProperty(PASSWORD_PROP);
}
ClientConfig config = new ClientConfig();
client = ClientBuilder.newClient(config);
target = client.target(UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "RESTDistribution.init Target : " + target.getUri());
}
}
 
public void distribute(Message message) throws Exception {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "RESTDistribution.distribute(" + message + ')');
if (uriCreateProducer == null)
createProducer(userName, password);
 
sendMessage(message);
}
 
public void close() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.close()");
closeProducer();
}
private int lookup() {
Builder builder = target.path("jndi").path(destName).request();
Response response = builder.accept(MediaType.TEXT_PLAIN).get();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.lookup \"" + destName +"\" = " + response.getStatus());
print(response.getLinks());
//TODO test status
if (uriCreateProducer == null) {
uriCreateProducer = response.getLink("create-producer").getUri();
}
return response.getStatus();
}
private int createProducer(String userName, String password) {
if (uriCreateProducer == null) {
WebTarget wTarget = target.path("jndi").path(destName).path("create-producer")
.queryParam("name", "prod-" + destName)
.queryParam("client-id", "id-" + destName);
if (userName != null)
wTarget = wTarget.queryParam("user", userName);
if (password != null)
wTarget = wTarget.queryParam("password", password);
uriCreateProducer = wTarget.getUri();
}
 
// Create the producer prod-"destName"
WebTarget wTarget = client.target(uriCreateProducer)
.queryParam("name", "prod-" + destName)
.queryParam("client-id", "id-" + destName);
if (userName != null)
wTarget = wTarget.queryParam("user", userName);
if (password != null)
wTarget = wTarget.queryParam("password", password);
Response response = wTarget.request().accept(MediaType.TEXT_PLAIN).post(null);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer (" + destName + ") = " + response.getStatus());
if (response.getStatus() > 300) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.createProducer" + destName + " ERROR ==== " + response.readEntity(String.class));
return response.getStatus();
}
print(response.getLinks());
uriCloseProducer = response.getLink("close-context").getUri();
uriSendNextMsg = response.getLink("send-next-message").getUri();
uriSendMsg = response.getLink("send-message").getUri();
return response.getStatus();
}
private int sendMessage(Message message) throws Exception {
if (uriSendMsg == null) {
return 400;
}
Response response;
if (message.properties == null && message.type == Message.TEXT) {
// no properties, only senda text message
response = client.target(uriSendMsg).request().
accept(MediaType.TEXT_PLAIN).post(Entity.entity(message.getText(), MediaType.TEXT_PLAIN));
if (response.getStatus() != 200) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: " + response.getStatus() + ", not send: " + message);
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: " + message);
}
} else {
HashMap<String, Object> maps = new HashMap<>();
// message body
if (message.type == Message.TEXT) {
message.jmsType = "TextMessage";
maps.put("body", message.getText());
} else if (message.type == Message.BYTES) {
message.jmsType = "BytesMessage";
maps.put("body", message.body);
} else if (message.type == Message.MAP) {
message.jmsType = "MapMessage";
maps.put("body", message.getObject());
} else {
logger.log(BasicLevel.ERROR, "RESTDistribution.sendMessage: type " + message.jmsType + " not yet supported.");
return 400;
}
// message type
maps.put("type", message.jmsType);
// message properties
HashMap<String, Object> props = new HashMap<>();
Enumeration e = message.properties.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
Object value = message.properties.get(key);
props.put(key, new String[]{value.toString(), value.getClass().getName()});
}
maps.put("properties", props);
 
// message header
HashMap<String, Object> header = new HashMap<>();
if (message.correlationId != null)
header.put("CorrelationID", new String[]{message.correlationId, String.class.getName()});
if (message.priority > 0)
header.put("Priority", new String[]{""+message.priority, Integer.class.getName()});
header.put("Type", message.jmsType);
//TODO: other headers prop
maps.put("header", header);
 
Gson gson = new GsonBuilder().create();
String json = gson.toJson(maps);
 
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution.sendMessage: json " + json);
// Send next message
response = client.target(uriSendMsg).request().accept(MediaType.TEXT_PLAIN).post(
Entity.entity(json, MediaType.APPLICATION_JSON));
}
return response.getStatus();
}
private int closeProducer() {
// close the producer
if (uriCloseProducer == null) {
return 400;
}
Response response = client.target(uriCloseProducer).request().accept(MediaType.TEXT_PLAIN).delete();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RESTDistribution:: close-producer = " + response.getStatus());
return response.getStatus();
}
 
private void print(Set<Link> links) {
if (links.isEmpty())
return;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " links :");
for (Link link : links)
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "\t" + link.getRel() + " : " + link.getUri());
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/joram/mom/extensions/restbridge/pom.xml
New file
0,0 → 1,91
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
<modelVersion>4.0.0</modelVersion>
<artifactId>joram-mom-extensions-restbridge</artifactId>
<packaging>bundle</packaging>
<name>JORAM :: joram :: mom :: extensions :: restbridge</name>
<description>Builds the Joram restbridge extension project.</description>
 
<parent>
<groupId>org.ow2.joram</groupId>
<artifactId>joram-mom-extensions</artifactId>
<version>5.14.0-SNAPSHOT</version>
</parent>
 
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>${maven.bundle.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Import-Package>
fr.dyade.aaa.agent,
fr.dyade.aaa.common,
javax.jms,
javax.naming,
javax.transaction.xa,
org.objectweb.joram.mom.dest,
org.objectweb.joram.mom.messages,
org.objectweb.joram.mom.notifications,
org.objectweb.joram.mom.util,
org.objectweb.joram.shared,
org.objectweb.joram.shared.excepts,
org.objectweb.joram.shared.messages,
org.objectweb.joram.shared.selectors,
org.objectweb.util.monolog,
org.objectweb.util.monolog.api</Import-Package>
<Export-Package>com.scalagent.joram.mom.dest.rest</Export-Package>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.ow2.spec.ee</groupId>
<artifactId>ow2-jms-2.0-spec</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>joram-mom-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>joram-client-jms</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>joram-tools-rest-jms</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>jndi-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>
Property changes:
Added: svn:mime-type
+ text/plain