OW2 Consortium joram

Compare Revisions

Ignore whitespace Rev 6439 → Rev 6440

/trunk/joram/samples/config/config_restbridge.properties
New file
0,0 → 1,166
# JORAM: Java(TM) Open Reliable Asynchronous Messaging
# Copyright (C) 2017 ScalAgent Distributed Technologies
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA.
 
################################
# Framework config properties. #
################################
 
# To override the packages the framework exports by default from the
# class path, set this variable.
#org.osgi.framework.system.packages=
 
# To append packages to the default set of exported system packages,
# set this value.
#org.osgi.framework.system.packages.extra=
 
# The following property makes specified packages from the class path
# available to all bundles. You should avoid using this property.
#org.osgi.framework.bootdelegation=sun.*,com.sun.*
 
# Felix tries to guess when to implicitly boot delegate in certain
# situations to ease integration without outside code. This feature
# is enabled by default, uncomment the following line to disable it.
#felix.bootdelegation.implicit=false
 
# The following property explicitly specifies the location of the bundle
# cache, which defaults to "felix-cache" in the current working directory.
# If this value is not absolute, then the felix.cache.rootdir controls
# how the absolute location is calculated. (See next property)
#org.osgi.framework.storage=${felix.cache.rootdir}/felix-cache
 
# The following property is used to convert a relative bundle cache
# location into an absolute one by specifying the root to prepend to
# the relative cache path. The default for this property is the
# current working directory.
#felix.cache.rootdir=${user.dir}
 
# The following property controls whether the bundle cache is flushed
# the first time the framework is initialized. Possible values are
# "none" and "onFirstInit"; the default is "none".
#org.osgi.framework.storage.clean=onFirstInit
 
# The following property determines which actions are performed when
# processing the auto-deploy directory. It is a comma-delimited list of
# the following values: 'install', 'start', 'update', and 'uninstall'.
# An undefined or blank value is equivalent to disabling auto-deploy
# processing.
felix.auto.deploy.action=install,start
 
# The following property specifies the directory to use as the bundle
# auto-deploy directory; the default is 'bundle' in the working directory.
# felix.auto.deploy.dir=../ship/bundle
 
# The following property is a space-delimited list of bundle URLs
# to install when the framework starts. The ending numerical component
# is the target start level. Any number of these properties may be
# specified for different start levels.
felix.auto.start.1= \
file:../../../ship/bundle/org.apache.felix.shell.jar \
file:../../../ship/bundle/org.apache.felix.shell.tui.jar \
file:../../../ship/bundle/org.osgi.compendium.jar \
file:../../../ship/bundle/monolog.jar \
file:../../../ship/bundle/a3-common.jar \
file:../../../ship/bundle/a3-rt.jar \
file:../../../ship/bundle/jcup.jar \
file:../../../ship/bundle/jndi-shared.jar \
file:../../../ship/bundle/jndi-server.jar \
file:../../../ship/bundle/joram-shared.jar \
file:../../../ship/bundle/joram-mom-core.jar \
file:../../../ship/bundle/a3-osgi.jar \
file:../../../ship/bundle/joram-mom-extensions-ftp.jar \
file:../../../ship/bundle/joram-mom-extensions-mail.jar \
file:../../../ship/bundle/joram-mom-extensions-collector.jar \
file:../../../ship/bundle/joram-mom-extensions-scheduler.jar \
file:../../../ship/bundle/ow2-jta-1.1-spec.jar \
file:../../../ship/bundle/ow2-jms-2.0-spec.jar \
file:../../../ship/bundle/javax.annotation-api.jar \
file:../../../ship/bundle/javax.inject.jar \
file:../../../ship/bundle/geronimo-servlet_3.0_spec.jar \
file:../../../ship/bundle/javax.ws.rs-api.jar \
file:../../../ship/bundle/validation-api.jar \
file:../../../ship/bundle/jersey-container-servlet-core.jar \
file:../../../ship/bundle/jersey-server.jar \
file:../../../ship/bundle/jersey-guava.jar \
file:../../../ship/bundle/hk2-api.jar \
file:../../../ship/bundle/aopalliance-repackaged.jar \
file:../../../ship/bundle/hk2-utils.jar \
file:../../../ship/bundle/jersey-common.jar \
file:../../../ship/bundle/hk2-locator.jar \
file:../../../ship/bundle/osgi-resource-locator.jar \
file:../../../ship/bundle/jersey-client.jar \
file:../../../ship/bundle/javassist.jar \
file:../../../ship/bundle/jetty-server.jar \
file:../../../ship/bundle/jetty-http.jar \
file:../../../ship/bundle/jetty-util.jar \
file:../../../ship/bundle/jetty-io.jar \
file:../../../ship/bundle/jetty-servlet.jar \
file:../../../ship/bundle/jetty-security.jar \
file:../../../ship/bundle/jetty-continuation.jar \
file:../../../ship/bundle/servlet.jar \
file:../../../ship/bundle/jetty.jar \
file:../../../ship/bundle/gson.jar \
file:../../../ship/bundle/jndi-client.jar \
file:../../../ship/bundle/joram-client-jms.jar \
file:../../../ship/bundle/joram-mom-extensions-restbridge.jar \
file:../../../ship/bundle/joram-mom-extensions-jmsbridge.jar
felix.log.level=1
 
# Sets the initial start level of the framework upon startup.
#org.osgi.framework.startlevel.beginning=1
 
# Sets the start level of newly installed bundles.
felix.startlevel.bundle=1
 
# Felix installs a stream and content handler factories by default,
# uncomment the following line to not install them.
#felix.service.urlhandlers=false
 
# The launcher registers a shutdown hook to cleanly stop the framework
# by default, uncomment the following line to disable it.
#felix.shutdown.hook=false
 
#############################
# Bundle config properties. #
#############################
 
# IP Address on which the remote shell is accessible (since 1.0.4).
# Note: Starting with version 1.0.4 the remote shell does not listen on
# all interfaces by default but on the localhost only. That is, by default
# the remote shell is only accessible from the host on which the remote
# shell is running. To access the system from another host, you have to
# configure the IP address of the interface to which the remote shell
# should be attached.
#osgi.shell.telnet.ip=127.0.0.1
 
# Port on which the remote shell is accessible.
#osgi.shell.telnet.port=6667
 
# The maximum number of simultaneous connections.
#osgi.shell.telnet.maxconn=2
 
# Sets the SO_TIMEOUT socket option to the given number of milliseconds.
# The default is no timeout.
#osgi.shell.telnet.socketTimeout=0
 
# Sets the identifier of the JORAM agent server to start, as defined in
# a3servers.xml file. Server 0 is started by default.
#fr.dyade.aaa.agent.AgentServer.id=0
 
# Sets the directory name where the agent server stores its persistent data.
#fr.dyade.aaa.agent.AgentServer.storage=s${fr.dyade.aaa.agent.AgentServer.id}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/config/restbridge_a3servers.xml
New file
0,0 → 1,30
<?xml version="1.0"?>
<!--
- Copyright (C) 2017 ScalAgent Distributed Technologies
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- USA.
-->
<config>
<property name="Transaction" value="fr.dyade.aaa.ext.NGTransaction"/>
<server id="1" name="S1" hostname="localhost">
<service class="fr.dyade.aaa.jndi2.server.JndiServer" args="16401"/>
 
<service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root" />
<service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="16011" />
</server>
</config>
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/rest/joramAdmin.xml
New file
0,0 → 1,60
<?xml version="1.0"?>
<!--
- Copyright (C) 2017 ScalAgent Distributed Technologies
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- USA.
-->
 
<JoramAdmin>
<TcpAdminModule host="localhost" port="16010" name="root" password="root">
<property name="connectingTimer" value="60"/>
</TcpAdminModule>
<InitialContext>
<property name="java.naming.factory.initial"
value="fr.dyade.aaa.jndi2.client.NamingContextFactory"/>
<property name="java.naming.factory.host" value="localhost"/>
<property name="java.naming.factory.port" value="16400"/>
</InitialContext>
 
<TcpConnectionFactory name="cf" host="localhost" port="16010">
<jndi name="cf"/>
</TcpConnectionFactory>
<TcpConnectionFactory>
<jndi name="qcf"/>
</TcpConnectionFactory>
<TcpConnectionFactory>
<jndi name="tcf"/>
</TcpConnectionFactory>
 
<User name="anonymous" password="anonymous"/>
<Queue name="queue">
<freeReader/>
<freeWriter/>
<jndi name="queue"/>
</Queue>
<Topic name="topic">
<freeReader/>
<freeWriter/>
<jndi name="topic"/>
</Topic>
</JoramAdmin>
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/Admin.java
New file
0,0 → 1,91
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
 
import javax.jms.ConnectionFactory;
 
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import org.objectweb.joram.client.jms.admin.JMSAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.JMSDistributionQueue;
 
public class Admin {
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("Rest Bridge administration...");
 
ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16011);
 
AdminModule.connect(bridgeCF, "root", "root");
// Create a topic forwarding its messages to the configured rest queue.
Properties prop = new Properties();
prop.put("distribution.className", "com.scalagent.joram.mom.dest.rest.RESTDistribution");
prop.put("rest.hostName", "localhost");
prop.put("rest.port", "8989");
prop.put("rest.userName", "anonymous");
prop.put("rest.password", "anonymous");
prop.put("jms.destination", "queue");
Queue queueDist = Queue.create(1, "queueDist", Destination.DISTRIBUTION_QUEUE, prop);
queueDist.setFreeWriting();
System.out.println("joram distribution queue = " + queueDist);
 
// Create a queue getting its messages from the configured rest queue.
prop = new Properties();
prop.put("acquisition.className", "com.scalagent.joram.mom.dest.rest.RESTAcquisition");
prop.put("rest.hostName", "localhost");
prop.put("rest.port", "8989");
prop.put("rest.userName", "anonymous");
prop.put("rest.password", "anonymous");
prop.put("rest.nbMaxMsgByPeriode", "1000");
//prop.put("rest.mediaTypeJson", "false");
// prop.put("rest.timeout", "0");
prop.put("jms.destination", "queue");
prop.put("acquisition.period", "1000");
Queue queueAcq = Queue.create(1, "queueAcq", Destination.ACQUISITION_QUEUE, prop);
queueAcq.setFreeReading();
System.out.println("joram acquisition queue = " + queueAcq);
 
User.create("anonymous", "anonymous");
// bind foreign destination and connectionFactory
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
jndiCtx.bind("bridgeCF", bridgeCF);
jndiCtx.bind("queueDist", queueDist);
jndiCtx.bind("queueAcq", queueAcq);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("Admin closed.");
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/PerfConsumer.java
New file
0,0 → 1,172
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
 
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.InitialContext;
 
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Topic;
 
/**
* MessageConsumer receiving messages on queue or topic for performance statistics.
*/
public class PerfConsumer implements MessageListener {
static Destination dest = null;
static ConnectionFactory cf = null;
 
static int NbMsgPerRound = 10000;
static int NbMaxMessage = -1;
static boolean durable = false;
static boolean transacted = true;
static boolean dupsOk = true;
static int queueMessageReadMax = 1000;
static int topicAckBufferMax = 100;
static boolean implicitAck = true;
 
static Session session = null;
 
public static boolean getBoolean(String key, boolean def) {
String value = System.getProperty(key, Boolean.toString(def));
return Boolean.parseBoolean(value);
}
 
public static void main (String args[]) throws Exception {
durable = getBoolean("SubDurable", durable);
transacted = getBoolean("Transacted", transacted);
dupsOk = getBoolean("dupsOk", dupsOk);
queueMessageReadMax = Integer.getInteger("queueMessageReadMax", queueMessageReadMax).intValue();
topicAckBufferMax = Integer.getInteger("topicAckBufferMax", topicAckBufferMax).intValue();
implicitAck = getBoolean("implicitAck", implicitAck);
 
NbMsgPerRound = Integer.getInteger("NbMsgPerRound", NbMsgPerRound).intValue();
NbMaxMessage = Integer.getInteger("NbMaxMessage", NbMaxMessage).intValue();
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination dest = (Destination) jndiCtx.lookup("queueAcq");
ConnectionFactory cf = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
 
System.out.println("Destination: " + (dest.isQueue()?"Queue":"Topic"));
System.out.println("Subscriber: durable=" + durable + ", dupsOk=" + dupsOk);
System.out.println(" queueMessageReadMax=" + queueMessageReadMax +
", topicAckBufferMax=" + topicAckBufferMax);
System.out.println("Transacted=" + transacted);
System.out.println("Subscriber: implicitAck=" + implicitAck);
 
Connection cnx = cf.createConnection();
cnx.setClientID("cnx_dursub");
int mode;
if (dupsOk) {
mode = Session.DUPS_OK_ACKNOWLEDGE;
} else {
mode = Session.AUTO_ACKNOWLEDGE;
}
session = cnx.createSession(transacted, mode);
((org.objectweb.joram.client.jms.Session)session).setQueueMessageReadMax(queueMessageReadMax);
((org.objectweb.joram.client.jms.Session)session).setTopicAckBufferMax(topicAckBufferMax);
((org.objectweb.joram.client.jms.Session)session).setImplicitAck(implicitAck);
MessageConsumer consumer = null;
try {
if (durable && dest instanceof Topic) {
consumer = session.createDurableSubscriber((Topic)dest, "dursub");
} else {
consumer = session.createConsumer(dest);
}
PerfConsumer listener = new PerfConsumer();
consumer.setMessageListener(listener);
cnx.start();
 
if (NbMaxMessage == -1) {
System.in.read();
} else {
do {
Thread.sleep(1000L);
} while (listener.counter < NbMaxMessage);
}
} finally {
consumer.close();
if (durable && dest instanceof Topic) {
session.unsubscribe("dursub");
}
}
if (transacted) session.commit();
cnx.close();
}
 
int counter = 0;
long travel = 0L;
 
long start = 0L;
long last = 0L;
 
long t1 = 0L;
public synchronized void onMessage(Message m) {
try {
TextMessage msg = (TextMessage) m;
//BytesMessage msg = (BytesMessage) m;
 
last = System.currentTimeMillis();
int index = msg.getIntProperty("index");
if (index == 0) start = t1 = last;
 
travel += (last - msg.getLongProperty("time"));
counter += 1;
if (transacted && (((counter%10) == 9) || (index == 0)))
session.commit();
if ((counter%NbMsgPerRound) == (NbMsgPerRound -1)) {
long x = (NbMsgPerRound * 1000L) / (last - t1);
t1 = last;
System.out.println("#" + ((counter+1)/NbMsgPerRound) + " x " + NbMsgPerRound + " msg -> " + x + " msg/s " + (travel/counter));
}
 
} catch (IllegalStateException exc) {
throw exc;
} catch (Throwable exc) {
exc.printStackTrace();
}
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/Consumer.java
New file
0,0 → 1,60
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
 
/**
* Consumes messages on a foreign destination through the JORAM bridge.
*/
public class Consumer {
 
public static void main(String[] args) throws Exception {
 
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("queueAcq");
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
 
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer bridgeCons = bridgeSess.createConsumer(bridgeDest);
bridgeCons.setMessageListener(new MsgListener("bridge"));
bridgeCnx.start();
System.in.read();
 
bridgeCnx.close();
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/MsgListener.java
New file
0,0 → 1,62
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
 
/**
* Implements the <code>javax.jms.MessageListener</code> interface.
*/
public class MsgListener implements MessageListener {
String who;
public MsgListener(String who) {
this.who = who;
}
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
System.out.println(who + " receive on acquisition queue: " + ((TextMessage) msg).getText());
} else if (msg instanceof BytesMessage) {
byte[] value = new byte[100];
((BytesMessage) msg).readBytes(value);
String str = new String(value);
System.out.println(who + " receive on acquisition queue: " + str);
}
else
System.out.println(who + " receive on acquisition queue: " + msg);
try {
System.out.println("time = " + msg.getLongProperty("time"));
System.out.println("index = " + msg.getIntProperty("index"));
} catch (Exception exc) { }
}
catch (JMSException exc) {
System.err.println("Exception in listener: " + exc);
}
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/PerfProducer.java
New file
0,0 → 1,152
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
 
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
 
import org.objectweb.joram.client.jms.Destination;
 
/**
* MessageProducer sending messages on queue or topic for performance statistics.
*/
public class PerfProducer implements Runnable {
static int NbClient = 1;
static int Round = 50;
static int NbMsgPerRound = 1000;
static int MsgSize = 1000;
static int mps = 10000;
 
static Destination dest = null;
static ConnectionFactory cf = null;
 
static boolean MsgTransient = true;
static boolean SwapAllowed = false;
static boolean transacted = true;
static boolean asyncSend = false;
 
public static boolean getBoolean(String key, boolean def) {
String value = System.getProperty(key, Boolean.toString(def));
return Boolean.parseBoolean(value);
}
 
public static void main (String args[]) throws Exception {
NbClient = Integer.getInteger("NbClient", NbClient).intValue();
Round = Integer.getInteger("Round", Round).intValue();
NbMsgPerRound = Integer.getInteger("NbMsgPerRound", NbMsgPerRound).intValue();
MsgSize = Integer.getInteger("MsgSize", MsgSize).intValue();
mps = Integer.getInteger("mps", mps).intValue();
 
MsgTransient = getBoolean("MsgTransient", MsgTransient);
SwapAllowed = getBoolean("SwapAllowed", SwapAllowed);
transacted = getBoolean("Transacted", transacted);
asyncSend = getBoolean("asyncSend", asyncSend);
 
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
dest = (Destination) jndiCtx.lookup("queueDist");
cf = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
 
System.out.println("Destination: " + (dest.isQueue()?"Queue":"Topic"));
System.out.println("Message: MsgTransient=" + MsgTransient);
System.out.println("Message: SwapAllowed=" + SwapAllowed);
System.out.println("Transacted=" + transacted);
System.out.println("asyncSend=" + asyncSend);
System.out.println("NbMsg=" + (Round*NbMsgPerRound) + ", MsgSize=" + MsgSize);
((org.objectweb.joram.client.jms.ConnectionFactory) cf).getParameters().asyncSend = asyncSend;
for (int i=0; i<NbClient; i++) {
new Thread(new PerfProducer()).start();
}
}
 
public void run() {
try {
Connection cnx = cf.createConnection();
Session session = cnx.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
if (MsgTransient) {
producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
}
byte[] content = new byte[MsgSize];
for (int i = 0; i< MsgSize; i++)
content[i] = (byte) (i & 0xFF);
 
long dtx = 0;
long start = System.currentTimeMillis();
for (int i=0; i<(Round*NbMsgPerRound); i++) {
TextMessage msg = session.createTextMessage();
//BytesMessage msg = session.createBytesMessage();
if (SwapAllowed) {
msg.setBooleanProperty("JMS_JORAM_SWAPALLOWED", true);
}
msg.setText(new String(content));
//msg.writeBytes(content);
msg.setLongProperty("time", System.currentTimeMillis());
msg.setIntProperty("index", i);
producer.send(msg);
 
if (transacted && ((i%10) == 9)) session.commit();
 
if ((i%NbMsgPerRound) == (NbMsgPerRound-1)) {
long dtx1 = (i * 1000L) / mps;
long dtx2 = System.currentTimeMillis() - start;
if (dtx1 > (dtx2 + 20)) {
dtx += (dtx1 - dtx2);
Thread.sleep(dtx1 - dtx2);
}
if (dtx2 > 0)
System.out.println("sent=" + i + ", mps=" + ((((long) i) * 1000L)/dtx2));
else
System.out.println("sent=" + i);
}
}
long end = System.currentTimeMillis();
long dt = end - start;
 
System.out.println("----------------------------------------------------");
System.out.println("| sender dt=" + ((dt *1000L)/(Round*NbMsgPerRound)) + "us -> " +
((1000L * (Round*NbMsgPerRound)) / (dt)) + "msg/s");
System.out.println("| sender wait=" + dtx + "ms");
 
cnx.close();
} catch (Exception exc) {
exc.printStackTrace();
}
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/Producer.java
New file
0,0 → 1,74
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
 
/**
* Produces messages on the foreign destination.
*/
public class Producer {
 
public static void main(String[] args) throws Exception {
 
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("queueDist");
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
 
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer bridgeProducer = bridgeSess.createProducer(bridgeDest);
 
TextMessage msg = bridgeSess.createTextMessage();
//BytesMessage msg = bridgeSess.createBytesMessage();
 
for (int i = 1; i < 2; i++) {
msg.setText("Joram message number " + i + " sent through distribution bridge queue.");
System.out.println("send msg = " + msg.getText());
// msg.writeBytes(new String("Joram message number " + i + " sent through distribution bridge queue.").getBytes());
// msg.setLongProperty("time", System.currentTimeMillis());
// msg.setIntProperty("index", i);
bridgeProducer.send(msg);
System.out.println("send " + msg);
}
 
//bridgeSess.commit();
 
bridgeCnx.close();
}
}
Property changes:
Added: svn:mime-type
+ text/plain
/trunk/joram/samples/src/joram/restbridge/Receiver.java
New file
0,0 → 1,65
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
 
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;
 
/**
* Receives messages from the queue.
*/
public class Receiver {
 
public static void main(String[] args) throws Exception {
 
System.out.println("Receive from the rest bridge");
 
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("queueAcq");
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
 
Connection qc = bridgeCF.createConnection();
Session qs = qc.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer qrec = qs.createConsumer(bridgeDest);
 
qc.start();
 
for (int j = 0; j < 10; j++) {
Message msg = qrec.receive();
if (msg instanceof TextMessage)
System.out.println("Received: " + ((TextMessage) msg).getText());
}
 
qc.close();
 
System.out.println();
System.out.println("Consumer closed.");
}
}
Property changes:
Added: svn:mime-type
+ text/plain