// dream
// Rev 2177 | Blame | Compare with Previous | Last modification | View Log | RSS feed
/**
* Dream
* Copyright (C) 2003-2004 INRIA Rhone-Alpes
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Contact: dream@objectweb.org
*
* Initial developer(s):
* Contributor(s):
*/
package org.objectweb.dream.protocol.messagePassing.overChannel;
import org.objectweb.dream.IOPushException;
import org.objectweb.dream.cache.api.CacheEntry;
import org.objectweb.dream.cache.api.CacheEntryFilter;
import org.objectweb.dream.cache.api.CacheException;
import org.objectweb.dream.cache.api.CacheManager;
import org.objectweb.dream.cache.api.FixableCacheEntry;
import org.objectweb.dream.cache.api.UnFixProtocolException;
import org.objectweb.dream.cache.api.UnbindManager;
import org.objectweb.dream.cache.lib.BasicCacheManagerAttributeController;
import org.objectweb.dream.cache.replacement.api.ReplaceableCacheEntry;
import org.objectweb.dream.protocol.ExportIdentifier;
import org.objectweb.dream.util.Error;
import org.objectweb.dream.util.LoggerUtil;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
/**
 * Manager of identified sessions, i.e. sessions that carry a remote export
 * identifier. Sessions are kept in a cache that is reached through the cache
 * interfaces of the owning {@link MessagePassingOverChannelImpl}.
 * <p>
 * The monitor of the cache attribute controller is used as the condition on
 * which threads wait for cache space; {@link #unbind(Session)} and
 * {@link #unfix(Session)} notify it.
 *
 * @author pgarcia
 */
public class IdentifiedSessionManager {

    /**
     * Maximum time, in milliseconds, that {@link #waitForCacheSpace()} waits for
     * room in the cache before giving up.
     */
    private static final long CACHE_SPACE_TIMEOUT_MS = 60000L;

    /**
     * Logger of the class (taken from the related protocol implementation)
     */
    Logger logger;

    /**
     * Cache fractal interface to bind/fix cache entries
     */
    private final CacheManager cacheManagerItf;

    /**
     * Cache fractal interface to unbind cache entries
     */
    private final UnbindManager unbindManagerItf;

    /**
     * Cache entry filter for the closeAll() method
     */
    private final CacheEntryFilter cacheEntryFilterClosingSocket = new CacheEntryFilterClosingSocket();

    /**
     * Cache entry filter accepting only entries whose fix count is zero
     */
    private final CacheEntryFilter unfixedEntryFilter = new CacheUnfixedEntryFilter();

    /**
     * Control interface used to manage parameters of the cache like the
     * maxSize, the currentSize... Its monitor is also the one used to wait for
     * and to signal cache space.
     */
    private final BasicCacheManagerAttributeController cacheAttributeControllerItf;

    /**
     * Session manager, used to close sessions in {@link #closeAll()}
     */
    private final SessionManager sessionManager;

    /**
     * Constructor
     *
     * @param messagePassingOverChannelImpl
     *            the related protocol; provides the logger and the cache
     *            interfaces
     * @param sessionManager
     *            the session manager used to close sessions
     */
    public IdentifiedSessionManager(MessagePassingOverChannelImpl messagePassingOverChannelImpl,
            SessionManager sessionManager) {
        this.logger = messagePassingOverChannelImpl.logger;
        this.cacheManagerItf = messagePassingOverChannelImpl.cacheManagerItf;
        this.unbindManagerItf = messagePassingOverChannelImpl.unbindManagerItf;
        this.cacheAttributeControllerItf = messagePassingOverChannelImpl.cacheAttributeControllerItf;
        this.sessionManager = sessionManager;
    }

    /**
     * Reserve the given Session: if the session is being replaced, wait on the
     * session object first, then fix the cache entry bound to the session's
     * remote export identifier.
     *
     * @param session
     *            the session to reserve
     * @throws IOPushException
     *             if the thread is interrupted while waiting; the interrupt
     *             status of the thread is restored before throwing
     */
    public void reserveSession(Session session) throws IOPushException {
        if (session.replacingSession != null) {
            // wait that the replacing session is done
            // NOTE(review): Object.wait() requires the caller to own the monitor of
            // 'session', and a single 'if' does not guard against spurious wake-ups.
            // Whether callers synchronize on the session and whether
            // 'replacingSession' is reset before notification is not visible here --
            // confirm before turning this into a loop.
            try {
                session.wait();
            } catch (final InterruptedException e) {
                // keep the interruption visible to the caller's call stack
                Thread.currentThread().interrupt();
                throw new IOPushException("Interrupted while waiting for the session toward "
                        + session.remoteExportIdentifier + " to be available");
            }
        }
        final FixableCacheEntry ce = (FixableCacheEntry) this.cacheManagerItf
                .lookup(session.remoteExportIdentifier);
        try {
            this.cacheManagerItf.fix(ce);
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
        if (this.logger.isLoggable(BasicLevel.DEBUG)) {
            this.logger.log(BasicLevel.DEBUG, "Session reserved for "
                    + session.remoteExportIdentifier);
        }
    }

    /**
     * Close all sessions in cache: every cache entry is passed to the closing
     * filter (which closes its session) and unbound.
     */
    public synchronized void closeAll() {
        LoggerUtil.call(this.logger);
        try {
            this.unbindManagerItf.unbind(this.cacheEntryFilterClosingSocket, true);
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Bind the export identifier to the related session, after waiting for room
     * in the cache if it is full. A {@link CacheException} (including the one
     * raised when no room could be found) is reported through
     * <code>Error.bug</code>.
     *
     * @param to
     *            the export identifier used as cache key
     * @param newSession
     *            the session to store
     */
    public void bind(ExportIdentifier to, Session newSession) {
        // open new session.
        try {
            this.waitForCacheSpace();
            this.cacheManagerItf.bind(to, newSession);
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Unbind the cache entry of the given session and notify one thread waiting
     * for cache space.
     *
     * @param session
     *            the session whose remote export identifier designates the
     *            entry to unbind
     * @return the value returned by the unbind manager, or <code>true</code>
     *         when a {@link CacheException} was raised. NOTE(review): the
     *         previous comment read "false if the entry was really unbound";
     *         confirm the meaning against the UnbindManager contract.
     */
    public boolean unbind(Session session) {
        boolean result = true;
        try {
            result = this.unbindManagerItf.unbind(session.remoteExportIdentifier, true);
            synchronized (this.cacheAttributeControllerItf) {
                this.cacheAttributeControllerItf.notify();
            }
        } catch (final CacheException e) {
            Error.bug(this.logger, e);
        }
        return result;
    }

    /**
     * Mark the session as used by calling <code>fixCe()</code> on its cache
     * entry. The entry is looked up without a null check, so the session is
     * expected to be bound.
     *
     * @param session
     *            the session to mark
     */
    public void fix(Session session) {
        final FixableCacheEntry cacheEntry = (FixableCacheEntry) this.cacheManagerItf
                .lookup(session.remoteExportIdentifier);
        cacheEntry.fixCe();
    }

    /**
     * Mark the session as unused by calling <code>unfixCe()</code> on its cache
     * entry, then notify one thread waiting for cache space. The entry is
     * looked up without a null check, so the session is expected to be bound.
     *
     * @param session
     *            the session to mark
     */
    public void unfix(Session session) {
        try {
            final FixableCacheEntry cacheEntry = (FixableCacheEntry) this.cacheManagerItf
                    .lookup(session.remoteExportIdentifier);
            cacheEntry.unfixCe();
            // debug-only sanity check: the entry is still fixed after the unfix
            if (this.logger.isLoggable(BasicLevel.DEBUG) && this.isInUse(cacheEntry)) {
                Error.bug(this.logger);
            }
            // an entry in the cache is available, notify it
            synchronized (this.cacheAttributeControllerItf) {
                this.cacheAttributeControllerItf.notify();
            }
        } catch (final UnFixProtocolException e) {
            Error.bug(this.logger, e);
        }
    }

    /**
     * Lookup for a session identified by the given export identifier
     *
     * @param to
     *            the export identifier used as cache key
     * @return the cached session, or <code>null</code> if the cache has no
     *         entry for the identifier
     */
    public Session lookup(ExportIdentifier to) {
        Session session = null;
        final ReplaceableCacheEntry ce = (ReplaceableCacheEntry) this.cacheManagerItf.lookup(to);
        if (ce != null) {
            session = (Session) ce.getCeObject();
        }
        return session;
    }

    /**
     * Wait for cache space.
     * <p>
     * While the cache is full, unused (unfixed) sessions are unbound; if that
     * does not free any room, the calling thread waits on the cache attribute
     * controller until it is notified or until one minute has elapsed since
     * the call. After every wake-up the eviction is retried, so that an entry
     * that has merely been unfixed can be reclaimed. The method returns as soon
     * as the cache is no longer full.
     * <p>
     * If the thread is interrupted, waiting stops and the interrupt status of
     * the thread is restored.
     *
     * @throws CacheException
     *             the cache is full since 1 minute (or the wait was
     *             interrupted while the cache was still full)
     */
    public void waitForCacheSpace() throws CacheException {
        synchronized (this.cacheAttributeControllerItf) {
            if (this.isCacheFull()) {
                this.logger.log(BasicLevel.DEBUG, "Cache of 'messagePassing' sessions is full");
                final long deadline = System.currentTimeMillis() + CACHE_SPACE_TIMEOUT_MS;
                boolean waitAnnounced = false;
                while (this.isCacheFull()) {
                    // try to remove unused sessions
                    this.evictUnusedSessions();
                    if (!this.isCacheFull()) {
                        // eviction was enough, no need to block
                        break;
                    }
                    final long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        break;
                    }
                    if (!waitAnnounced) {
                        this.logger.log(BasicLevel.DEBUG, "Waiting 60 seconds for cache space");
                        waitAnnounced = true;
                    }
                    try {
                        // wait until a session is released or the deadline is reached
                        this.cacheAttributeControllerItf.wait(remaining);
                    } catch (final InterruptedException e) {
                        this.logger.log(BasicLevel.WARN, "Interrupted while waiting for cache space");
                        // do not swallow the interruption: restore the flag and stop waiting
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            if (this.isCacheFull()) {
                // no session released
                throw new CacheException("Cache of messagePassing sessions (size="
                        + this.cacheAttributeControllerItf.getCacheMaxSize()
                        + ") is full since 1 minute");
            }
        }
    }

    /**
     * Unbind every cache entry accepted by the unfixed-entry filter. A failure
     * is logged as a warning and otherwise ignored (best effort).
     */
    private void evictUnusedSessions() {
        try {
            this.unbindManagerItf.unbind(this.unfixedEntryFilter, true);
        } catch (final CacheException e) {
            this.logger.log(BasicLevel.WARN,
                    "an interruption occured while trying to remove unused session", e);
        }
    }

    /**
     * Filter used to close every session in cache
     */
    private class CacheEntryFilterClosingSocket implements CacheEntryFilter {
        /**
         * Close the session contained in the given cache entry
         *
         * @param ce
         *            the cache entry whose session is closed
         * @return <code>true</code>, always
         */
        public boolean accept(CacheEntry ce) {
            IdentifiedSessionManager.this.logger.log(BasicLevel.DEBUG, "Closing session : "
                    + ce.getCeIdentifier());
            final Session session = (Session) ce.getCeObject();
            IdentifiedSessionManager.this.sessionManager.closeSessionSilently(session);
            return true;
        }
    }

    /**
     * Filter used to treat only unused sessions
     */
    private static final class CacheUnfixedEntryFilter implements CacheEntryFilter {
        /**
         * @param cacheEntry
         *            the entry to test; cast to {@link FixableCacheEntry}
         * @return <code>true</code> if the fix count of the entry is zero
         */
        public boolean accept(CacheEntry cacheEntry) {
            return ((FixableCacheEntry) cacheEntry).getCeFixCount() == 0;
        }
    }

    /**
     * Get the session stored in the cache for the given export identifier. The
     * lookup result is dereferenced without a null check.
     *
     * @param id
     *            the export identifier used as cache key
     * @return the session held by the cache entry
     */
    Session getSession(ExportIdentifier id) {
        return (Session) this.cacheManagerItf.lookup(id).getCeObject();
    }

    /**
     * @param ce
     *            the cache entry to test
     * @return true if the fix count of the entry is greater than zero
     */
    boolean isInUse(FixableCacheEntry ce) {
        return ce.getCeFixCount() > 0;
    }

    /**
     * @param session
     *            the session whose cache entry is tested
     * @return true if the fix count of the session's cache entry is greater
     *         than zero
     */
    protected boolean isInUse(Session session) {
        return (((FixableCacheEntry) this.cacheManagerItf.lookup(session.remoteExportIdentifier))
                .getCeFixCount()) > 0;
    }

    /**
     * @param id
     *            the export identifier whose cache entry is tested
     * @return true if the fix count of the cache entry is greater than zero
     */
    protected boolean isInUse(ExportIdentifier id) {
        return (((FixableCacheEntry) this.cacheManagerItf.lookup(id)).getCeFixCount()) > 0;
    }

    /**
     * @return true if the cache is full, i.e. its current size has reached its
     *         maximum size
     */
    private boolean isCacheFull() {
        return this.cacheAttributeControllerItf.getCurrentSize() >= this.cacheAttributeControllerItf
                .getCacheMaxSize();
    }
}