OW2 Consortium elastic-grid

Rev

Rev 498 | Go to most recent revision | Blame | Compare with Previous | Last modification | View Log | RSS feed

/**
 * Elastic Grid
 * Copyright (C) 2008-2009 Elastic Grid, LLC.
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

package com.elasticgrid.platforms.ec2;

import com.elasticgrid.cluster.spi.AbstractCloudPlatformManager;
import com.elasticgrid.cluster.spi.CloudPlatformManager;
import com.elasticgrid.model.Cluster;
import com.elasticgrid.model.ClusterAlreadyRunningException;
import com.elasticgrid.model.ClusterException;
import com.elasticgrid.model.Discovery;
import com.elasticgrid.model.NodeProfileInfo;
import com.elasticgrid.model.ClusterNotFoundException;
import com.elasticgrid.model.ec2.EC2Cluster;
import com.elasticgrid.model.ec2.EC2Node;
import com.elasticgrid.model.ec2.EC2NodeType;
import com.elasticgrid.model.ec2.impl.EC2ClusterImpl;
import com.elasticgrid.platforms.ec2.discovery.EC2ClusterLocator;
import static java.lang.String.format;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EC2CloudPlatformManager extends AbstractCloudPlatformManager<EC2Cluster> implements CloudPlatformManager<EC2Cluster> {
    private EC2NodeInstantiator nodeInstantiator;
    private EC2ClusterLocator clusterLocator;

    private String overridesBucket;
    private String awsAccessID, awsSecretKey;
    private boolean awsSecured = true;
    private String ami32, ami64;
    /** Timeout in seconds used when starting and stopping EC2 instances. */
    private int startStopTimeout = 10 * 60;

    private ExecutorService executor = Executors.newFixedThreadPool(5);
    private static final Logger logger = Logger.getLogger(EC2CloudPlatformManager.class.getName());

    public String getName() {
        return "Amazon EC2";
    }

    public void startCluster(String clusterName, List<NodeProfileInfo> clusterTopology) throws ClusterException, ExecutionException, TimeoutException, InterruptedException, RemoteException {
        // ensure the cluster name group exists
        String securityGroupNameForCluster = "elastic-grid-cluster-" + clusterName;
        if (!nodeInstantiator.getGroupsNames().contains(securityGroupNameForCluster)) {
            nodeInstantiator.createSecurityGroup(securityGroupNameForCluster);
        }

        // ensure the cluster is not already running
        Cluster cluster = cluster(clusterName);
        if (cluster != null && cluster.isRunning()) {
            throw new ClusterAlreadyRunningException(cluster);
        }

        List<Future<List<String>>> futures = new LinkedList<Future<List<String>>>();
        for (NodeProfileInfo nodeProfileInfo : clusterTopology) {
            for (int i = 0; i < nodeProfileInfo.getNumber(); i++) {
                String ami;
                switch ((EC2NodeType) nodeProfileInfo.getNodeType()) {
                    case SMALL:
                        ami = ami32;
                        break;
                    case MEDIUM_HIGH_CPU:
                        ami = ami32;
                        break;                     
                    case LARGE:
                        ami = ami64;
                        break;
                    case EXTRA_LARGE:
                        ami = ami64;
                        break;
                    case EXTRA_LARGE_HIGH_CPU:
                        ami = ami64;
                        break;
                    default:
                        throw new IllegalArgumentException(format("Unexpected Amazon EC2 instance type '%s'",
                                nodeProfileInfo.getNodeType().getName()));
                }
                logger.log(Level.INFO, "Starting cluster ["+clusterName+"], " +
                                       "type ["+nodeProfileInfo.getNodeType().getName()+"], " +
                                       "using AMI ["+ami+"]");
                String override = null;
                if (nodeProfileInfo.hasOverride())
                    override = "s3://" + overridesBucket;
                futures.add(executor.submit(new StartInstanceTask(nodeInstantiator, clusterName,
                        nodeProfileInfo.getNodeProfile(), (EC2NodeType) nodeProfileInfo.getNodeType(),
                        override, ami, awsAccessID, awsSecretKey, awsSecured)));
            }
        }

        // wait for the threads to finish
        for (Future<List<String>> future : futures) {
            future.get(startStopTimeout, TimeUnit.SECONDS);
        }
    }

    public void stopCluster(String clusterName) throws ClusterException, RemoteException {
        logger.log(Level.INFO, "Stopping cluster ''{0}''", new Object[] { clusterName });
        // locate all nodes in the cluster
        Collection<EC2Node> nodes = clusterLocator.findNodes(clusterName);
        // stop each node one by one
        for (EC2Node node : nodes) {
            nodeInstantiator.shutdownInstance(node.getInstanceID());
        }
    }

    public Collection<EC2Cluster> findClusters() throws ClusterException, RemoteException {
        Collection<String> clustersNames = clusterLocator.findClusters();
        List<EC2Cluster> clusters = new ArrayList<EC2Cluster>(clustersNames.size());
        for (String cluster : clustersNames) {
            clusters.add(cluster(cluster));
        }
        return clusters;
    }

    public EC2Cluster cluster(String name) throws RemoteException, ClusterException {
        EC2Cluster cluster = new EC2ClusterImpl();
        Set<EC2Node> nodes = clusterLocator.findNodes(name);
        if (nodes == null)
            return (EC2Cluster) cluster.name(name);
        else
            return (EC2Cluster) cluster.name(name).addNodes(nodes);
    }

    public void resizeCluster(String clusterName, List<NodeProfileInfo> clusterTopology) throws ClusterException, ExecutionException, TimeoutException, InterruptedException, RemoteException {
        // inspect the current cluster in order to figure out its topology
        EC2Cluster cluster = cluster(clusterName);

        if (cluster.getNodes().isEmpty())
            throw new ClusterNotFoundException(clusterName);

        Set<EC2Node> monitors = cluster.getMonitorNodes();
        Set<EC2Node> agents = cluster.getAgentNodes();
        // figure out which MONITORs are actually MONITOR_AND_AGENT and update the MONITOR set
        Set<EC2Node> monitorsAndAgents = new HashSet<EC2Node>();
        Iterator<EC2Node> monitorsIterator = monitors.iterator();
        while (monitorsIterator.hasNext()) {
            EC2Node ec2Node =  monitorsIterator.next();
            if (ec2Node.getProfile().isAgent()) {
                monitorsAndAgents.add(ec2Node);
                monitorsIterator.remove();
            }
        }

        int numberOfMonitors = monitors.size();
        int numberOfMonitorsAndAgents = monitorsAndAgents.size();
        int numberOfAgents = agents.size();

        logger.log(Level.FINEST, "Cluster {0} made of {1} monitor(s), {2} agent(s) and {3} monitor(s) and agent(s)",
                new Object[] { clusterName, numberOfMonitors, numberOfAgents, numberOfMonitorsAndAgents });

        List<Future> futures = new LinkedList<Future>();
        for (NodeProfileInfo nodeProfileInfo : clusterTopology) {
            logger.log(Level.FINEST, "Cluster should be made of {0} node(s) of profile {1} and type {2}",
                    new Object[] { nodeProfileInfo.getNumber(), nodeProfileInfo.getNodeProfile(), nodeProfileInfo.getNodeType() });
            String ami;
            switch ((EC2NodeType) nodeProfileInfo.getNodeType()) {
                case SMALL:
                    ami = ami32;
                    break;
                case MEDIUM_HIGH_CPU:
                    ami = ami32;
                    break;
                case LARGE:
                    ami = ami64;
                    break;
                case EXTRA_LARGE:
                    ami = ami64;
                    break;
                case EXTRA_LARGE_HIGH_CPU:
                    ami = ami64;
                    break;
                default:
                    throw new IllegalArgumentException(format("Unexpected Amazon EC2 instance type '%s'",
                            nodeProfileInfo.getNodeType().getName()));
            }
            int number = 0;
            Iterator<EC2Node> nodesIterator = null;
            switch (nodeProfileInfo.getNodeProfile()) {
                case MONITOR:
                    number = nodeProfileInfo.getNumber() - numberOfMonitors;
                    nodesIterator = monitors.iterator();
                    break;
                case MONITOR_AND_AGENT:
                    number = nodeProfileInfo.getNumber() - numberOfMonitorsAndAgents;
                    nodesIterator = monitorsAndAgents.iterator();
                    break;
                case AGENT:
                    number = nodeProfileInfo.getNumber() - numberOfAgents;
                    nodesIterator = agents.iterator();
                    break;
            }

            if (number > 0) {
                logger.log(Level.INFO, "Scaling cluster ''{0}'' with {1} additional node(s) of profile {1}...",
                        new Object[] { clusterName, number, nodeProfileInfo.getNodeProfile() });
                String override = null;
                if (nodeProfileInfo.hasOverride())
                    override = "s3://" + overridesBucket;
                for (int i = 0; i < number; i++) {
                    futures.add(executor.submit(new StartInstanceTask(nodeInstantiator, clusterName,
                            nodeProfileInfo.getNodeProfile(), (EC2NodeType) nodeProfileInfo.getNodeType(),
                            override, ami, awsAccessID, awsSecretKey, awsSecured)));
                }
            } else {
                logger.log(Level.INFO, "Decreasing cluster ''{0}'' by {1} node(s) of profile {2}...",
                        new Object[] { clusterName, Math.abs(number), nodeProfileInfo.getNodeProfile() });
                int numberToStop = Math.abs(number);
                while (numberToStop > 0 && nodesIterator.hasNext()) {
                    EC2Node node = nodesIterator.next();
                    futures.add(executor.submit(new StopInstanceTask(nodeInstantiator, node.getInstanceID())));
                }
            }
        }

        // wait for the threads to finish
        for (Future future : futures) {
            future.get(5 * 60, TimeUnit.SECONDS);
        }
    }

    public void setNodeInstantiator(EC2NodeInstantiator nodeNodeInstantiator) throws RemoteException {
        this.nodeInstantiator = nodeNodeInstantiator;
        // ensure the Discovery.MONITOR group exists
        java.util.List<String> groupNames = nodeNodeInstantiator.getGroupsNames();
        if (!groupNames.contains(Discovery.MONITOR.getGroupName())) {
            nodeNodeInstantiator.createSecurityGroup(Discovery.MONITOR.getGroupName());
        }
        // ensure the Discovery.AGENT group exists
        if (!groupNames.contains(Discovery.AGENT.getGroupName())) {
            nodeNodeInstantiator.createSecurityGroup(Discovery.AGENT.getGroupName());
        }
    }

    public void setOverridesBucket(String overridesBucket) {
        this.overridesBucket = overridesBucket;
    }

    public void setAwsAccessID(String awsAccessID) {
        this.awsAccessID = awsAccessID;
    }

    public void setAwsSecretKey(String awsSecretKey) {
        this.awsSecretKey = awsSecretKey;
    }

    public void setAwsSecured(boolean awsSecured) {
        this.awsSecured = awsSecured;
    }

    public void setAmi32(String ami32) {
        this.ami32 = ami32;
    }

    public void setAmi64(String ami64) {
        this.ami64 = ami64;
    }

    public void setClusterLocator(EC2ClusterLocator clusterLocator) {
        this.clusterLocator = clusterLocator;
    }

    public void setStartStopTimeout(int startStopTimeout) {
        this.startStopTimeout = startStopTimeout;
    }
}