OW2 Consortium contrail

Rev

Rev 2691 | Rev 2729 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2691 aaasz 1
'''
2
Copyright (c) 2010-2012, Contrail consortium.
3
All rights reserved.
4
 
5
Redistribution and use in source and binary forms,
6
with or without modification, are permitted provided
7
that the following conditions are met:
8
 
9
 1. Redistributions of source code must retain the
10
    above copyright notice, this list of conditions
11
    and the following disclaimer.
12
 2. Redistributions in binary form must reproduce
13
    the above copyright notice, this list of
14
    conditions and the following disclaimer in the
15
    documentation and/or other materials provided
16
    with the distribution.
2693 aaasz 17
 3. Neither the name of the Contrail consortium nor the
2691 aaasz 18
    names of its contributors may be used to endorse
19
    or promote products derived from this software
20
    without specific prior written permission.
21
 
22
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
23
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
24
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
25
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
32
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35
POSSIBILITY OF SUCH DAMAGE.
36
 
37
 
38
Created on Feb 8, 2011
39
 
40
@package conpaas.core
41
@author ielhelw, aaasz
42
@file
43
 
44
'''
45
 
46
from threading import Thread, Lock, Timer, Event
47
import inspect, tempfile, os, os.path, tarfile, time, stat, json, urlparse
48
from string import Template
49
 
50
from conpaas.core.log import create_logger
51
from conpaas.core.http import HttpJsonResponse, HttpErrorResponse,\
52
                         HttpFileDownloadResponse, HttpRequest,\
53
                         FileUploadField, HttpError, _http_post
54
 
55
from conpaas.core.node import ServiceNode
56
from conpaas.core import iaas
57
from conpaas.core.misc import get_ip_address
58
 
59
class Controller(object):
    """Implementation of the clouds controller. This class implements functions
       to easily work with the available cloud objects.

       So far, it provides the following functionalities/abstractions:
         - crediting system (also terminate service when user out of credits)
         - adding nodes (VMs)
         - removing nodes (VMs)
    """

    def __init__(self, config_parser, **kwargs):
        """Reads the front-end and IaaS settings and starts the manager's
           reservation timer.

            @param config_parser A parser exposing get(section, option); the
                                 options read here are FE_CREDIT_URL,
                                 FE_TERMINATE_URL and FE_SERVICE_ID from
                                 section 'manager' and DRIVER from section
                                 'iaas'

            @param kwargs Accepted for interface compatibility; not used
        """
        # Params for frontend callback
        self.__fe_creditUrl = config_parser.get('manager', 'FE_CREDIT_URL')
        self.__fe_terminateUrl = config_parser.get('manager',
                                                   'FE_TERMINATE_URL')
        self.__fe_service_id = config_parser.get('manager', 'FE_SERVICE_ID')

        # For crediting system: maps a node id (or 'manager') to the
        # ReservationTimer that charges for it
        self.__reservation_logger = create_logger('ReservationTimer')
        self.__reservation_map = {
            'manager': ReservationTimer(['manager'],
                                        55 * 60,  # 55mins
                                        self.__deduct_and_check_credit,
                                        self.__reservation_logger)
        }
        self.__force_terminate_lock = Lock()

        self.__config_parser = config_parser
        self.__created_nodes = []
        self.__partially_created_nodes = []
        self.__logger = create_logger(__name__)

        # TODO: for now, it receives only 1 cloud credentials - we will
        # modify this in the near future
        drivername = config_parser.get('iaas', 'DRIVER').lower()
        self.__default_cloud = iaas.get_cloud_instance('iaas',
                                                       drivername,
                                                       config_parser)

        # Started last so that a failure above does not leave a charging
        # thread running for a controller that was never fully constructed.
        self.__reservation_map['manager'].start()

    #===========================================================================#
    #                create_nodes(self, count, test_agent, port, cloud)         #
    #===========================================================================#
    #TODO: be able to select cloud
    def create_nodes(self, count, test_agent, port, cloud=None):
        """Creates the VMs associated with the list of nodes. It also tests
           if the agents started correctly.

            @param count The number of nodes to be created; when it is zero
                         or negative nothing is requested or charged and an
                         empty list is returned

            @param test_agent A callback function to test if the agent
                          started correctly in the newly created VM

            @param port The port on which the agent will listen

            @param cloud (Optional) If specified, this function will start new
                         nodes inside cloud, otherwise it will start new nodes
                         inside the default cloud or wherever the controller
                         wants (for now only the default cloud is used)

            @return A list of nodes of type ServiceNode

            @raise Exception If the credit could not be deducted, or if the
                             cloud fails to provide new instances (the
                             original exception is re-raised)
        """
        if count <= 0:
            return []

        if not self.__deduct_credit(count):
            raise Exception('Could not add nodes. Not enough credits.')

        ready = []
        iteration = 0
        request_start = time.time()

        while len(ready) < count:
            iteration += 1
            missing = count - len(ready)
            self.__logger.debug('[_create_nodes]: iteration %d: creating %d nodes' \
                          % (iteration, missing))
            # The lock keeps __force_terminate_service from running while
            # instances are being requested and registered.
            with self.__force_terminate_lock:
                try:
                    poll = self.__default_cloud.new_instances(missing)
                    self.__partially_created_nodes += [i['id'] for i in poll]
                except Exception:
                    self.__logger.exception('[_create_nodes]: Failed to request new nodes')
                    self.__kill_nodesById([i['id'] for i in ready])
                    self.__partially_created_nodes = []
                    # bare raise keeps the original traceback
                    raise

            poll, failed = self.__wait_for_nodes(poll, test_agent, port)
            ready += poll
            if failed:
                self.__logger.debug('[_create_nodes]: %d nodes ' \
                                    'failed to startup properly: %s' \
                                    % (len(failed), str(failed)))
                self.__kill_nodesById([i['id'] for i in failed])

        cloud_name = self.__default_cloud.get_cloud_name()
        additional_nodes = [ServiceNode(i['id'], i['ip'], cloud_name)
                            for i in ready]
        ready_ids = [i['id'] for i in ready]
        with self.__force_terminate_lock:
            self.__created_nodes += ready_ids
            self.__partially_created_nodes = []

        # start reservation timer with slack of 3 mins + time already wasted
        # this should be enough time to terminate instances before
        # hitting the following hour
        delay = max(0, (55 * 60) - (time.time() - request_start))
        timer = ReservationTimer(ready_ids,
                                 delay,
                                 self.__deduct_and_check_credit,
                                 self.__reservation_logger)
        timer.start()
        # set mappings
        for node_id in ready_ids:
            self.__reservation_map[node_id] = timer
        return additional_nodes

    #===========================================================================#
    #                     delete_nodes(self, nodes)                             #
    #===========================================================================#
    def delete_nodes(self, nodes):
        """Kills the VMs associated with the list of nodes.

            @param nodes The list of nodes to be removed - a node must be of type
                         ServiceNode or a class that extends ServiceNode
                         (only its vmid attribute is read)
        """
        self.__kill_nodesById([node.vmid for node in nodes])

    #===========================================================================#
    #                     list_vms(self, cloud=None)                            #
    #===========================================================================#
    def list_vms(self, cloud=None):
        """Returns the VMs running at the given/default(s) cloud.

            @param cloud (Optional) If specified, this method will return the
                         VMs already running at the given cloud

            @return Whatever the cloud's list_vms() returns
        """
        target = cloud if cloud is not None else self.__default_cloud

        #TODO: return ServiceNode(s)
        return target.list_vms()

    #===========================================================================#
    #                generate_context(self, service_name, cloud, ip_whitelist)  #
    #===========================================================================#
    def generate_context(self, service_name, cloud=None, ip_whitelist=None):
        """Generates the contextualization file for the default/given cloud
           and stores it in the cloud as its context template.

            @param service_name Used to know which config_files and scripts
                                to select

            @param cloud (Optional) If specified, the context will be generated
                         for it, otherwise it will be generated for the default
                         cloud

            @param ip_whitelist (Optional) Currently not used by this method
        """
        target = cloud if cloud is not None else self.__default_cloud
        contxt = self._get_context_file(service_name, target.get_cloud_type())
        target.set_context_template(contxt)

    #===========================================================================#
    #                update_context(self, replace, cloud)                       #
    #===========================================================================#
    def update_context(self, replace=None, cloud=None):
        """Updates the contextualization file for the default/given cloud.

            @param replace A dictionary that specifies which words should be
                           replaced with what. For example:
                           replace = dict(name='A', age='57')
                           context1 =  '$name , $age'
                           => new_context1 = 'A , 57'
                           context2 ='${name}na, ${age}'
                           => new_context2 = 'Ana, 57'
                           When omitted, nothing is substituted.

            @param cloud (Optional) If specified, the context will be generated
                         for it, otherwise it will be generated for the default
                         cloud
        """
        if replace is None:
            replace = {}
        target = cloud if cloud is not None else self.__default_cloud
        contxt = Template(target.get_context_template()).safe_substitute(replace)
        target.config(context=contxt)

    #===========================================================================#
    #                get_clouds(self)                                           #
    #===========================================================================#
    # TODO: For now we work on just one cloud
    def get_clouds(self):
        """
            @return The list of cloud objects
        """
        return [self.__default_cloud]

    #===========================================================================#
    #                config_cloud(self, cloud, config_params)                   #
    #===========================================================================#
    def config_cloud(self, cloud, config_params):
        """Configures some parameters in the given cloud

            @param cloud The cloud to be configured

            @param config_params A dictionary containing the configuration
                                 parameters (are specific to the cloud)
        """
        cloud.config(config_params)

    #===========================================================================#

    def __kill_nodesById(self, ids):
        """Best-effort termination of the instances with the given ids.

            Each id is detached from its reservation timer (the timer is
            stopped once it has no nodes left) and the instance is killed in
            the default cloud. A failure for one id is logged and does not
            prevent the remaining ids from being processed.

            @param ids An iterable of instance ids
        """
        #TODO: send also the cloud
        for node_id in ids:
            self.__logger.debug('[_kill_nodes]: killing ' + str(node_id))
            try:
                # node may not be in map if it failed to start
                timer = self.__reservation_map.pop(node_id, None)
                if timer is not None and timer.remove_node(node_id) < 1:
                    timer.stop()
                self.__default_cloud.kill_instance(node_id)
            except Exception:
                self.__logger.exception('[_kill_nodes]: ' \
                                        'Failed to kill node %s', node_id)

    def __wait_for_nodes(self, nodes, test_agent, port, poll_interval=10,
                         timeout=180):
        """Polls the given nodes until their agents answer or time runs out.

            @param nodes A list of dictionaries with (at least) the keys
                         'id' and 'ip'; an empty 'ip' means the address is
                         not known yet

            @param test_agent Callback invoked as test_agent(ip, port); a
                              node counts as up when the call does not raise

            @param port The port passed to test_agent

            @param poll_interval Seconds to sleep between polling rounds

            @param timeout Polling stops once
                           poll_cycles * poll_interval exceeds this value

            @return A tuple (done, failed): the nodes whose agent answered
                    and the nodes that were still not up at the timeout
        """
        self.__logger.debug('[__wait_for_nodes]: going to start polling')
        done = []
        poll_cycles = 0
        while len(nodes) > 0:
            poll_cycles += 1
            pending = []
            for node in nodes:
                up = node['ip'] != ''
                if up:
                    try:
                        test_agent(node['ip'], port)
                    except Exception:
                        up = False
                if up:
                    done.append(node)
                else:
                    pending.append(node)
            nodes = pending
            if not nodes:
                break

            if poll_cycles * poll_interval > timeout:
                # slept for at least `timeout` seconds (plus poll time)
                return (done, nodes)

            self.__logger.debug('[_wait_for_nodes]: waiting for %d nodes' \
                              % len(nodes))
            time.sleep(poll_interval)
            no_ip_nodes = [node for node in nodes if node['ip'] == '']
            if no_ip_nodes:
                self.__logger.debug('[_wait_for_nodes]: refreshing %d nodes' \
                                  % len(no_ip_nodes))
                refreshed_list = self.__default_cloud.list_vms()
                for node in no_ip_nodes:
                    try:
                        node['ip'] = refreshed_list[node['id']]['ip']
                    except KeyError:
                        # Not listed (yet, or any more): keep the empty ip so
                        # the node ends up in the failed list at the timeout
                        # instead of aborting the wait for every other node.
                        self.__logger.debug('[_wait_for_nodes]: node %s ' \
                                            'missing from refreshed list' \
                                            % str(node['id']))
        self.__logger.debug('[_wait_for_nodes]: All nodes are ready %s' \
                          % str(done))
        return (done, [])

    @staticmethod
    def _read_file(path):
        """Returns the whole content of the file at path and closes it."""
        with open(path, 'r') as handle:
            return handle.read()

    def _get_context_file(self, service_name, cloud):
        """Builds the contextualization script for a new agent VM.

            The result is the concatenation of the cloud specific script,
            the agent setup script, a here-document writing the agent
            configuration to $ROOT_DIR/config.cfg and the agent start script.

            @param service_name Selects the '<service_name>-agent.cfg' and
                                '<service_name>-agent-start' files; when
                                they do not exist only the default
                                configuration / default start script is used

            @param cloud Name of the file inside CONPAAS_HOME/scripts/cloud
                         (generate_context passes the cloud type)

            @return The context file as a string
        """
        conpaas_home = self.__config_parser.get('manager', 'CONPAAS_HOME')
        cloud_scripts_dir = conpaas_home + '/scripts/cloud'
        agent_cfg_dir = conpaas_home + '/config/agent'
        agent_scripts_dir = conpaas_home + '/scripts/agent'

        bootstrap = self.__config_parser.get('manager', 'BOOTSTRAP')

        # Get contextualization script for the cloud in which the manager resides
        cloud_script = self._read_file(cloud_scripts_dir + '/' + cloud)

        # Get agent setup file
        agent_setup = Template(
            self._read_file(agent_scripts_dir + '/agent-setup')
        ).safe_substitute(SOURCE=bootstrap)

        # Get agent config file - add to the default one the one specific
        # to the service if it exists
        agent_cfg = Template(
            self._read_file(agent_cfg_dir + '/default-agent.cfg')
        ).safe_substitute(AGENT_TYPE=service_name,
                          MANAGER_IP=get_ip_address('eth0'))

        service_cfg_path = agent_cfg_dir + '/' + service_name + '-agent.cfg'
        if os.path.isfile(service_cfg_path):
            agent_cfg += '\n' + self._read_file(service_cfg_path)

        # Get agent start file - if none for this service, use the default one
        agent_start_path = agent_scripts_dir + '/' + service_name + '-agent-start'
        if not os.path.isfile(agent_start_path):
            agent_start_path = agent_scripts_dir + '/default-agent-start'
        agent_start = self._read_file(agent_start_path)

        ## Concatenate the files
        return ''.join([
            cloud_script, '\n\n',
            agent_setup, '\n\n',
            'cat <<EOF > $ROOT_DIR/config.cfg\n',
            agent_cfg, '\n', 'EOF\n\n',
            agent_start, '\n',
        ])

    def __force_terminate_service(self):
        """Kills every known node and asks the front-end to terminate the
           service.

            The front-end is contacted up to 10 times, stopping at the first
            reply whose 'error' field is false.

            NOTE(review): the lock is acquired and intentionally never
            released, so a second invocation blocks forever on the acquire -
            confirm that this is acceptable for the timer callback threads.
        """
        # DO NOT release lock after acquiring it
        # to prevent the creation of more nodes
        self.__force_terminate_lock.acquire()
        self.__logger.debug('OUT OF CREDIT, TERMINATING SERVICE')

        # kill all partially created nodes
        self.__kill_nodesById(self.__partially_created_nodes)

        # kill all created nodes
        self.__kill_nodesById(self.__created_nodes)

        # notify front-end, attempt 10 times until successful
        for _ in range(10):
            try:
                parsed_url = urlparse.urlparse(self.__fe_terminateUrl)
                _, body = _http_post(parsed_url.hostname,
                                     parsed_url.port or 80,
                                     parsed_url.path,
                                     {'sid': self.__fe_service_id})
                obj = json.loads(body)
                if not obj['error']:
                    break
            except Exception:
                self.__logger.exception('Failed to terminate service')

    def __deduct_credit(self, value):
        """Asks the front-end to decrement the service credit.

            @param value The amount sent as 'decrement'

            @return True when the front-end reply has a false 'error' field,
                    False when it reports an error or the request fails
        """
        try:
            parsed_url = urlparse.urlparse(self.__fe_creditUrl)
            _, body = _http_post(parsed_url.hostname, parsed_url.port or 80,
                                 parsed_url.path, {'sid': self.__fe_service_id,
                                                   'decrement': value})
            obj = json.loads(body)
            return not obj['error']
        except Exception:
            self.__logger.exception('Failed to deduct credit')
            return False

    def __deduct_and_check_credit(self, value):
        """Deducts value credits and terminates the service when that fails.

            Used as the callback of the reservation timers.

            @param value The amount of credit to deduct
        """
        if not self.__deduct_credit(value):
            self.__force_terminate_service()
419
 
420
 
421
class ReservationTimer(Thread):
    """Thread that periodically charges for a group of nodes.

       After an initial delay the callback is invoked, in a thread of its
       own, with the number of nodes currently tracked; this repeats every
       interval seconds until stop() is called.
    """

    def __init__(self, nodes, delay, callback,
                       reservation_logger, interval=3600):
        """
            @param nodes The ids of the nodes covered by this timer; the list
                         is copied, so later changes made by the caller (or by
                         remove_node) do not affect the other side

            @param delay Seconds to wait before the first charge

            @param callback Function called with the number of tracked nodes
                            each time a charge is due

            @param reservation_logger Logger used for the RTIMER debug messages

            @param interval Seconds between two consecutive charges
        """
        Thread.__init__(self)
        self.nodes = list(nodes)
        self.event = Event()
        self.delay = delay
        self.interval = interval
        self.callback = callback
        self.lock = Lock()
        self.reservation_logger = reservation_logger
        self.reservation_logger.debug('RTIMER Creating timer for %s' \
                                      % (str(self.nodes)))

    def remove_node(self, node_id):
        """Stops tracking node_id.

            @param node_id The id to remove

            @return The number of nodes still tracked

            @raise ValueError If node_id is not tracked by this timer
        """
        with self.lock:
            self.nodes.remove(node_id)
            self.reservation_logger.debug('RTIMER removed node %s, ' \
                                          'updated list %s' \
                                          % (node_id, str(self.nodes)))
            return len(self.nodes)

    def run(self):
        """Waits for the initial delay, then charges once per interval until
           the timer is stopped.
        """
        self.event.wait(self.delay)
        while not self.event.is_set():
            with self.lock:
                list_size = len(self.nodes)
                self.reservation_logger.debug('RTIMER charging user credit for ' \
                                              'hour of %d instances %s' \
                                              % (list_size, str(self.nodes)))
            # The callback runs in its own thread so that a slow callback
            # does not delay the next charge.
            Thread(target=self.callback, args=[list_size]).start()
            self.event.wait(self.interval)

    def stop(self):
        """Makes run() return; a pending wait is interrupted immediately."""
        self.event.set()
457