OW2 Consortium dream

Rev

Rev 2177 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/**
 * Dream
 * Copyright (C) 2003-2004 INRIA Rhone-Alpes
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 * Contact: dream@objectweb.org
 *
 * Initial developer(s):
 * Contributor(s): 
 */

package org.objectweb.dream.protocol.messagePassing.overChannel;

import org.objectweb.dream.IOPushException;
import org.objectweb.dream.cache.api.CacheEntry;
import org.objectweb.dream.cache.api.CacheEntryFilter;
import org.objectweb.dream.cache.api.CacheException;
import org.objectweb.dream.cache.api.CacheManager;
import org.objectweb.dream.cache.api.FixableCacheEntry;
import org.objectweb.dream.cache.api.UnFixProtocolException;
import org.objectweb.dream.cache.api.UnbindManager;
import org.objectweb.dream.cache.lib.BasicCacheManagerAttributeController;
import org.objectweb.dream.cache.replacement.api.ReplaceableCacheEntry;
import org.objectweb.dream.protocol.ExportIdentifier;
import org.objectweb.dream.util.Error;
import org.objectweb.dream.util.LoggerUtil;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

/**
 * Identified Session (i.e. with a remote export identifier) Manager
 * 
 * @author pgarcia
 */
public class IdentifiedSessionManager {

    /**
     * Logger of the class
     */
    Logger logger;

    /**
     * Cache fractal interface to bind/fix cache entries
     */
    private final CacheManager cacheManagerItf;

    /**
     * Cache fractal interface to unbind cache entries
     */
    private final UnbindManager unbindManagerItf;

    /**
     * Cache entry filter for the closeAll() method
     */
    private final CacheEntryFilter cacheEntryFilterClosingSocket = new CacheEntryFilterClosingSocket();

    /**
     * Cache entry filter to use only unfixed entries
     */
    private final CacheEntryFilter cacheEntryfilter = new CacheUnfixedEntryFilter();

    /**
     * Control interface used to manage parameters of the cache like the
     * maxSize, the currentSize...
     */
    private final BasicCacheManagerAttributeController cacheAttributeControllerItf;

    /**
     * Session manager
     */
    private final SessionManager sessionManager;

    /**
     * Constructor
     * 
     * @param messagePassingOverChannelImpl
     *            : the related protocol
     */
    public IdentifiedSessionManager(MessagePassingOverChannelImpl messagePassingOverChannelImpl,
            SessionManager sessionManager) {
        this.logger = messagePassingOverChannelImpl.logger;
        this.cacheManagerItf = messagePassingOverChannelImpl.cacheManagerItf;
        this.unbindManagerItf = messagePassingOverChannelImpl.unbindManagerItf;
        this.cacheAttributeControllerItf = messagePassingOverChannelImpl.cacheAttributeControllerItf;
        this.sessionManager = sessionManager;
    }

    /**
     * Reserve the given Session.
     * 
     * @param session
     * 
     * @throws IOPushException
     */
    public void reserveSession(Session session) throws IOPushException {
        if (session.replacingSession != null) {
            // wait that the replacing session is done
            try {
                session.wait();
            } catch (final InterruptedException e) {
                throw new IOPushException("Interrupted while waiting for the session toward "
                        + session.remoteExportIdentifier + " to be available");
            }
        }

        final FixableCacheEntry ce = (FixableCacheEntry) this.cacheManagerItf
                .lookup(session.remoteExportIdentifier);
        try {
            this.cacheManagerItf.fix(ce);
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }

        if (this.logger.isLoggable(BasicLevel.DEBUG)) {
            this.logger.log(BasicLevel.DEBUG, "Session reserved for "
                    + session.remoteExportIdentifier);
        }
    }

    /**
     * Close all sessions in cache
     */
    public synchronized void closeAll() {
        LoggerUtil.call(this.logger);
        try {
            this.unbindManagerItf.unbind(this.cacheEntryFilterClosingSocket, true);

        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Bind the export identifier to the related session
     * 
     * @param to
     * @param newSession
     * @throws IOPushException
     */
    public void bind(ExportIdentifier to, Session newSession) {
        // open new session.
        try {
            this.waitForCacheSpace();
            this.cacheManagerItf.bind(to, newSession);
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Unbind the entry.
     * 
     * @param id
     *            the related export identifier
     * @return false if the entry was really unbound
     */
    public boolean unbind(Session session) {
        boolean result = true;

        try {
            result = this.unbindManagerItf.unbind(session.remoteExportIdentifier, true);

            synchronized (this.cacheAttributeControllerItf) {
                this.cacheAttributeControllerItf.notify();
            }

        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }

        return result;
    }

    /**
     * Mark the session as used
     * 
     * @param cacheEntry
     */
    public void fix(Session session) {
        final FixableCacheEntry cacheEntry = (FixableCacheEntry) this.cacheManagerItf
                .lookup(session.remoteExportIdentifier);

        cacheEntry.fixCe();
    }

    /**
     * Mark the session as unused
     * 
     * @param cacheEntry
     */
    public void unfix(Session session) {
        try {
            final FixableCacheEntry cacheEntry = (FixableCacheEntry) this.cacheManagerItf
                    .lookup(session.remoteExportIdentifier);

            cacheEntry.unfixCe();

            if (this.logger.isLoggable(BasicLevel.DEBUG) && this.isInUse(cacheEntry)) {
                Error.bug(this.logger);
            }

            // an entry in the cache is available, notify it
            synchronized (this.cacheAttributeControllerItf) {
                this.cacheAttributeControllerItf.notify();
            }
        } catch (final UnFixProtocolException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Lookup for a session identifier by the given export identifier
     * 
     * @param to
     * @return
     */
    public Session lookup(ExportIdentifier to) {
        Session session = null;
        final ReplaceableCacheEntry ce = (ReplaceableCacheEntry) this.cacheManagerItf.lookup(to);
        if (ce != null) {
            session = (Session) ce.getCeObject();
        }

        return session;
    }

    /**
     * Wait for cache space
     * 
     * @throws CacheException
     *             the cache is full since 1 minute
     */
    public void waitForCacheSpace() throws CacheException {
        synchronized (this.cacheAttributeControllerItf) {

            if (this.isCacheFull()) {
                this.logger.log(BasicLevel.DEBUG, "Cache of 'messagePassing' sessions is full");
                // try to remove unused sessions
                try {
                    this.unbindManagerItf.unbind(this.cacheEntryfilter, true);
                } catch (final CacheException e1) {
                    this.logger.log(BasicLevel.WARN,
                            "an interruption occured while trying to remove unused session");
                }

                try {
                    // wait for 10 second if a session is released
                    this.logger.log(BasicLevel.DEBUG, "Waiting 60 seconds for cache space");
                    this.cacheAttributeControllerItf.wait(60000);
                } catch (final InterruptedException e) {
                    this.logger.log(BasicLevel.WARN, "Interrupted while waiting for cache space");
                }
            }

            if (this.isCacheFull()) {
                // no session released
                throw new CacheException("Cache of messagePassing sessions (size="
                        + this.cacheAttributeControllerItf.getCacheMaxSize()
                        + ") is full since 1 minute");
            }
        }
    }

    /**
     * Filter used to close every session in cache
     */
    private class CacheEntryFilterClosingSocket implements CacheEntryFilter {
        /**
         * Close session contained in every cache entry
         * 
         * @return <code>true</code>
         */
        public boolean accept(CacheEntry ce) {
            IdentifiedSessionManager.this.logger.log(BasicLevel.DEBUG, "Closing session : "
                    + ce.getCeIdentifier());
            final Session session = (Session) ce.getCeObject();
            IdentifiedSessionManager.this.sessionManager.closeSessionSilently(session);

            return true;
        }
    }

    /**
     * Filter used to treat only unused sessions
     */
    private static final class CacheUnfixedEntryFilter implements CacheEntryFilter {

        public boolean accept(CacheEntry cacheEntry) {
            return ((FixableCacheEntry) cacheEntry).getCeFixCount() == 0;
        }
    }

    /**
     * Get the session contained in a cache entry
     * 
     * @param ce
     * @return
     */
    Session getSession(ExportIdentifier id) {
        return (Session) this.cacheManagerItf.lookup(id).getCeObject();
    }

    boolean isInUse(FixableCacheEntry ce) {
        return ce.getCeFixCount() > 0;
    }

    /**
     * @param session
     * @return
     */
    protected boolean isInUse(Session session) {
        return (((FixableCacheEntry) this.cacheManagerItf.lookup(session.remoteExportIdentifier))
                .getCeFixCount()) > 0;
    }

    /**
     * @param id
     * @return
     */
    protected boolean isInUse(ExportIdentifier id) {
        return (((FixableCacheEntry) this.cacheManagerItf.lookup(id)).getCeFixCount()) > 0;
    }

    /**
     * @return true if the cache is full
     */
    private boolean isCacheFull() {
        return this.cacheAttributeControllerItf.getCurrentSize() >= this.cacheAttributeControllerItf
                .getCacheMaxSize();
    }
}