OW2 Consortium elastic-grid

Rev

Rev 498 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
/**
 * Elastic Grid
 * Copyright (C) 2008-2009 Elastic Grid, LLC.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
18
 
150 jeje 19
package com.elasticgrid.platforms.ec2;
2 jeje 20
 
464 jeje 21
import com.elasticgrid.cluster.spi.AbstractCloudPlatformManager;
150 jeje 22
import com.elasticgrid.cluster.spi.CloudPlatformManager;
343 jeje 23
import com.elasticgrid.model.Cluster;
24
import com.elasticgrid.model.ClusterAlreadyRunningException;
25
import com.elasticgrid.model.ClusterException;
26
import com.elasticgrid.model.Discovery;
27
import com.elasticgrid.model.NodeProfileInfo;
469 jeje 28
import com.elasticgrid.model.ClusterNotFoundException;
118 jeje 29
import com.elasticgrid.model.ec2.EC2Cluster;
2 jeje 30
import com.elasticgrid.model.ec2.EC2Node;
337 jeje 31
import com.elasticgrid.model.ec2.EC2NodeType;
118 jeje 32
import com.elasticgrid.model.ec2.impl.EC2ClusterImpl;
150 jeje 33
import com.elasticgrid.platforms.ec2.discovery.EC2ClusterLocator;
22 jeje 34
import static java.lang.String.format;
35
import java.rmi.RemoteException;
343 jeje 36
import java.util.ArrayList;
37
import java.util.Collection;
38
import java.util.HashSet;
39
import java.util.Iterator;
40
import java.util.LinkedList;
41
import java.util.List;
42
import java.util.Set;
36 jeje 43
import java.util.concurrent.ExecutionException;
30 jeje 44
import java.util.concurrent.ExecutorService;
45
import java.util.concurrent.Executors;
46
import java.util.concurrent.Future;
36 jeje 47
import java.util.concurrent.TimeUnit;
30 jeje 48
import java.util.concurrent.TimeoutException;
49
import java.util.logging.Level;
22 jeje 50
import java.util.logging.Logger;
2 jeje 51
 
464 jeje 52
public class EC2CloudPlatformManager extends AbstractCloudPlatformManager<EC2Cluster> implements CloudPlatformManager<EC2Cluster> {
501 jeje 53
    private EC2NodeInstantiator nodeInstantiator;
150 jeje 54
    private EC2ClusterLocator clusterLocator;
354 jeje 55
 
56
    private String overridesBucket;
30 jeje 57
    private String awsAccessID, awsSecretKey;
58
    private boolean awsSecured = true;
56 jeje 59
    private String ami32, ami64;
498 jeje 60
    /** Timeout in seconds used when starting and stopping EC2 instances. */
61
    private int startStopTimeout = 10 * 60;
62
 
30 jeje 63
    private ExecutorService executor = Executors.newFixedThreadPool(5);
150 jeje 64
    private static final Logger logger = Logger.getLogger(EC2CloudPlatformManager.class.getName());
2 jeje 65
 
300 jeje 66
    public String getName() {
67
        return "Amazon EC2";
68
    }
69
 
343 jeje 70
    public void startCluster(String clusterName, List<NodeProfileInfo> clusterTopology) throws ClusterException, ExecutionException, TimeoutException, InterruptedException, RemoteException {
390 jeje 71
        // ensure the cluster name group exists
72
        String securityGroupNameForCluster = "elastic-grid-cluster-" + clusterName;
73
        if (!nodeInstantiator.getGroupsNames().contains(securityGroupNameForCluster)) {
74
            nodeInstantiator.createSecurityGroup(securityGroupNameForCluster);
75
        }
76
 
118 jeje 77
        // ensure the cluster is not already running
78
        Cluster cluster = cluster(clusterName);
79
        if (cluster != null && cluster.isRunning()) {
80
            throw new ClusterAlreadyRunningException(cluster);
2 jeje 81
        }
343 jeje 82
 
83
        List<Future<List<String>>> futures = new LinkedList<Future<List<String>>>();
84
        for (NodeProfileInfo nodeProfileInfo : clusterTopology) {
85
            for (int i = 0; i < nodeProfileInfo.getNumber(); i++) {
86
                String ami;
87
                switch ((EC2NodeType) nodeProfileInfo.getNodeType()) {
88
                    case SMALL:
89
                        ami = ami32;
90
                        break;
91
                    case MEDIUM_HIGH_CPU:
92
                        ami = ami32;
388 dennisreedy 93
                        break;
343 jeje 94
                    case LARGE:
95
                        ami = ami64;
96
                        break;
97
                    case EXTRA_LARGE:
98
                        ami = ami64;
99
                        break;
100
                    case EXTRA_LARGE_HIGH_CPU:
101
                        ami = ami64;
102
                        break;
103
                    default:
104
                        throw new IllegalArgumentException(format("Unexpected Amazon EC2 instance type '%s'",
105
                                nodeProfileInfo.getNodeType().getName()));
106
                }
362 dennisreedy 107
                logger.log(Level.INFO, "Starting cluster ["+clusterName+"], " +
108
                                       "type ["+nodeProfileInfo.getNodeType().getName()+"], " +
109
                                       "using AMI ["+ami+"]");
354 jeje 110
                String override = null;
111
                if (nodeProfileInfo.hasOverride())
382 dennisreedy 112
                    override = "s3://" + overridesBucket;
343 jeje 113
                futures.add(executor.submit(new StartInstanceTask(nodeInstantiator, clusterName,
354 jeje 114
                        nodeProfileInfo.getNodeProfile(), (EC2NodeType) nodeProfileInfo.getNodeType(),
115
                        override, ami, awsAccessID, awsSecretKey, awsSecured)));
343 jeje 116
            }
2 jeje 117
        }
343 jeje 118
 
30 jeje 119
        // wait for the threads to finish
120
        for (Future<List<String>> future : futures) {
498 jeje 121
            future.get(startStopTimeout, TimeUnit.SECONDS);
30 jeje 122
        }
2 jeje 123
    }
124
 
118 jeje 125
    public void stopCluster(String clusterName) throws ClusterException, RemoteException {
126
        logger.log(Level.INFO, "Stopping cluster ''{0}''", new Object[] { clusterName });
127
        // locate all nodes in the cluster
303 jeje 128
        Collection<EC2Node> nodes = clusterLocator.findNodes(clusterName);
22 jeje 129
        // stop each node one by one
2 jeje 130
        for (EC2Node node : nodes) {
36 jeje 131
            nodeInstantiator.shutdownInstance(node.getInstanceID());
2 jeje 132
        }
133
    }
134
 
303 jeje 135
    public Collection<EC2Cluster> findClusters() throws ClusterException, RemoteException {
136
        Collection<String> clustersNames = clusterLocator.findClusters();
138 jeje 137
        List<EC2Cluster> clusters = new ArrayList<EC2Cluster>(clustersNames.size());
118 jeje 138
        for (String cluster : clustersNames) {
139
            clusters.add(cluster(cluster));
28 jeje 140
        }
118 jeje 141
        return clusters;
28 jeje 142
    }
143
 
118 jeje 144
    public EC2Cluster cluster(String name) throws RemoteException, ClusterException {
145
        EC2Cluster cluster = new EC2ClusterImpl();
316 jeje 146
        Set<EC2Node> nodes = clusterLocator.findNodes(name);
22 jeje 147
        if (nodes == null)
150 jeje 148
            return (EC2Cluster) cluster.name(name);
22 jeje 149
        else
118 jeje 150
            return (EC2Cluster) cluster.name(name).addNodes(nodes);
2 jeje 151
    }
152
 
343 jeje 153
    public void resizeCluster(String clusterName, List<NodeProfileInfo> clusterTopology) throws ClusterException, ExecutionException, TimeoutException, InterruptedException, RemoteException {
337 jeje 154
        // inspect the current cluster in order to figure out its topology
118 jeje 155
        EC2Cluster cluster = cluster(clusterName);
135 jeje 156
 
469 jeje 157
        if (cluster.getNodes().isEmpty())
158
            throw new ClusterNotFoundException(clusterName);
159
 
135 jeje 160
        Set<EC2Node> monitors = cluster.getMonitorNodes();
161
        Set<EC2Node> agents = cluster.getAgentNodes();
337 jeje 162
        // figure out which MONITORs are actually MONITOR_AND_AGENT and update the MONITOR set
163
        Set<EC2Node> monitorsAndAgents = new HashSet<EC2Node>();
164
        Iterator<EC2Node> monitorsIterator = monitors.iterator();
165
        while (monitorsIterator.hasNext()) {
469 jeje 166
            EC2Node ec2Node =  monitorsIterator.next();
337 jeje 167
            if (ec2Node.getProfile().isAgent()) {
168
                monitorsAndAgents.add(ec2Node);
169
                monitorsIterator.remove();
170
            }
171
        }
172
 
343 jeje 173
        int numberOfMonitors = monitors.size();
174
        int numberOfMonitorsAndAgents = monitorsAndAgents.size();
175
        int numberOfAgents = agents.size();
135 jeje 176
 
469 jeje 177
        logger.log(Level.FINEST, "Cluster {0} made of {1} monitor(s), {2} agent(s) and {3} monitor(s) and agent(s)",
178
                new Object[] { clusterName, numberOfMonitors, numberOfAgents, numberOfMonitorsAndAgents });
179
 
343 jeje 180
        List<Future> futures = new LinkedList<Future>();
181
        for (NodeProfileInfo nodeProfileInfo : clusterTopology) {
469 jeje 182
            logger.log(Level.FINEST, "Cluster should be made of {0} node(s) of profile {1} and type {2}",
183
                    new Object[] { nodeProfileInfo.getNumber(), nodeProfileInfo.getNodeProfile(), nodeProfileInfo.getNodeType() });
184
            String ami;
185
            switch ((EC2NodeType) nodeProfileInfo.getNodeType()) {
186
                case SMALL:
187
                    ami = ami32;
188
                    break;
189
                case MEDIUM_HIGH_CPU:
190
                    ami = ami32;
191
                    break;
192
                case LARGE:
193
                    ami = ami64;
194
                    break;
195
                case EXTRA_LARGE:
196
                    ami = ami64;
197
                    break;
198
                case EXTRA_LARGE_HIGH_CPU:
199
                    ami = ami64;
200
                    break;
201
                default:
202
                    throw new IllegalArgumentException(format("Unexpected Amazon EC2 instance type '%s'",
203
                            nodeProfileInfo.getNodeType().getName()));
204
            }
205
            int number = 0;
206
            Iterator<EC2Node> nodesIterator = null;
207
            switch (nodeProfileInfo.getNodeProfile()) {
208
                case MONITOR:
209
                    number = nodeProfileInfo.getNumber() - numberOfMonitors;
210
                    nodesIterator = monitors.iterator();
211
                    break;
212
                case MONITOR_AND_AGENT:
213
                    number = nodeProfileInfo.getNumber() - numberOfMonitorsAndAgents;
214
                    nodesIterator = monitorsAndAgents.iterator();
215
                    break;
216
                case AGENT:
217
                    number = nodeProfileInfo.getNumber() - numberOfAgents;
218
                    nodesIterator = agents.iterator();
219
                    break;
220
            }
221
 
222
            if (number > 0) {
223
                logger.log(Level.INFO, "Scaling cluster ''{0}'' with {1} additional node(s) of profile {1}...",
224
                        new Object[] { clusterName, number, nodeProfileInfo.getNodeProfile() });
225
                String override = null;
226
                if (nodeProfileInfo.hasOverride())
227
                    override = "s3://" + overridesBucket;
228
                for (int i = 0; i < number; i++) {
229
                    futures.add(executor.submit(new StartInstanceTask(nodeInstantiator, clusterName,
230
                            nodeProfileInfo.getNodeProfile(), (EC2NodeType) nodeProfileInfo.getNodeType(),
231
                            override, ami, awsAccessID, awsSecretKey, awsSecured)));
343 jeje 232
                }
469 jeje 233
            } else {
234
                logger.log(Level.INFO, "Decreasing cluster ''{0}'' by {1} node(s) of profile {2}...",
235
                        new Object[] { clusterName, Math.abs(number), nodeProfileInfo.getNodeProfile() });
236
                int numberToStop = Math.abs(number);
237
                while (numberToStop > 0 && nodesIterator.hasNext()) {
238
                    EC2Node node = nodesIterator.next();
239
                    futures.add(executor.submit(new StopInstanceTask(nodeInstantiator, node.getInstanceID())));
343 jeje 240
                }
29 jeje 241
            }
135 jeje 242
        }
243
 
244
        // wait for the threads to finish
343 jeje 245
        for (Future future : futures) {
135 jeje 246
            future.get(5 * 60, TimeUnit.SECONDS);
247
        }
2 jeje 248
    }
249
 
501 jeje 250
    public void setNodeInstantiator(EC2NodeInstantiator nodeNodeInstantiator) throws RemoteException {
251
        this.nodeInstantiator = nodeNodeInstantiator;
456 jeje 252
        // ensure the Discovery.MONITOR group exists
501 jeje 253
        java.util.List<String> groupNames = nodeNodeInstantiator.getGroupsNames();
473 jeje 254
        if (!groupNames.contains(Discovery.MONITOR.getGroupName())) {
501 jeje 255
            nodeNodeInstantiator.createSecurityGroup(Discovery.MONITOR.getGroupName());
456 jeje 256
        }
257
        // ensure the Discovery.AGENT group exists
473 jeje 258
        if (!groupNames.contains(Discovery.AGENT.getGroupName())) {
501 jeje 259
            nodeNodeInstantiator.createSecurityGroup(Discovery.AGENT.getGroupName());
456 jeje 260
        }
2 jeje 261
    }
262
 
354 jeje 263
    public void setOverridesBucket(String overridesBucket) {
264
        this.overridesBucket = overridesBucket;
265
    }
266
 
30 jeje 267
    public void setAwsAccessID(String awsAccessID) {
268
        this.awsAccessID = awsAccessID;
269
    }
270
 
271
    public void setAwsSecretKey(String awsSecretKey) {
272
        this.awsSecretKey = awsSecretKey;
273
    }
274
 
275
    public void setAwsSecured(boolean awsSecured) {
276
        this.awsSecured = awsSecured;
277
    }
278
 
22 jeje 279
    public void setAmi32(String ami32) {
280
        this.ami32 = ami32;
2 jeje 281
    }
282
 
22 jeje 283
    public void setAmi64(String ami64) {
284
        this.ami64 = ami64;
2 jeje 285
    }
30 jeje 286
 
150 jeje 287
    public void setClusterLocator(EC2ClusterLocator clusterLocator) {
288
        this.clusterLocator = clusterLocator;
289
    }
290
 
498 jeje 291
    public void setStartStopTimeout(int startStopTimeout) {
292
        this.startStopTimeout = startStopTimeout;
293
    }
2 jeje 294
}