OW2 Consortium contrail

Rev

Rev 2693 | Rev 2792 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2691 aaasz 1
'''
2
Copyright (c) 2010-2012, Contrail consortium.
3
All rights reserved.
4
 
5
Redistribution and use in source and binary forms,
6
with or without modification, are permitted provided
7
that the following conditions are met:
8
 
9
 1. Redistributions of source code must retain the
10
    above copyright notice, this list of conditions
11
    and the following disclaimer.
12
 2. Redistributions in binary form must reproduce
13
    the above copyright notice, this list of
14
    conditions and the following disclaimer in the
15
    documentation and/or other materials provided
16
    with the distribution.
2693 aaasz 17
 3. Neither the name of the Contrail consortium nor the
2691 aaasz 18
    names of its contributors may be used to endorse
19
    or promote products derived from this software
20
    without specific prior written permission.
21
 
22
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
23
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
24
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
25
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
32
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35
POSSIBILITY OF SUCH DAMAGE.
36
 
37
 
38
Created on Feb 8, 2011
39
 
40
@package conpaas.core
41
@author ielhelw, aaasz
42
@file
43
 
44
'''
45
 
46
from threading import Thread, Lock, Timer, Event
47
import inspect, tempfile, os, os.path, tarfile, time, stat, json, urlparse
48
from string import Template
49
 
50
from conpaas.core.log import create_logger
51
from conpaas.core.http import HttpJsonResponse, HttpErrorResponse,\
52
                         HttpFileDownloadResponse, HttpRequest,\
53
                         FileUploadField, HttpError, _http_post
54
 
55
from conpaas.core.node import ServiceNode
56
from conpaas.core import iaas
57
from conpaas.core.misc import get_ip_address
58
 
59
class Controller(object):
    """Implementation of the clouds controller. This class implements functions
       to easily work with the available cloud objects.

       So far, it provides the following functionalities/abstractions:
         - crediting system (also terminate service when user out of credits)
         - adding nodes (VMs)
         - removing nodes (VMs)
    """

    def __init__(self, config_parser, **kwargs):
        """Reads the front-end callback settings, starts the reservation
           timer that bills the manager itself and instantiates the default
           cloud driver.

            @param config_parser Object exposing get(section, option); the
                                 options FE_CREDIT_URL, FE_TERMINATE_URL and
                                 FE_SERVICE_ID of section 'manager' and
                                 DRIVER of section 'iaas' are read here

            @param kwargs Accepted but not used by this constructor
        """
        # Params for frontend callback
        self.__fe_creditUrl = config_parser.get('manager', 'FE_CREDIT_URL')
        self.__fe_terminateUrl = config_parser.get('manager',
                                                   'FE_TERMINATE_URL')
        self.__fe_service_id = config_parser.get('manager', 'FE_SERVICE_ID')

        # For crediting system: maps a node id (or 'manager') to the
        # ReservationTimer that periodically charges for it
        self.__reservation_logger = create_logger('ReservationTimer')
        self.__reservation_map = {
            'manager': ReservationTimer(['manager'],
                                        55 * 60,  # 55mins
                                        self.__deduct_and_check_credit,
                                        self.__reservation_logger)
        }
        self.__reservation_map['manager'].start()
        self.__force_terminate_lock = Lock()

        self.__config_parser = config_parser
        # ids of the VMs that passed the agent test
        self.__created_nodes = []
        # ids of the VMs that were requested but are not known to be up yet
        self.__partially_created_nodes = []
        self.__logger = create_logger(__name__)

        # TODO: for now, it receives only 1 cloud credentials - we will
        # modify this in the near future
        drivername = config_parser.get('iaas', 'DRIVER').lower()
        self.__default_cloud = iaas.get_cloud_instance('iaas',
                                                       drivername,
                                                       config_parser)

    #===========================================================================#
    #                create_nodes(self, count, test_agent, port, cloud)         #
    #===========================================================================#
    #TODO: be able to select cloud
    def create_nodes(self, count, test_agent, port, cloud=None):
        """Creates the VMs associated with the list of nodes. It also tests
           if the agents started correctly.

            @param count The number of nodes to be created; when it is
                         smaller than 1 nothing is requested or charged and
                         an empty list is returned

            @param test_agent A callback function to test if the agent
                          started correctly in the newly created VM; it is
                          called as test_agent(ip, port) and must raise
                          when the agent is not reachable

            @param port The port on which the agent will listen

            @param cloud (Optional) If specified, this function will start new
                         nodes inside cloud, otherwise it will start new nodes
                         inside the default cloud or wherever the controller
                         wants (for now only the default cloud is used)

            @return A list of nodes of type ServiceNode

            @exception Exception if the credit could not be deducted, or
                                 whatever the cloud driver raised while
                                 requesting new instances (the nodes that
                                 were already up are killed first)
        """
        if count < 1:
            # Nothing to start. Without this guard the loop below never
            # runs, request_start stays unbound and the timer setup fails
            # with a NameError.
            return []

        if not self.__deduct_credit(count):
            raise Exception('Could not add nodes. Not enough credits.')

        ready = []
        iteration = 0

        while len(ready) < count:
            iteration += 1
            missing = count - len(ready)
            self.__logger.debug('[_create_nodes]: iteration %d: creating %d nodes'
                                % (iteration, missing))
            # The lock is shared with __force_terminate_service, which
            # keeps it forever once the service ran out of credit.
            with self.__force_terminate_lock:
                if iteration == 1:
                    request_start = time.time()
                try:
                    poll = self.__default_cloud.new_instances(missing)
                    self.__partially_created_nodes += [i['id'] for i in poll]
                except Exception:
                    self.__logger.exception('[_create_nodes]: Failed to request new nodes')
                    self.__kill_nodesById([i['id'] for i in ready])
                    self.__partially_created_nodes = []
                    # bare raise keeps the original traceback
                    raise

            poll, failed = self.__wait_for_nodes(poll, test_agent, port)
            ready += poll
            if failed:
                self.__logger.debug('[_create_nodes]: %d nodes '
                                    'failed to startup properly: %s'
                                    % (len(failed), str(failed)))
                self.__kill_nodesById([i['id'] for i in failed])

        ready_ids = [i['id'] for i in ready]
        cloud_name = self.__default_cloud.get_cloud_name()
        additional_nodes = [ServiceNode(i['id'], i['ip'], cloud_name)
                            for i in ready]

        with self.__force_terminate_lock:
            self.__created_nodes += ready_ids
            self.__partially_created_nodes = []

        # start reservation timer with slack of 3 mins + time already wasted
        # this should be enough time to terminate instances before
        # hitting the following hour
        timer = ReservationTimer(list(ready_ids),
                                 (55 * 60) - (time.time() - request_start),
                                 self.__deduct_and_check_credit,
                                 self.__reservation_logger)
        timer.start()
        # set mappings: all nodes of this batch share one timer
        for node_id in ready_ids:
            self.__reservation_map[node_id] = timer
        return additional_nodes

    #===========================================================================#
    #                     delete_nodes(self, nodes)                             #
    #===========================================================================#
    def delete_nodes(self, nodes):
        """Kills the VMs associated with the list of nodes.

            @param nodes The list of nodes to be removed - a node must be of type
                         ServiceNode or a class that extends ServiceNode
                         (only its vmid attribute is read)
        """
        self.__kill_nodesById([i.vmid for i in nodes])

    #===========================================================================#
    #                     list_vms(self, cloud=None)                            #
    #===========================================================================#
    def list_vms(self, cloud=None):
        """Returns an array with the VMs running at the given/default(s) cloud.

            @param cloud (Optional) If specified, this method will return the
                         VMs already running at the given cloud

            @return Whatever list_vms() of the selected cloud returns
        """
        c = cloud if cloud is not None else self.__default_cloud

        #TODO: return ServiceNode(s)
        return c.list_vms()

    #===========================================================================#
    #           generate_context(self, service_name, cloud, ip_whitelist)       #
    #===========================================================================#
    def generate_context(self, service_name, cloud=None, ip_whitelist=None):
        """Generates the contextualization file for the default/given cloud
           and stores it in that cloud as its context template.

            @param service_name Used to know which config_files and scripts
                                to select

            @param cloud (Optional) If specified, the context will be generated
                         for it, otherwise it will be generated for the default
                         cloud

            @param ip_whitelist (Optional) Currently not used by this method
        """
        c = cloud if cloud is not None else self.__default_cloud
        contxt = self._get_context_file(service_name, c.get_cloud_type())
        c.set_context_template(contxt)

    #===========================================================================#
    #                update_context(self, replace, cloud)                       #
    #===========================================================================#
    def update_context(self, replace=None, cloud=None):
        """Updates the contextualization file for the default/given cloud.

            @param replace A dictionary that specifies which words should be
                           replaced with what (no replacement is done when
                           it is omitted). For example:
                           replace = dict(name='A', age='57')
                           context1 =  '$name , $age'
                           => new_context1 = 'A , 57'
                           context2 ='${name}na, ${age}'
                           => new_context2 = 'Ana, 57'

            @param cloud (Optional) If specified, the context will be generated
                         for it, otherwise it will be generated for the default
                         cloud

        """
        c = cloud if cloud is not None else self.__default_cloud
        # None instead of a shared {} default: a mutable default argument
        # is one object shared by every call
        substitutions = replace if replace is not None else {}
        contxt = c.get_context_template()
        # safe_substitute leaves unknown placeholders untouched
        contxt = Template(contxt).safe_substitute(substitutions)
        c.config(context=contxt)

    #===========================================================================#
    #                get_clouds(self)                                           #
    #===========================================================================#
    # TODO: For now we work on just one cloud
    def get_clouds(self):
        """
            @return The list of cloud objects

        """
        return [self.__default_cloud]

    #===========================================================================#
    #                config_cloud(self, cloud, config_params)                   #
    #===========================================================================#
    def config_cloud(self, cloud, config_params):
        """Configures some parameters in the given cloud

            @param cloud The cloud to be configured

            @param config_params A dictionary containing the configuration
                                 parameters (are specific to the cloud)
        """
        cloud.config(config_params)

    #===========================================================================#

    def __kill_nodesById(self, ids):
        """Stops charging for the given VM ids and kills them in the default
           cloud. Failures are logged and do not abort the loop.

            @param ids Iterable with the ids of the VMs to kill
        """
        #TODO: send also the cloud
        for node_id in ids:
            self.__logger.debug('[_kill_nodes]: killing ' + str(node_id))
            try:
                # node may not be in map if it failed to start
                if node_id in self.__reservation_map:
                    timer = self.__reservation_map.pop(node_id)
                    # stop the timer once its last node is gone
                    if timer.remove_node(node_id) < 1:
                        timer.stop()
                self.__default_cloud.kill_instance(node_id)
            except Exception:
                self.__logger.exception('[_kill_nodes]: '
                                        'Failed to kill node %s', node_id)

    def __wait_for_nodes(self, nodes, test_agent, port, poll_interval=10):
        """Polls the given nodes until their agents answer or the time
           budget is used up.

            @param nodes List of dictionaries with at least the keys 'id'
                         and 'ip'; an empty 'ip' means "not known yet" and
                         is refreshed from list_vms() of the default cloud

            @param test_agent Called as test_agent(ip, port); a node counts
                              as up when the call does not raise

            @param port Passed through to test_agent

            @param poll_interval Seconds slept between two polling cycles

            @return A tuple (ready_nodes, failed_nodes); failed_nodes is not
                    empty only when polling was given up
        """
        self.__logger.debug('[__wait_for_nodes]: going to start polling')
        done = []
        poll_cycles = 0
        while len(nodes) > 0:
            poll_cycles += 1
            for node in nodes:
                try:
                    up = node['ip'] != ''
                    if up:
                        test_agent(node['ip'], port)
                except Exception:
                    up = False
                if up:
                    done.append(node)
            nodes = [node for node in nodes if node not in done]
            if len(nodes):
                if poll_cycles * poll_interval > 180:
                    # at least 2mins of sleeping + poll time
                    return (done, nodes)

                self.__logger.debug('[_wait_for_nodes]: waiting for %d nodes'
                                    % len(nodes))
                time.sleep(poll_interval)
                no_ip_nodes = [node for node in nodes if node['ip'] == '']
                if no_ip_nodes:
                    self.__logger.debug('[_wait_for_nodes]: refreshing %d nodes'
                                        % len(no_ip_nodes))
                    refreshed_list = self.__default_cloud.list_vms()
                    for node in no_ip_nodes:
                        node['ip'] = refreshed_list[node['id']]['ip']
        self.__logger.debug('[_wait_for_nodes]: All nodes are ready %s'
                            % str(done))
        return (done, [])

    def _get_context_file(self, service_name, cloud):
        """Builds the contextualization script out of the files found below
           CONPAAS_HOME (option of section 'manager').

            @param service_name Selects <service_name>-agent.cfg and
                                <service_name>-agent-start when these files
                                exist

            @param cloud Name of the file inside scripts/cloud to use
                         (generate_context passes the cloud type here)

            @return The script as one string: cloud script, agent setup,
                    a here-document that writes the agent configuration and
                    finally the agent start script
        """
        conpaas_home = self.__config_parser.get('manager', 'CONPAAS_HOME')
        cloud_scripts_dir = conpaas_home + '/scripts/cloud'
        agent_cfg_dir = conpaas_home + '/config/agent'
        agent_scripts_dir = conpaas_home + '/scripts/agent'

        bootstrap = self.__config_parser.get('manager', 'BOOTSTRAP')
        manager_ip = self.__config_parser.get('manager', 'MY_IP')

        # Every file is read inside a with block so that its handle is
        # closed right away instead of being left to the garbage collector.

        # Get contextualization script for the corresponding cloud
        with open(cloud_scripts_dir + '/' + cloud, 'r') as cloud_script_file:
            cloud_script = cloud_script_file.read()

        # Get agent setup file
        with open(agent_scripts_dir + '/agent-setup', 'r') as agent_setup_file:
            agent_setup = Template(agent_setup_file.read()). \
                                   safe_substitute(SOURCE=bootstrap)

        # Get agent config file - add to the default one the one specific
        # to the service if it exists
        with open(agent_cfg_dir + '/default-agent.cfg') as default_cfg_file:
            agent_cfg = Template(default_cfg_file.read()). \
                                 safe_substitute(AGENT_TYPE=service_name,
                                                 MANAGER_IP=manager_ip)

        service_cfg_path = agent_cfg_dir + '/' + service_name + '-agent.cfg'
        if os.path.isfile(service_cfg_path):
            with open(service_cfg_path) as agent_cfg_file:
                agent_cfg += '\n' + agent_cfg_file.read()

        # Get agent start file - if none for this service, use the default one
        agent_start_path = agent_scripts_dir + '/' + service_name + '-agent-start'
        if not os.path.isfile(agent_start_path):
            agent_start_path = agent_scripts_dir + '/default-agent-start'
        with open(agent_start_path) as agent_start_file:
            agent_start = agent_start_file.read()

        ## Concatenate the files
        context_file = cloud_script + '\n\n'
        context_file += agent_setup + '\n\n'
        context_file += 'cat <<EOF > $ROOT_DIR/config.cfg\n'
        context_file += agent_cfg + '\n' + 'EOF\n\n'
        context_file += agent_start + '\n'

        return context_file

    def __force_terminate_service(self):
        """Kills every VM known to this controller and asks the front end
           to terminate the service (up to 10 attempts)."""
        # DO NOT release lock after acquiring it
        # to prevent the creation of more nodes
        self.__force_terminate_lock.acquire()
        self.__logger.debug('OUT OF CREDIT, TERMINATING SERVICE')

        # kill all partially created nodes
        self.__kill_nodesById(self.__partially_created_nodes)

        # kill all created nodes
        self.__kill_nodesById(self.__created_nodes)

        # notify front-end, attempt 10 times until successful
        for _ in range(10):
            try:
                parsed_url = urlparse.urlparse(self.__fe_terminateUrl)
                _, body = _http_post(parsed_url.hostname,
                                     parsed_url.port or 80,
                                     parsed_url.path,
                                     {'sid': self.__fe_service_id})
                obj = json.loads(body)
                if not obj['error']:
                    break
            except Exception:
                self.__logger.exception('Failed to terminate service')

    def __deduct_credit(self, value):
        """Asks the front end to decrement the credit of this service.

            @param value Amount sent as 'decrement'

            @return True if the front end answered without error, False if
                    it reported an error or the request failed
        """
        try:
            parsed_url = urlparse.urlparse(self.__fe_creditUrl)
            _, body = _http_post(parsed_url.hostname, parsed_url.port or 80,
                                 parsed_url.path,
                                 {'sid': self.__fe_service_id,
                                  'decrement': value})
            obj = json.loads(body)
            return not obj['error']
        except Exception:
            self.__logger.exception('Failed to deduct credit')
            return False

    def __deduct_and_check_credit(self, value):
        """Callback of the reservation timers: deducts value credits and
           terminates the whole service when that is not possible."""
        if not self.__deduct_credit(value):
            self.__force_terminate_service()
419
 
420
 
421
class ReservationTimer(Thread):
    """Thread that periodically charges for a group of nodes.

       After an initial delay, and then once per interval, the callback is
       invoked in a thread of its own with the number of nodes currently
       tracked. The loop ends once stop() was called.
    """

    def __init__(self, nodes, delay, callback,
                       reservation_logger, interval=3600):
        """
            @param nodes List of node ids to charge for; the list object is
                         kept as is and modified by remove_node()

            @param delay Seconds to wait before the first charge

            @param callback Called as callback(number_of_nodes)

            @param reservation_logger Logger used for the RTIMER messages

            @param interval Seconds between two consecutive charges
        """
        Thread.__init__(self)
        self.lock = Lock()    # guards self.nodes
        self.event = Event()  # set by stop()
        self.nodes = nodes
        self.callback = callback
        self.delay = delay
        self.interval = interval
        self.reservation_logger = reservation_logger
        message = 'RTIMER Creating timer for %s' % (str(self.nodes))
        self.reservation_logger.debug(message)

    def remove_node(self, node_id):
        """Stops tracking node_id.

            @param node_id Id to drop from the list (list.remove() raises
                           ValueError when it is not there)

            @return The number of nodes that are still tracked
        """
        self.lock.acquire()
        try:
            self.nodes.remove(node_id)
            message = 'RTIMER removed node %s, ' \
                      'updated list %s' \
                      % (node_id, str(self.nodes))
            self.reservation_logger.debug(message)
            remaining = len(self.nodes)
            return remaining
        finally:
            self.lock.release()

    def run(self):
        """Waits for the initial delay, then charges once per interval
           until the timer is stopped."""
        self.event.wait(self.delay)
        while True:
            if self.event.is_set():
                return
            self.lock.acquire()
            try:
                list_size = len(self.nodes)
                message = 'RTIMER charging user credit for ' \
                          'hour of %d instances %s' \
                          % (list_size, str(self.nodes))
                self.reservation_logger.debug(message)
            finally:
                self.lock.release()
            # the callback runs in its own thread so that a slow charge
            # does not hold up this loop
            charger = Thread(target=self.callback, args=[list_size])
            charger.start()
            self.event.wait(self.interval)

    def stop(self):
        """Ends the charging loop (wakes up a pending wait)."""
        self.event.set()
457