OW2 Consortium jaspte

Rev

Blame | Last modification | View Log | RSS feed

/**
 * =========================================================================
 *                                      Bench4Q version 1.2.1
 * =========================================================================
 * 
 * Bench4Q is available on the Internet at http://forge.ow2.org/projects/jaspte
 * You can find latest version there.  
 * 
 * Distributed according to the GNU Lesser General Public Licence. 
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by   
 * the Free Software Foundation; either version 2.1 of the License, or any
 * later version.
 * 
 * SEE Copyright.txt FOR FULL COPYRIGHT INFORMATION.
 * 
 * This source code is distributed "as is" in the hope that it will be
 * useful.  It comes with no warranty, and no author or distributor
 * accepts any responsibility for the consequences of its use.
 *
 *
 * This version is based on the implementation of TPC-W from the University of Wisconsin.
 * This version used some source code of The Grinder.
 *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
 *  * Initial developer(s): Zhiquan Duan.
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 
 * 
 */

package org.bench4Q.agent;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.bench4Q.agent.common.EngineException;
import org.bench4Q.agent.messages.CacheHighWaterMark;
import org.bench4Q.agent.messages.ClearCacheMessage;
import org.bench4Q.agent.messages.DistributeFileMessage;
import org.bench4Q.agent.messages.DistributionCacheCheckpointMessage;
import org.bench4Q.common.UncheckedInterruptedException;
import org.bench4Q.common.communication.CommunicationException;
import org.bench4Q.common.communication.Message;
import org.bench4Q.common.communication.MessageDispatchRegistry;
import org.bench4Q.common.communication.MessageDispatchRegistry.AbstractHandler;
import org.bench4Q.common.util.Directory;
import org.bench4Q.common.util.FileContents;
import org.bench4Q.common.util.StreamCopier;

/**
 * Process {@link ClearCacheMessage}s and {@link DistributeFileMessage}s
 * received from the console.
 *
 * <p>
 * Distributed files are written to an "incoming" directory beneath the
 * root directory. {@link #getDirectory()} copies the incoming directory to
 * a "current" directory and returns the latter.
 * </p>
 *
 * <p>
 * All access to the incoming directory and to the incremental flag is
 * performed while holding the monitor of the incoming directory object.
 * </p>
 */
public final class FileStore {

        /** Class-relative name of the resource copied to the README file. */
        private static final String README_RESOURCE = "resources/FileStoreReadme.txt";

        /** Buffer size handed to the {@link StreamCopier}. */
        private static final int COPY_BUFFER_SIZE = 4096;

        private final File m_readmeFile;

        // Access guarded by self.
        private final Directory m_incomingDirectory;

        private final Directory m_currentDirectory;

        // Guarded by m_incomingDirectory
        private boolean m_incremental;

        private volatile CacheHighWaterMark m_cacheHighWaterMark = new OutOfDateCacheHighWaterMark();

        /**
         * Creates a file store rooted at the given directory.
         *
         * @param directory
         *            Root directory of the file store. It is converted to an
         *            absolute path. It does not have to exist yet.
         * @throws FileStoreException
         *             If the path exists but is not a directory, if it exists
         *             but is not writable, or if the "incoming" or "current"
         *             {@link Directory} objects could not be constructed.
         */
        public FileStore(File directory) throws FileStoreException {

                final File rootDirectory = directory.getAbsoluteFile();

                // Only an existing path can be validated here; a missing root
                // directory is accepted as is.
                if (rootDirectory.exists()) {
                        if (!rootDirectory.isDirectory()) {
                                throw new FileStoreException("Could not write to directory '"
                                                + rootDirectory
                                                + "' as file with that name already exists");
                        }

                        if (!rootDirectory.canWrite()) {
                                throw new FileStoreException("Could not write to directory '"
                                                + rootDirectory + "'");
                        }
                }

                m_readmeFile = new File(rootDirectory, "README.txt");

                try {
                        m_incomingDirectory = new Directory(new File(rootDirectory,
                                        "incoming"));
                        m_currentDirectory = new Directory(new File(rootDirectory,
                                        "current"));
                } catch (Directory.DirectoryException e) {
                        throw new FileStoreException(e.getMessage(), e);
                }

                // The first copy performed by getDirectory() is never incremental.
                m_incremental = false;
        }

        /**
         * Copies the incoming directory (if it exists) to the current
         * directory and returns the current directory.
         *
         * <p>
         * The first copy after construction, or after a
         * {@link ClearCacheMessage} has been handled, is requested with the
         * incremental flag unset; subsequent copies are requested with it set.
         * </p>
         *
         * @return The current directory.
         * @throws FileStoreException
         *             If the copy failed with an {@link IOException}.
         */
        public Directory getDirectory() throws FileStoreException {
                try {
                        synchronized (m_incomingDirectory) {
                                if (m_incomingDirectory.getFile().exists()) {
                                        m_incomingDirectory.copyTo(m_currentDirectory,
                                                        m_incremental);
                                }

                                m_incremental = true;
                        }

                        return m_currentDirectory;
                } catch (IOException e) {
                        // Let the helper inspect the exception first; NOTE(review):
                        // presumably it rethrows interrupted I/O as an unchecked
                        // exception - confirm against UncheckedInterruptedException.
                        UncheckedInterruptedException.ioException(e);
                        throw new FileStoreException(
                                        "Could not create file store directory", e);
                }
        }

        /**
         * Returns the most recently received cache high water mark.
         *
         * @return The high water mark delivered by the last
         *         {@link DistributionCacheCheckpointMessage}; until one has
         *         been received, a placeholder whose time is -1 and which
         *         never matches another cache.
         */
        public CacheHighWaterMark getCacheHighWaterMark() {
                return m_cacheHighWaterMark;
        }

        /**
         * Registers message handlers with a dispatcher.
         *
         * <p>
         * Handlers are registered for {@link ClearCacheMessage},
         * {@link DistributeFileMessage} and
         * {@link DistributionCacheCheckpointMessage}.
         * </p>
         *
         * @param messageDispatcher
         *            The dispatcher.
         */
        public void registerMessageHandlers(
                        MessageDispatchRegistry messageDispatcher) {

                // Clear cache: empty the incoming directory and force the next
                // copy to be non-incremental.
                messageDispatcher.set(ClearCacheMessage.class, new AbstractHandler() {
                        public void send(Message message) throws CommunicationException {
                                try {
                                        synchronized (m_incomingDirectory) {
                                                m_incomingDirectory.deleteContents();
                                                m_incremental = false;
                                        }
                                } catch (Directory.DirectoryException e) {
                                        throw new CommunicationException(e.getMessage(), e);
                                }
                        }
                });

                // Distribute file: make sure the incoming directory and the
                // README exist, then write the received file contents.
                messageDispatcher.set(DistributeFileMessage.class,
                                new AbstractHandler() {
                                        public void send(Message message)
                                                        throws CommunicationException {
                                                try {
                                                        synchronized (m_incomingDirectory) {
                                                                m_incomingDirectory.create();

                                                                createReadmeFile();

                                                                final FileContents fileContents = ((DistributeFileMessage) message)
                                                                                .getFileContents();

                                                                fileContents.create(m_incomingDirectory);
                                                        }
                                                } catch (FileContents.FileContentsException e) {
                                                        throw new CommunicationException(e.getMessage(), e);
                                                } catch (Directory.DirectoryException e) {
                                                        throw new CommunicationException(e.getMessage(), e);
                                                }
                                        }
                                });

                // Checkpoint: remember the high water mark carried by the message.
                messageDispatcher.set(DistributionCacheCheckpointMessage.class,
                                new AbstractHandler() {
                                        public void send(Message message)
                                                        throws CommunicationException {
                                                m_cacheHighWaterMark = ((DistributionCacheCheckpointMessage) message)
                                                                .getCacheHighWaterMark();
                                        }
                                });
        }

        /**
         * Writes the README file from the bundled resource, unless the file
         * already exists.
         *
         * <p>
         * The resource is looked up before the output file is opened, so a
         * missing resource does not leave an empty README behind that would
         * suppress every later attempt to create it.
         * </p>
         *
         * @throws CommunicationException
         *             If the resource could not be found, or the README file
         *             could not be opened or written.
         */
        private void createReadmeFile() throws CommunicationException {
                if (m_readmeFile.exists()) {
                        return;
                }

                try {
                        final InputStream readmeResource = getClass()
                                        .getResourceAsStream(README_RESOURCE);

                        // getResourceAsStream() reports a missing resource by
                        // returning null rather than by throwing.
                        if (readmeResource == null) {
                                throw new IOException("Could not find resource '"
                                                + README_RESOURCE + "'");
                        }

                        final FileOutputStream readmeOutput;

                        try {
                                readmeOutput = new FileOutputStream(m_readmeFile);
                        } catch (IOException e) {
                                // The copier is never reached on this path, so the
                                // resource stream has to be released here.
                                closeQuietly(readmeResource);
                                throw e;
                        }

                        // NOTE(review): the "true" argument presumably makes the
                        // copier close both streams when it is done - confirm
                        // against StreamCopier.
                        new StreamCopier(COPY_BUFFER_SIZE, true).copy(readmeResource,
                                        readmeOutput);
                } catch (IOException e) {
                        UncheckedInterruptedException.ioException(e);
                        throw new CommunicationException(e.getMessage(), e);
                }
        }

        /**
         * Closes a stream, ignoring any failure to do so.
         *
         * @param stream
         *            The stream to close.
         */
        private static void closeQuietly(InputStream stream) {
                try {
                        stream.close();
                } catch (IOException ignored) {
                        // Deliberately ignored: the caller is already reporting
                        // the failure that made this clean up necessary.
                }
        }

        /**
         * Exception that indicates a <code>FileStore</code> related problem.
         */
        public static final class FileStoreException extends EngineException {
                FileStoreException(String message) {
                        super(message);
                }

                FileStoreException(String message, Throwable e) {
                        super(message, e);
                }
        }

        /**
         * Placeholder high water mark used until a real one has been received:
         * its time is -1 and it never reports being for the same cache as
         * another mark.
         */
        private static final class OutOfDateCacheHighWaterMark implements
                        CacheHighWaterMark {

                public long getTime() {
                        return -1;
                }

                public boolean isForSameCache(CacheHighWaterMark other) {
                        return false;
                }
        }
}