OW2 Consortium jaspte

Rev

Blame | Last modification | View Log | RSS feed

/**
 * =========================================================================
 *                                      Bench4Q version 1.2.1
 * =========================================================================
 * 
 * Bench4Q is available on the Internet at http://forge.ow2.org/projects/jaspte
 * You can find latest version there.  
 * 
 * Distributed according to the GNU Lesser General Public Licence. 
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by   
 * the Free Software Foundation; either version 2.1 of the License, or any
 * later version.
 * 
 * SEE Copyright.txt FOR FULL COPYRIGHT INFORMATION.
 * 
 * This source code is distributed "as is" in the hope that it will be
 * useful.  It comes with no warranty, and no author or distributor
 * accepts any responsibility for the consequences of its use.
 *
 *
 * This version is a based on the implementation of TPC-W from University of Wisconsin. 
 * This version used some source code of The Grinder.
 *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
 *  * Initial developer(s): Zhiquan Duan.
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
 * 
 */

package org.bench4Q.common.communication;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.bench4Q.common.UncheckedInterruptedException;
import org.bench4Q.common.communication.ResourcePool.Reservation;
import org.bench4Q.common.util.thread.InterruptibleRunnable;
import org.bench4Q.common.util.thread.ThreadPool;
import org.bench4Q.common.util.thread.ThreadSafeQueue;
import org.bench4Q.common.util.thread.ThreadSafeQueue.ShutdownException;

/**
 * Manages the receipt of messages from many clients.
 * 
 */
public final class ServerReceiver implements Receiver {

        private final MessageQueue m_messageQueue = new MessageQueue(true);
        private final List m_threadPools = new ArrayList();

        /**
         * Registers a new {@link Acceptor} from which the
         * <code>ServerReceiver</code> should process messages. Actively polls
         * connections of the given types for messages, deserialises them, and
         * queues them for retrieval using {@link #waitForMessage()}.
         * 
         * <p>
         * A single <code>ServerReceiver</code> can listen to messages from multiple
         * {@link Acceptor}s. You can register the same {@link Acceptor} with
         * multiple <code>ServerReceiver</code>s, but then there is no way of
         * controlling which receiver will receive messages from a given
         * {@link Acceptor}.
         * </p>
         * 
         * @param acceptor
         *            Acceptor.
         * @param connectionTypes
         *            Type of connections to listen for.
         * @param numberOfThreads
         *            How many threads to dedicate to processing the Acceptor. The
         *            threads this method spawns just read, deserialise, and queue.
         *            Set <code>numberOfThreads</code> to the number of concurrent
         *            streams you expect to be able to read.
         * @param idleThreadPollDelay
         *            Time in milliseconds that an idle thread should sleep if there
         *            are no sockets to process.
         * 
         * @exception CommunicationException
         *                If this <code>ServerReceiver</code> has been shutdown.
         */
        public void receiveFrom(Acceptor acceptor,
                        ConnectionType[] connectionTypes, int numberOfThreads,
                        final int idleThreadPollDelay) throws CommunicationException {

                if (connectionTypes.length == 0) {
                        // Nothing to do.
                        return;
                }

                final ResourcePool[] acceptedSocketSets = new ResourcePool[connectionTypes.length];

                for (int i = 0; i < connectionTypes.length; ++i) {
                        acceptedSocketSets[i] = acceptor.getSocketSet(connectionTypes[i]);
                }

                final ThreadPool.InterruptibleRunnableFactory runnableFactory = new ThreadPool.InterruptibleRunnableFactory() {
                        public InterruptibleRunnable create() {
                                return new ServerReceiverRunnable(new CombinedResourcePool(
                                                acceptedSocketSets), idleThreadPollDelay);
                        }
                };

                final ThreadPool threadPool = new ThreadPool("ServerReceiver ("
                                + acceptor.getPort() + ", " + Arrays.asList(connectionTypes)
                                + ")", numberOfThreads, runnableFactory);

                synchronized (this) {
                        try {
                                m_messageQueue.checkIfShutdown();
                        } catch (ShutdownException e) {
                                throw new CommunicationException("Shut down", e);
                        }

                        m_threadPools.add(threadPool);
                }

                threadPool.start();
        }

        /**
         * Block until a message is available, or another thread has called
         * {@link #shutdown}. Typically called from a message dispatch loop.
         * 
         * <p>
         * Multiple threads can call this method, but only one thread will receive a
         * given message.
         * </p>
         * 
         * @return The message or <code>null</code> if shut down.
         * @throws CommunicationException
         *             If an error occurred receiving a message.
         */
        public Message waitForMessage() throws CommunicationException {

                try {
                        return m_messageQueue.dequeue(true);
                } catch (ThreadSafeQueue.ShutdownException e) {
                        return null;
                }
        }

        /**
         * Shut down this receiver.
         */
        public synchronized void shutdown() {

                m_messageQueue.shutdown();

                final Iterator iterator = m_threadPools.iterator();

                while (iterator.hasNext()) {
                        ((ThreadPool) iterator.next()).stop();
                }
        }

        /**
         * Return the number of active threads. Package scope; used by the unit
         * tests.
         * 
         * @return The number of active threads.
         */
        synchronized int getActveThreadCount() {
                int result = 0;

                final Iterator iterator = m_threadPools.iterator();

                while (iterator.hasNext()) {
                        result += ((ThreadPool) iterator.next()).getThreadGroup()
                                        .activeCount();
                }

                return result;
        }

        /**
         * Obtain reservations from multiple ResourcePools. Delegates reserveNext()
         * to the next resource pool.
         * 
         * @author Philip Aston
         * @version $Revision: 3824 $
         */
        private static final class CombinedResourcePool {
                private final ResourcePool[] m_resourcePools;

                // Guarded by m_resourcePools.
                private int m_next;

                CombinedResourcePool(ResourcePool[] resourcePools) {
                        assert resourcePools.length > 0;
                        m_resourcePools = resourcePools;
                }

                public Reservation reserveNext() {
                        int i = 0;
                        int start;

                        synchronized (m_resourcePools) {
                                start = ++m_next;
                        }

                        while (true) {
                                final Reservation reservation = m_resourcePools[(start + i)
                                                % m_resourcePools.length].reserveNext();

                                if (!reservation.isSentinel()
                                                || i == m_resourcePools.length - 1) {
                                        return reservation;
                                }

                                ++i;
                        }
                }
        }

        private final class ServerReceiverRunnable implements InterruptibleRunnable {

                private final CombinedResourcePool m_sockets;
                private final int m_delay;

                private ServerReceiverRunnable(CombinedResourcePool sockets, int delay) {
                        m_sockets = sockets;
                        m_delay = delay;
                }

                public void interruptibleRun() {
                        try {
                                // Did we do some work on the last pass?
                                boolean idle = false;

                                while (true) {
                                        final Reservation reservation = m_sockets.reserveNext();
                                        boolean holdReservation = false;

                                        try {
                                                if (reservation.isSentinel()) {
                                                        if (idle) {
                                                                Thread.sleep(m_delay);
                                                        }

                                                        idle = true;
                                                } else {
                                                        final SocketWrapper socketWrapper = (SocketWrapper) reservation
                                                                        .getResource();

                                                        // We don't need to synchronise access to the
                                                        // SocketWrapper
                                                        // stream; access is protected through the socket
                                                        // set and only we
                                                        // hold the reservation.
                                                        final InputStream inputStream = socketWrapper
                                                                        .getInputStream();

                                                        if (inputStream.available() > 0) {

                                                                idle = false;

                                                                final ObjectInputStream objectStream = new ObjectInputStream(
                                                                                inputStream);

                                                                final Message message = (Message) objectStream
                                                                                .readObject();

                                                                if (message instanceof CloseCommunicationMessage) {
                                                                        reservation.close();
                                                                        continue;
                                                                }

                                                                if (message instanceof MessageRequiringResponse) {

                                                                        final MessageRequiringResponse messageRequiringResponse = (MessageRequiringResponse) message;

                                                                        messageRequiringResponse
                                                                                        .setResponder(new SenderWithReservation(
                                                                                                        new StreamSender(
                                                                                                                        socketWrapper
                                                                                                                                        .getOutputStream()),
                                                                                                        reservation));

                                                                        m_messageQueue.queue(message);

                                                                        // Whatever handles the
                                                                        // MessageExpectingResponse takes
                                                                        // responsibility for the reservation.
                                                                        holdReservation = true;
                                                                } else {
                                                                        m_messageQueue.queue(message);
                                                                }
                                                        }
                                                }
                                        } catch (IOException e) {
                                                reservation.close();
                                                UncheckedInterruptedException.ioException(e);
                                                m_messageQueue.queue(e);
                                        } catch (ClassNotFoundException e) {
                                                reservation.close();
                                                m_messageQueue.queue(e);
                                        } catch (InterruptedException e) {
                                                reservation.close();
                                                throw new UncheckedInterruptedException(e);
                                        } finally {
                                                if (!holdReservation) {
                                                        reservation.free();
                                                }
                                        }
                                }
                        } catch (ThreadSafeQueue.ShutdownException e) {
                                // We've been shutdown, exit this thread.
                        } finally {
                                // Ensure we're shutdown.
                                shutdown();
                        }
                }
        }

        /**
         * SenderWithReservation. A one-shot use Sender, only accessed through a
         * {@link MessageRequiringResponse} wrapper which ensures send() is called
         * at most once.
         * 
         * <p>
         * If no {@link MessageRequiringResponse#sendResponse(Message)} is not
         * called, the reservation will not be freed. No way to avoid this - we're
         * handing off responsibility to respond to a stream.
         * </p>
         * 
         */
        private static final class SenderWithReservation implements Sender {
                private final Sender m_delegateSender;
                private final Reservation m_reservation;

                private SenderWithReservation(Sender delegateSender,
                                Reservation reservation) {
                        m_delegateSender = delegateSender;
                        m_reservation = reservation;
                }

                public void send(Message message) throws CommunicationException {
                        try {
                                m_delegateSender.send(message);
                        } finally {
                                shutdown();
                        }
                }

                public void shutdown() {
                        // We rely on our ResponseSender's single call to send to ensure we
                        // don't
                        // free the reservation multiple times.
                        m_reservation.free();
                }
        }
}