OW2 Consortium contrail

Rev

Rev 4671 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
4671 ema 1
"""
2691 aaasz 2
Copyright (c) 2010-2012, Contrail consortium.
3
All rights reserved.
4
 
4404 msoloi 5
Redistribution and use in source and binary forms,
2691 aaasz 6
with or without modification, are permitted provided
7
that the following conditions are met:
4671 ema 8
 
9
 1. Redistributions of source code must retain the
2691 aaasz 10
    above copyright notice, this list of conditions
11
    and the following disclaimer.
12
 2. Redistributions in binary form must reproduce
4404 msoloi 13
    the above copyright notice, this list of
2691 aaasz 14
    conditions and the following disclaimer in the
15
    documentation and/or other materials provided
16
    with the distribution.
2693 aaasz 17
 3. Neither the name of the Contrail consortium nor the
2691 aaasz 18
    names of its contributors may be used to endorse
4404 msoloi 19
    or promote products derived from this software
2691 aaasz 20
    without specific prior written permission.
21
 
22
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
23
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
24
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
25
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
26
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
4404 msoloi 28
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
2691 aaasz 31
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
32
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35
POSSIBILITY OF SUCH DAMAGE.
4671 ema 36
"""
2691 aaasz 37
 
4404 msoloi 38
from threading import Thread, Lock, Event
4572 ema 39
 
4404 msoloi 40
import os.path
41
import time
42
import json
4418 ema 43
import socket
4404 msoloi 44
import urlparse
2691 aaasz 45
from string import Template
4572 ema 46
 
2691 aaasz 47
from conpaas.core.log import create_logger
48
from conpaas.core import iaas
3477 ema 49
from conpaas.core import https
2691 aaasz 50
 
51
class Controller(object):
52
    """Implementation of the clouds controller. This class implements functions
53
       to easily work with the available cloud objects.
54
 
55
       So far, it provides the following functionalities/abstractions:
56
         - crediting system (also terminate service when user out of credits)
4404 msoloi 57
         - adding nodes (VMs)
2691 aaasz 58
         - removing nodes (VMs)
59
    """
60
 
61
    def __init__(self, config_parser, **kwargs):
        """Builds the controller from the manager's configuration.

           Reads the director callback parameters and the optional IPOP
           settings from the 'manager' section, starts the reservation timer
           that charges credit for the manager, and instantiates the cloud
           objects described in the 'iaas' section.

            @param config_parser Configuration object providing get() and
                                 has_option() for the 'manager' and 'iaas'
                                 sections

            @param kwargs Accepted but not used by this constructor
        """
        # Params for director callback
        self.__conpaas_creditUrl = config_parser.get('manager',
                                                     'CREDIT_URL')
        self.__conpaas_terminateUrl = config_parser.get('manager',
                                                        'TERMINATE_URL')
        self.__conpaas_service_id = config_parser.get('manager',
                                                      'SERVICE_ID')
        self.__conpaas_user_id = config_parser.get('manager',
                                                   'USER_ID')
        self.__conpaas_app_id = config_parser.get('manager',
                                                  'APP_ID')
        self.__conpaas_caUrl = config_parser.get('manager',
                                                 'CA_URL')

        # Set the CA URL as IPOP's base namespace
        self.__ipop_base_namespace = self.__conpaas_caUrl

        if config_parser.has_option('manager', 'IPOP_BASE_IP'):
            # Application-level network
            self.__ipop_base_ip = config_parser.get('manager', 'IPOP_BASE_IP')
        else:
            self.__ipop_base_ip = None

        if config_parser.has_option('manager', 'IPOP_NETMASK'):
            # Application-level netmask
            self.__ipop_netmask = config_parser.get('manager', 'IPOP_NETMASK')
        else:
            self.__ipop_netmask = None

        if config_parser.has_option('manager', 'IPOP_SUBNET'):
            # Only import from netaddr if IPOP has to be started
            from netaddr import IPNetwork

            # Subnet assigned to this service by the director
            self.__ipop_subnet = IPNetwork(
                config_parser.get('manager', 'IPOP_SUBNET'))
        else:
            self.__ipop_subnet = None

        # For crediting system: the manager itself is charged by a timer
        # registered under the 'manager' key
        self.__reservation_logger = create_logger('ReservationTimer')
        self.__reservation_map = {'manager': ReservationTimer(['manager'],
                                  55 * 60,  # 55mins
                                  self.__deduct_and_check_credit,
                                  self.__reservation_logger)}
        self.__reservation_map['manager'].start()
        self.__force_terminate_lock = Lock()

        self.config_parser = config_parser
        self.__created_nodes = []
        self.__partially_created_nodes = []
        self.__logger = create_logger(__name__)

        self.__available_clouds = []
        self.__default_cloud = None
        if config_parser.has_option('iaas', 'DRIVER'):
            self.__default_cloud = iaas.get_cloud_instance(
                'iaas',
                config_parser.get('iaas', 'DRIVER').lower(),
                config_parser)
            self.__available_clouds.append(self.__default_cloud)

        if config_parser.has_option('iaas', 'CLOUDS'):
            self.__available_clouds.extend(iaas.get_clouds(config_parser))
            # if there is no default cloud defined in 'iaas', the first of
            # the additional clouds becomes the default one. It is NOT
            # removed from the available clouds: generate_context() and
            # get_cloud_by_name() only look at that list, exactly as in the
            # 'DRIVER' case above where the default cloud is appended to it.
            if self.__default_cloud is None:
                self.__default_cloud = self.__available_clouds[0]

        # Setting VM role
        self.role = 'agent'
132
 
4572 ema 133
    def get_available_ipop_address(self):
134
        """Return an unassigned IP address in this manager's VPN subnet"""
135
        # Network iterator
136
        network = self.__ipop_subnet.iter_hosts()
137
 
138
        # Currently running hosts
139
        running_hosts = [ str(node.ip) for node in self.__created_nodes ]
140
 
141
        self.__logger.debug("get_available_ipop_address: running nodes: %s"
142
            % running_hosts)
143
 
144
        # The first address is used by IPOP internally
145
        network.next()
146
        # The second one is taken by manager
147
        network.next()
148
 
149
        for host in network:
150
            host = str(host)
151
 
152
            if host not in running_hosts:
153
                self.__logger.debug("get_available_ipop_address: returning %s"
154
                    % host)
155
                return host
156
 
4404 msoloi 157
    #=========================================================================#
158
    # create_nodes(self, count, test_agent, port, cloud=None, inst_type=None) #
159
    #=========================================================================#
4547 ema 160
    def create_nodes(self, count, test_agent, port, cloud=None, inst_type=None):
4404 msoloi 161
        """
162
        Creates the VMs associated with the list of nodes. It also tests
163
        if the agents started correctly.
2691 aaasz 164
 
4404 msoloi 165
        @param count The number of nodes to be created
2691 aaasz 166
 
4404 msoloi 167
        @param test_agent A callback function to test if the agent
168
                        started correctly in the newly created VM
2691 aaasz 169
 
4404 msoloi 170
        @param port The port on which the agent will listen
171
 
172
        @param cloud (Optional) If specified, this function will start new
173
                        nodes inside cloud, otherwise it will start new nodes
174
                        inside the default cloud or wherever the controller
175
                        wants (for now only the default cloud is used)
176
 
177
        @return A list of nodes of type node.ServiceNode
178
 
2691 aaasz 179
        """
180
        ready = []
4572 ema 181
        poll = []
2691 aaasz 182
        iteration = 0
183
 
4404 msoloi 184
        if cloud is None:
185
            cloud = self.__default_cloud
186
 
4268 ema 187
        if not self.deduct_credit(count):
2691 aaasz 188
            raise Exception('Could not add nodes. Not enough credits.')
4404 msoloi 189
 
2691 aaasz 190
        while len(ready) < count:
191
            iteration += 1
4547 ema 192
            msg = '[_create_nodes]: iteration %d: creating %d nodes' % (
193
                iteration, count - len(ready))
194
 
195
            if inst_type:
196
                msg += ' of type %s' % inst_type
197
 
198
            self.__logger.debug(msg)
199
 
2691 aaasz 200
            try:
201
                self.__force_terminate_lock.acquire()
3198 aaasz 202
                if iteration == 1:
203
                    request_start = time.time()
3843 ema 204
 
4404 msoloi 205
                service_type = self.config_parser.get('manager', 'TYPE')
206
 
3787 ema 207
                # eg: conpaas-agent-php-u34-s316
3843 ema 208
                name = "conpaas-%s-%s-u%s-s%s" % (self.role, service_type,
4527 ema 209
                       self.__conpaas_user_id, self.__conpaas_service_id)
3787 ema 210
 
4572 ema 211
                if self.__ipop_base_ip and self.__ipop_netmask:
212
                    # If IPOP has to be used we need to update VMs
213
                    # contextualization data for each new instance
214
                    for _ in range(count - len(ready)):
215
                        vpn_ip = self.get_available_ipop_address()
216
                        self.update_context({ 'IPOP_IP_ADDRESS': vpn_ip })
217
 
218
                        for newinst in cloud.new_instances(1, name, inst_type):
219
                            newinst.ip = vpn_ip
220
                            poll.append(newinst)
221
 
222
                        self.__logger.debug("cloud.new_instances: %s" % poll)
223
                else:
224
                    poll = cloud.new_instances(count - len(ready), name, inst_type)
225
 
4416 msoloi 226
                try:
227
                    self.__partially_created_nodes += poll
228
                except TypeError:
229
                    self.__partially_created_nodes.append(poll)
2691 aaasz 230
            except Exception as e:
4404 msoloi 231
                self.__logger.exception(
232
                    '[_create_nodes]: Failed to request new nodes')
4606 ema 233
                self.delete_nodes(ready)
2691 aaasz 234
                self.__partially_created_nodes = []
235
                raise e
236
            finally:
237
                self.__force_terminate_lock.release()
4416 msoloi 238
            poll, failed = self.__wait_for_nodes(
4606 ema 239
                self.__partially_created_nodes, test_agent, port)
2691 aaasz 240
            ready += poll
241
            poll = []
242
            if failed:
4404 msoloi 243
                self.__logger.debug('[_create_nodes]: %d nodes '
244
                                    'failed to startup properly: %s'
2691 aaasz 245
                                    % (len(failed), str(failed)))
4606 ema 246
                self.__partially_created_nodes = []
247
                self.delete_nodes(failed)
2691 aaasz 248
        self.__force_terminate_lock.acquire()
3198 aaasz 249
        self.__created_nodes += ready
2691 aaasz 250
        self.__partially_created_nodes = []
251
        self.__force_terminate_lock.release()
4404 msoloi 252
 
2691 aaasz 253
        # start reservation timer with slack of 3 mins + time already wasted
254
        # this should be enough time to terminate instances before
255
        # hitting the following hour
4404 msoloi 256
        timer = ReservationTimer([i.id for i in ready],
257
                                 (55 * 60) - (time.time() - request_start),
258
                                 self.__deduct_and_check_credit,
259
                                 self.__reservation_logger)
2691 aaasz 260
        timer.start()
261
        # set mappings
262
        for i in ready:
3198 aaasz 263
            self.__reservation_map[i.id] = timer
4404 msoloi 264
        return ready
2691 aaasz 265
 
4404 msoloi 266
    #=========================================================================#
267
    #                    delete_nodes(self, nodes)                            #
268
    #=========================================================================#
2691 aaasz 269
    def delete_nodes(self, nodes):
270
        """Kills the VMs associated with the list of nodes.
4404 msoloi 271
 
272
            @param nodes The list of nodes to be removed;
273
                            - a node must be of type ServiceNode
274
                              or a class that extends ServiceNode
2691 aaasz 275
        """
4606 ema 276
        for node in nodes:
277
            cloud = self.get_cloud_by_name(node.cloud_name)
278
            self.__logger.debug('[delete_nodes]: killing ' + str(node.id))
279
            try:
280
            # node may not be in map if it failed to start
281
                if node.id in self.__reservation_map:
282
                    timer = self.__reservation_map.pop(node.id)
283
                    if timer.remove_node(node.id) < 1:
284
                        timer.stop()
285
                cloud.kill_instance(node)
286
            except:
287
                self.__logger.exception('[delete_nodes]: '
288
                                        'Failed to kill node %s', node.id)
2691 aaasz 289
 
4404 msoloi 290
    #=========================================================================#
291
    #                    list_vms(self, cloud=None)                           #
292
    #=========================================================================#
2691 aaasz 293
    def list_vms(self, cloud=None):
294
        """Returns an array with the VMs running at the given/default(s) cloud.
4404 msoloi 295
 
2691 aaasz 296
            @param cloud (Optional) If specified, this method will return the
297
                         VMs already running at the given cloud
4404 msoloi 298
        """
299
        if cloud is None:
300
            cloud = self.__default_cloud
2691 aaasz 301
 
4404 msoloi 302
        return cloud.list_vms()
2691 aaasz 303
 
4404 msoloi 304
    #=========================================================================#
305
    #   generate_context(self, service_name, cloud=None, ip_whitelist=None)   #
306
    #=========================================================================#
307
    def generate_context(self, service_name,
308
                         cloud=None, ip_whitelist=None):
2691 aaasz 309
        """Generates the contextualization file for the default/given cloud.
4404 msoloi 310
 
2691 aaasz 311
            @param cloud (Optional) If specified, the context will be generated
4606 ema 312
                         for it, otherwise it will be generated for all the
313
                         available clouds
2691 aaasz 314
 
315
            @param service_name Used to know which config_files and scripts
4404 msoloi 316
                                to select
2691 aaasz 317
        """
4606 ema 318
        def __set_cloud_ctx(cloud):
319
            contxt = self._get_context_file(service_name,
320
                                            cloud.get_cloud_type())
321
            cloud.set_context_template(contxt)
2691 aaasz 322
 
4404 msoloi 323
        if cloud is None:
4606 ema 324
            for cloud in self.__available_clouds:
325
                __set_cloud_ctx(cloud)
326
        else:
327
            __set_cloud_ctx(cloud)
4404 msoloi 328
 
329
    #=========================================================================#
330
    #               update_context(self, replace, cloud)                      #
331
    #=========================================================================#
332
    def update_context(self, replace={}, cloud=None):
2691 aaasz 333
        """Updates the contextualization file for the default/given cloud.
4404 msoloi 334
 
335
            @param replace A dictionary that specifies which words shoud be
2691 aaasz 336
                           replaced with what. For example:
337
                           replace = dict(name='A', age='57')
338
                           context1 =  '$name , $age'
339
                           => new_context1 = 'A , 57'
340
                           context2 ='${name}na, ${age}'
341
                           => new_context2 = 'Ana, 57'
342
 
343
            @param cloud (Optional) If specified, the context will be generated
344
                         for it, otherwise it will be generated for the default
345
                         cloud
346
 
347
        """
4404 msoloi 348
        if cloud is None:
349
            cloud = self.__default_cloud
350
 
351
        contxt = cloud.get_context_template()
2691 aaasz 352
        contxt = Template(contxt).safe_substitute(replace)
4404 msoloi 353
        cloud.config(context=contxt)
2691 aaasz 354
 
4404 msoloi 355
    #=========================================================================#
356
    #               get_clouds(self)                                          #
357
    #=========================================================================#
2691 aaasz 358
    def get_clouds(self):
359
        """
360
            @return The list of cloud objects
4404 msoloi 361
 
2691 aaasz 362
        """
4606 ema 363
        return self.__available_clouds
2691 aaasz 364
 
4404 msoloi 365
    #=========================================================================#
4606 ema 366
    #               get_cloud_by_name(self, cloud_name)                       #
367
    #=========================================================================#
368
    def get_cloud_by_name(self, cloud_name):
369
        """
370
            @param cloud_name
371
 
372
            @return The cloud object which name is the same as @param name
373
 
374
        """
4775 ema 375
        return [ cloud for cloud in self.__available_clouds
376
            if cloud.get_cloud_name() == cloud_name ][0]
4606 ema 377
 
378
    #=========================================================================#
4404 msoloi 379
    #               config_cloud(self, cloud, config_params)                  #
380
    #=========================================================================#
2691 aaasz 381
    def config_cloud(self, cloud, config_params):
382
        """Configures some parameters in the given cloud
4404 msoloi 383
 
2691 aaasz 384
            @param cloud The cloud to be configured
4404 msoloi 385
 
386
            @param config_params A dictionary containing the configuration
2691 aaasz 387
                                 parameters (are specific to the cloud)
388
        """
389
        cloud.config(config_params)
390
 
4404 msoloi 391
    #=========================================================================#
4606 ema 392
    #               config_clouds(self, config_params)                        #
393
    #=========================================================================#
394
    def config_clouds(self, config_params):
395
        """Same as config_cloud but for all available clouds
2691 aaasz 396
 
4606 ema 397
            @param config_params A dictionary containing the configuration
398
                                 parameters (are specific to the cloud)
399
        """
400
        for cloud in self.__available_clouds:
401
            cloud.config(config_params)
4404 msoloi 402
 
4614 ema 403
    def __check_node(self, node, test_agent, port):
404
        """Return True if the given node has properly started an agent on the
405
        given port"""
406
        if node.ip == '' or node.private_ip == '':
407
            return False
2691 aaasz 408
 
4614 ema 409
        try:
410
            self.__logger.debug('[__check_node]: test_agent(%s, %s)' % (
411
                node.ip, port))
3198 aaasz 412
 
4614 ema 413
            test_agent(node.ip, port)
414
            return True
415
        except socket.error, err:
416
            self.__logger.debug('[__check_node]: %s' % err)
417
 
418
        return False
419
 
420
    def __wait_for_nodes(self, nodes, test_agent, port, poll_interval=10):
4404 msoloi 421
        self.__logger.debug('[__wait_for_nodes]: going to start polling')
4614 ema 422
 
4404 msoloi 423
        done = []
424
        poll_cycles = 0
4614 ema 425
 
4404 msoloi 426
        while len(nodes) > 0:
427
            poll_cycles += 1
2691 aaasz 428
 
4614 ema 429
            # Add to 'done' the nodes on which an agent has been started
430
            # properly.
431
            done.extend([ node for node in nodes
432
                if self.__check_node(node, test_agent, port) ])
433
 
434
            # Put in 'nodes' only those who have not started yet.
435
            nodes = [ node for node in nodes if node not in done ]
436
 
437
            if len(nodes) == 0:
438
                # All the nodes are ready.
439
                break
440
            elif poll_cycles * poll_interval > 300:
441
                # We have waited for more than 5 mins of sleeping + poll time.
442
                # Let's return whatever we have.
443
                return (done, nodes)
444
 
445
            self.__logger.debug('[__wait_for_nodes]: waiting %d secs for %d nodes'
446
                                % (poll_interval, len(nodes)))
4404 msoloi 447
            time.sleep(poll_interval)
4614 ema 448
 
449
            # Check if some nodes still do not have an IP address.
450
            no_ip_nodes = [ node for node in nodes
451
                           if node.ip == '' or node.private_ip == '' ]
4404 msoloi 452
            if no_ip_nodes:
4606 ema 453
                self.__logger.debug('[__wait_for_nodes]: refreshing %d nodes'
4404 msoloi 454
                                    % len(no_ip_nodes))
4614 ema 455
 
4404 msoloi 456
                for node in no_ip_nodes:
4606 ema 457
                    refreshed_list = self.list_vms(
458
                        self.get_cloud_by_name(node.cloud_name))
4614 ema 459
 
4416 msoloi 460
                    for refreshed_node in refreshed_list:
461
                        if refreshed_node.id == node.id:
462
                            node.ip = refreshed_node.ip
463
                            node.private_ip = refreshed_node.private_ip
4404 msoloi 464
 
4606 ema 465
        self.__logger.debug('[__wait_for_nodes]: All nodes are ready %s'
4404 msoloi 466
                            % str(done))
467
        return (done, [])
468
 
4606 ema 469
    def _get_context_file(self, service_name, cloud_type):
4404 msoloi 470
        '''
471
        the context file runs the scripts necessary on each node created
472
        it's installing all the necessary dependencies for the service
473
        on the cloud you are installing
2691 aaasz 474
 
4404 msoloi 475
        '''
476
        conpaas_home = self.config_parser.get('manager', 'CONPAAS_HOME')
477
        cloud_scripts_dir = conpaas_home + '/scripts/cloud'
478
        agent_cfg_dir = conpaas_home + '/config/agent'
479
        agent_scripts_dir = conpaas_home + '/scripts/agent'
2691 aaasz 480
 
4404 msoloi 481
        bootstrap = self.config_parser.get('manager', 'BOOTSTRAP')
482
        manager_ip = self.config_parser.get('manager', 'MY_IP')
2691 aaasz 483
 
4404 msoloi 484
        # Get contextualization script for the corresponding cloud
4606 ema 485
        cloud_script_file = open(cloud_scripts_dir + '/' + cloud_type, 'r')
4404 msoloi 486
        cloud_script = cloud_script_file.read()
2691 aaasz 487
 
4404 msoloi 488
        # Get agent setup file
489
        agent_setup_file = open(agent_scripts_dir + '/agent-setup', 'r')
4572 ema 490
        agent_setup = Template(agent_setup_file.read()).safe_substitute(
491
            SOURCE=bootstrap)
2691 aaasz 492
 
4404 msoloi 493
        # Get agent config file - add to the default one the one specific
494
        # to the service if it exists
495
        default_agent_cfg_file = open(agent_cfg_dir + '/default-agent.cfg')
4406 ema 496
        agent_cfg = Template(default_agent_cfg_file.read()).safe_substitute(
497
            AGENT_TYPE=service_name,
498
            MANAGER_IP=manager_ip,
4527 ema 499
            CONPAAS_USER_ID=self.__conpaas_user_id,
500
            CONPAAS_SERVICE_ID=self.__conpaas_service_id,
501
            CONPAAS_APP_ID=self.__conpaas_app_id,
4406 ema 502
            IPOP_BASE_NAMESPACE=self.__ipop_base_namespace)
2691 aaasz 503
 
4572 ema 504
        # Add IPOP_BASE_IP, IPOP_NETMASK and IPOP_IP_ADDRESS if necessary
505
        if self.__ipop_base_ip and self.__ipop_netmask:
4484 ema 506
            agent_cfg += '\nIPOP_BASE_IP = %s' % self.__ipop_base_ip
507
            agent_cfg += '\nIPOP_NETMASK = %s' % self.__ipop_netmask
4572 ema 508
            agent_cfg += '\nIPOP_IP_ADDRESS = $IPOP_IP_ADDRESS'
4484 ema 509
 
4404 msoloi 510
        if os.path.isfile(agent_cfg_dir + '/' + service_name + '-agent.cfg'):
511
            agent_cfg_file = open(agent_cfg_dir +
512
                                  '/' + service_name + '-agent.cfg')
513
            agent_cfg += '\n' + agent_cfg_file.read()
2691 aaasz 514
 
4404 msoloi 515
        # Get agent start file - if none for this service, use the default one
516
        if os.path.isfile(agent_scripts_dir +
517
                          '/' + service_name + '-agent-start'):
518
            agent_start_file = open(agent_scripts_dir +
519
                                    '/' + service_name + '-agent-start')
520
        else:
521
            agent_start_file = open(agent_scripts_dir + '/default-agent-start')
522
        agent_start = agent_start_file.read()
3477 ema 523
 
4404 msoloi 524
        # Get key and a certificate from CA
525
        agent_certs = self._get_certificate()
2691 aaasz 526
 
4404 msoloi 527
        # Concatenate the files
528
        context_file = (cloud_script + '\n\n'
529
                        + 'cat <<EOF > /tmp/cert.pem\n'
530
                        + agent_certs['cert'] + '\n' + 'EOF\n\n'
531
                        + 'cat <<EOF > /tmp/key.pem\n'
532
                        + agent_certs['key'] + '\n' + 'EOF\n\n'
533
                        + 'cat <<EOF > /tmp/ca_cert.pem\n'
534
                        + agent_certs['ca_cert'] + '\n' + 'EOF\n\n'
535
                        + agent_setup + '\n\n'
536
                        + 'cat <<EOF > $ROOT_DIR/config.cfg\n'
537
                        + agent_cfg + '\n' + 'EOF\n\n')
3891 ema 538
 
4404 msoloi 539
        # Get user-provided startup script's absolute path
540
        basedir = self.config_parser.get('manager', 'CONPAAS_HOME')
541
        startup_script = os.path.join(basedir, 'startup.sh')
3891 ema 542
 
4404 msoloi 543
        # Append user-provided startup script (if any)
544
        if os.path.isfile(startup_script):
545
            context_file += open(startup_script).read() + '\n'
3924 ema 546
 
4404 msoloi 547
        # Finally, the agent startup script
548
        context_file += agent_start + '\n'
2691 aaasz 549
 
4404 msoloi 550
        return context_file
551
 
3477 ema 552
    def _get_certificate(self):
        """
        Requests a certificate from the CA.

        A new RSA key pair and an X.509 certificate request for the 'agent'
        role are generated and the request is posted to the CA URL (port
        443 when the URL does not name one). The CA certificate is read
        from ca_cert.pem in the manager's CERT_DIR.

        @return A dictionary with the keys 'ca_cert', 'key' (the generated
                key as PEM) and 'cert' (the body of the CA's response)
        """
        parsed_url = urlparse.urlparse(self.__conpaas_caUrl)

        req_key = https.x509.gen_rsa_keypair()

        x509_req = https.x509.create_x509_req(
            req_key,
            userId=self.__conpaas_user_id,
            serviceLocator=self.__conpaas_service_id,
            O='ConPaaS',
            emailAddress='info@conpaas.eu',
            CN='ConPaaS',
            role='agent'
        )

        x509_req_as_pem = https.x509.x509_req_as_pem(x509_req)
        _, cert = https.client.https_post(parsed_url.hostname,
                                          parsed_url.port or 443,
                                          parsed_url.path,
                                          files=[('csr', 'csr.pem',
                                                  x509_req_as_pem)])
        cert_dir = self.config_parser.get('manager', 'CERT_DIR')
        # 'with' closes the file even if reading fails
        with open(os.path.join(cert_dir, 'ca_cert.pem'), 'r') as ca_cert_file:
            ca_cert = ca_cert_file.read()

        certs = {'ca_cert': ca_cert,
                 'key': https.x509.key_as_pem(req_key),
                 'cert': cert}

        return certs
585
 
2691 aaasz 586
    def __force_terminate_service(self):
        """Kills every node of the service and tells the front-end that the
        service has been terminated.

        The termination lock is acquired and intentionally never released.
        """
        # DO NOT release lock after acquiring it
        # to prevent the creation of more nodes
        self.__force_terminate_lock.acquire()
        self.__logger.debug('OUT OF CREDIT, TERMINATING SERVICE')

        # kill all partially created nodes
        self.delete_nodes(self.__partially_created_nodes)

        # kill all created nodes
        self.delete_nodes(self.__created_nodes)

        # notify front-end, attempt 10 times until successful
        for _ in range(10):
            try:
                parsed_url = urlparse.urlparse(self.__conpaas_terminateUrl)
                _, body = https.client.https_post(parsed_url.hostname,
                                                  parsed_url.port or 443,
                                                  parsed_url.path,
                                                  {'sid':
                                                      self.__conpaas_service_id})
                obj = json.loads(body)
                if not obj['error']:
                    break
            except Exception:
                # Best effort: log the failure and try again
                self.__logger.exception('Failed to terminate service')
612
 
613
    def deduct_credit(self, value):
        """Asks the credit URL to decrement the credit of this service.

            @param value The amount sent as 'decrement'

            @return True if the JSON answer carries a false 'error' field;
                    False if it reports an error or the request failed in
                    any way (the failure is logged, not raised)
        """
        try:
            parsed_url = urlparse.urlparse(self.__conpaas_creditUrl)
            _, body = https.client.https_post(parsed_url.hostname,
                                              parsed_url.port or 443,
                                              parsed_url.path,
                                              {'sid': self.__conpaas_service_id,
                                               'decrement': value})
            obj = json.loads(body)
            return not obj['error']
        except Exception:
            self.__logger.exception('Failed to deduct credit')
            return False
626
 
2691 aaasz 627
    def __deduct_and_check_credit(self, value):
4404 msoloi 628
        if not self.deduct_credit(value):
629
            self.__force_terminate_service()
2691 aaasz 630
 
631
 
632
class ReservationTimer(Thread):
    """Thread that periodically invokes a callback for a set of node ids.

    After an initial delay, and then once per interval, the callback is
    started in a thread of its own with the number of ids currently
    tracked. The loop ends once stop() has been called.
    """

    def __init__(self, nodes, delay, callback,
                 reservation_logger, interval=3600):
        """
            @param nodes List of node ids tracked by this timer (the list
                         is kept, not copied)

            @param delay Seconds to wait before the first callback

            @param callback Callable taking the number of tracked ids

            @param reservation_logger Logger used for the RTIMER messages

            @param interval Seconds between two callbacks
        """
        super(ReservationTimer, self).__init__()
        self.reservation_logger = reservation_logger
        self.callback = callback
        self.nodes = nodes
        self.delay = delay
        self.interval = interval
        self.lock = Lock()
        self.event = Event()

        created_msg = 'RTIMER Creating timer for %s' % (str(self.nodes))
        self.reservation_logger.debug(created_msg)

    def remove_node(self, node_id):
        """Stops tracking node_id and returns how many ids are left."""
        with self.lock:
            self.nodes.remove(node_id)
            removed_msg = ('RTIMER removed node %s, '
                           'updated list %s'
                           % (node_id, str(self.nodes)))
            self.reservation_logger.debug(removed_msg)
        remaining = len(self.nodes)
        return remaining

    def run(self):
        """Waits for the delay, then fires the callback once per interval
        until the timer is stopped."""
        self.event.wait(self.delay)
        while True:
            if self.event.is_set():
                break

            with self.lock:
                tracked = len(self.nodes)
                charge_msg = ('RTIMER charging user credit '
                              'for hour of %d instances %s'
                              % (tracked, str(self.nodes)))
                self.reservation_logger.debug(charge_msg)

            # The callback runs in its own thread, outside the lock
            charger = Thread(target=self.callback, args=[tracked])
            charger.start()
            self.event.wait(self.interval)

    def stop(self):
        """Signals the timer loop to end."""
        self.event.set()