| 2691 |
aaasz |
1 |
'''
|
|
|
2 |
Copyright (c) 2010-2012, Contrail consortium.
|
|
|
3 |
All rights reserved.
|
|
|
4 |
|
|
|
5 |
Redistribution and use in source and binary forms,
|
|
|
6 |
with or without modification, are permitted provided
|
|
|
7 |
that the following conditions are met:
|
|
|
8 |
|
|
|
9 |
1. Redistributions of source code must retain the
|
|
|
10 |
above copyright notice, this list of conditions
|
|
|
11 |
and the following disclaimer.
|
|
|
12 |
2. Redistributions in binary form must reproduce
|
|
|
13 |
the above copyright notice, this list of
|
|
|
14 |
conditions and the following disclaimer in the
|
|
|
15 |
documentation and/or other materials provided
|
|
|
16 |
with the distribution.
|
| 2693 |
aaasz |
17 |
3. Neither the name of the Contrail consortium nor the
|
| 2691 |
aaasz |
18 |
names of its contributors may be used to endorse
|
|
|
19 |
or promote products derived from this software
|
|
|
20 |
without specific prior written permission.
|
|
|
21 |
|
|
|
22 |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
|
|
|
23 |
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
|
|
|
24 |
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
|
|
25 |
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
|
26 |
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
|
|
|
27 |
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
|
28 |
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
|
29 |
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
|
30 |
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
|
31 |
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
|
|
|
32 |
WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
|
33 |
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
|
|
|
34 |
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
|
35 |
POSSIBILITY OF SUCH DAMAGE.
|
|
|
36 |
|
|
|
37 |
|
|
|
38 |
Created on Feb 8, 2011
|
|
|
39 |
|
|
|
40 |
@package conpaas.core
|
|
|
41 |
@author ielhelw, aaasz
|
|
|
42 |
@file
|
|
|
43 |
|
|
|
44 |
'''
|
|
|
45 |
|
|
|
46 |
from threading import Thread, Lock, Timer, Event
|
|
|
47 |
import inspect, tempfile, os, os.path, tarfile, time, stat, json, urlparse
|
|
|
48 |
from string import Template
|
|
|
49 |
|
|
|
50 |
from conpaas.core.log import create_logger
|
|
|
51 |
from conpaas.core.http import HttpJsonResponse, HttpErrorResponse,\
|
|
|
52 |
HttpFileDownloadResponse, HttpRequest,\
|
|
|
53 |
FileUploadField, HttpError, _http_post
|
|
|
54 |
|
|
|
55 |
from conpaas.core.node import ServiceNode
|
|
|
56 |
from conpaas.core import iaas
|
|
|
57 |
from conpaas.core.misc import get_ip_address
|
|
|
58 |
|
|
|
59 |
class Controller(object):
    """Implementation of the clouds controller. This class implements functions
    to easily work with the available cloud objects.

    So far, it provides the following functionalities/abstractions:
      - crediting system (also terminate service when user out of credits)
      - adding nodes (VMs)
      - removing nodes (VMs)
    """

    # Seconds before the first hourly charge of a reservation timer fires
    # (55 minutes, i.e. some slack before the next full hour starts).
    _RESERVATION_DELAY = 55 * 60
    # Upper bound (poll cycles * poll interval, in seconds) that
    # __wait_for_nodes spends on nodes whose agent does not answer.
    _STARTUP_TIMEOUT = 180
    # Number of attempts made to tell the front-end the service was terminated.
    _TERMINATE_ATTEMPTS = 10

    def __init__(self, config_parser, **kwargs):
        """Reads the front-end/IaaS settings and starts the manager's timer.

        @param config_parser A parser offering get(section, option); the
                             'manager' and 'iaas' sections are read here

        @param kwargs        Accepted for compatibility; not used
        """
        # Params for frontend callback
        self.__fe_creditUrl = config_parser.get('manager', 'FE_CREDIT_URL')
        self.__fe_terminateUrl = config_parser.get('manager',
                                                   'FE_TERMINATE_URL')
        self.__fe_service_id = config_parser.get('manager', 'FE_SERVICE_ID')

        self.__config_parser = config_parser
        self.__created_nodes = []
        self.__partially_created_nodes = []
        self.__logger = create_logger(__name__)
        self.__force_terminate_lock = Lock()

        # TODO: for now, it receives only 1 cloud credentials - we will
        #       modify this in the near future
        drivername = config_parser.get('iaas', 'DRIVER').lower()
        self.__default_cloud = iaas.get_cloud_instance('iaas', drivername,
                                                       config_parser)

        # For crediting system.  The timer thread is started last so that a
        # failure in the initialisation above cannot leave a stray
        # (non-daemon) thread running behind a half-built controller.
        self.__reservation_logger = create_logger('ReservationTimer')
        self.__reservation_map = {
            'manager': ReservationTimer(['manager'],
                                        self._RESERVATION_DELAY,
                                        self.__deduct_and_check_credit,
                                        self.__reservation_logger),
        }
        self.__reservation_map['manager'].start()

    def __cloud_or_default(self, cloud):
        """Returns cloud, or the default cloud when cloud is None."""
        if cloud is not None:
            return cloud
        return self.__default_cloud

    #=========================================================================#
    #               create_nodes(self, count, test_agent, port, cloud)        #
    #=========================================================================#
    #TODO: be able to select cloud
    def create_nodes(self, count, test_agent, port, cloud=None):
        """Creates the VMs associated with the list of nodes. It also tests
        if the agents started correctly.

        The credit for count nodes is deducted up front. Instances whose
        agent does not come up are killed and requested again until count
        nodes are ready.

        @param count The number of nodes to be created; nothing is done
                     (and [] is returned) when it is not positive

        @param test_agent A callback function test_agent(ip, port) to test
                          if the agent started correctly in the newly
                          created VM; it signals failure by raising

        @param port The port on which the agent will listen

        @param cloud (Optional) If specified, this function will start new
                     nodes inside cloud, otherwise it will start new nodes
                     inside the default cloud or wherever the controller
                     wants (for now only the default cloud is used)

        @return A list of nodes of type ServiceNode

        @raise Exception if the credit could not be deducted; any exception
                         raised while requesting new instances is re-raised
                         after the nodes that were already ready are killed
        """
        if count <= 0:
            # Nothing to create; previously this path crashed with a
            # NameError because request_start was never assigned.
            return []

        if not self.__deduct_credit(count):
            raise Exception('Could not add nodes. Not enough credits.')

        ready = []
        iteration = 0
        request_start = None

        while len(ready) < count:
            iteration += 1
            missing = count - len(ready)
            self.__logger.debug(
                '[_create_nodes]: iteration %d: creating %d nodes',
                iteration, missing)

            # The lock is held while requesting instances so that a forced
            # termination (which never releases it) blocks further requests.
            with self.__force_terminate_lock:
                if request_start is None:
                    request_start = time.time()
                try:
                    poll = self.__default_cloud.new_instances(missing)
                    self.__partially_created_nodes += [i['id'] for i in poll]
                except Exception:
                    self.__logger.exception(
                        '[_create_nodes]: Failed to request new nodes')
                    self.__kill_nodesById([i['id'] for i in ready])
                    self.__partially_created_nodes = []
                    # bare raise keeps the original traceback
                    raise

            poll, failed = self.__wait_for_nodes(poll, test_agent, port)
            ready += poll
            if failed:
                self.__logger.debug(
                    '[_create_nodes]: %d nodes failed to startup properly: %s',
                    len(failed), str(failed))
                self.__kill_nodesById([i['id'] for i in failed])

        cloud_name = self.__default_cloud.get_cloud_name()
        additional_nodes = [ServiceNode(i['id'], i['ip'], cloud_name)
                            for i in ready]
        ready_ids = [i['id'] for i in ready]

        with self.__force_terminate_lock:
            self.__created_nodes += ready_ids
            self.__partially_created_nodes = []

        # start reservation timer with slack of 5 mins + time already wasted
        # this should be enough time to terminate instances before
        # hitting the following hour
        elapsed = time.time() - request_start
        timer = ReservationTimer(list(ready_ids),
                                 self._RESERVATION_DELAY - elapsed,
                                 self.__deduct_and_check_credit,
                                 self.__reservation_logger)
        timer.start()
        # set mappings: every node of this batch shares the same timer
        for node_id in ready_ids:
            self.__reservation_map[node_id] = timer
        return additional_nodes

    #=========================================================================#
    #                    delete_nodes(self, nodes)                            #
    #=========================================================================#
    def delete_nodes(self, nodes):
        """Kills the VMs associated with the list of nodes.

        @param nodes The list of nodes to be removed - a node must be of type
                     ServiceNode or a class that extends ServiceNode
                     (only its vmid attribute is used here)
        """
        self.__kill_nodesById([node.vmid for node in nodes])

    #=========================================================================#
    #                    list_vms(self, cloud=None)                           #
    #=========================================================================#
    def list_vms(self, cloud=None):
        """Returns the VMs running at the given/default cloud.

        @param cloud (Optional) If specified, this method will return the
                     VMs already running at the given cloud

        @return Whatever the cloud's list_vms() returns
        """
        #TODO: return ServiceNode(s)
        return self.__cloud_or_default(cloud).list_vms()

    #=========================================================================#
    #       generate_context(self, service_name, cloud, ip_whitelist)         #
    #=========================================================================#
    def generate_context(self, service_name, cloud=None, ip_whitelist=None):
        """Generates the contextualization file for the default/given cloud
        and stores it as that cloud's context template.

        @param service_name Used to know which config_files and scripts
                            to select

        @param cloud (Optional) If specified, the context will be generated
                     for it, otherwise it will be generated for the default
                     cloud

        @param ip_whitelist (Optional) Currently not used by this method
        """
        c = self.__cloud_or_default(cloud)
        contxt = self._get_context_file(service_name, c.get_cloud_type())
        c.set_context_template(contxt)

    #=========================================================================#
    #               update_context(self, replace, cloud)                      #
    #=========================================================================#
    def update_context(self, replace=None, cloud=None):
        """Updates the contextualization file for the default/given cloud.

        @param replace A dictionary that specifies which words should be
                       replaced with what. For example:
                       replace = dict(name='A', age='57')
                       context1 = '$name , $age'
                       => new_context1 = 'A , 57'
                       context2 ='${name}na, ${age}'
                       => new_context2 = 'Ana, 57'
                       When omitted, nothing is replaced.

        @param cloud (Optional) If specified, the context will be generated
                     for it, otherwise it will be generated for the default
                     cloud
        """
        c = self.__cloud_or_default(cloud)
        template = c.get_context_template()
        # safe_substitute leaves unknown placeholders untouched
        contxt = Template(template).safe_substitute(replace or {})
        c.config(context=contxt)

    #=========================================================================#
    #               get_clouds(self)                                          #
    #=========================================================================#
    # TODO: For now we work on just one cloud
    def get_clouds(self):
        """
        @return The list of cloud objects
        """
        return [self.__default_cloud]

    #=========================================================================#
    #               config_cloud(self, cloud, config_params)                  #
    #=========================================================================#
    def config_cloud(self, cloud, config_params):
        """Configures some parameters in the given cloud

        @param cloud The cloud to be configured

        @param config_params A dictionary containing the configuration
                             parameters (are specific to the cloud)
        """
        cloud.config(config_params)

    #=========================================================================#

    def __kill_nodesById(self, ids):
        """Kills the instances with the given ids on the default cloud.

        A failure for one id is logged and does not prevent the remaining
        ids from being processed.
        """
        #TODO: send also the cloud
        for node_id in ids:
            self.__logger.debug('[_kill_nodes]: killing ' + str(node_id))
            try:
                # node may not be in map if it failed to start
                if node_id in self.__reservation_map:
                    timer = self.__reservation_map.pop(node_id)
                    # stop the timer once it has no nodes left to charge for
                    if timer.remove_node(node_id) < 1:
                        timer.stop()
                self.__default_cloud.kill_instance(node_id)
            except Exception:
                self.__logger.exception('[_kill_nodes]: '
                                        'Failed to kill node %s', node_id)

    def __wait_for_nodes(self, nodes, test_agent, port, poll_interval=10):
        """Polls the nodes until their agents answer or the timeout expires.

        @param nodes         Node descriptions with 'id' and 'ip' entries;
                             an empty 'ip' means the address is not known yet
        @param test_agent    Called as test_agent(ip, port); an exception
                             means the agent is not up yet
        @param port          Port passed through to test_agent
        @param poll_interval Seconds to sleep between two polling cycles

        @return A tuple (ready_nodes, failed_nodes)
        """
        self.__logger.debug('[__wait_for_nodes]: going to start polling')
        done = []
        poll_cycles = 0
        while nodes:
            poll_cycles += 1
            pending = []
            for node in nodes:
                try:
                    up = node['ip'] != ''
                    if up:
                        test_agent(node['ip'], port)
                except Exception:
                    up = False
                if up:
                    done.append(node)
                else:
                    pending.append(node)
            nodes = pending

            if nodes:
                if poll_cycles * poll_interval > self._STARTUP_TIMEOUT:
                    # give up: more than 3 minutes of sleeping
                    # (plus the time spent polling) have passed
                    return (done, nodes)

                self.__logger.debug('[_wait_for_nodes]: waiting for %d nodes',
                                    len(nodes))
                time.sleep(poll_interval)
                no_ip_nodes = [i for i in nodes if i['ip'] == '']
                if no_ip_nodes:
                    self.__logger.debug(
                        '[_wait_for_nodes]: refreshing %d nodes',
                        len(no_ip_nodes))
                    # presumably a mapping from VM id to a description with
                    # an 'ip' entry - verify against the cloud driver
                    refreshed_list = self.__default_cloud.list_vms()
                    for i in no_ip_nodes:
                        i['ip'] = refreshed_list[i['id']]['ip']
        self.__logger.debug('[_wait_for_nodes]: All nodes are ready %s',
                            str(done))
        return (done, [])

    @staticmethod
    def _read_file(path):
        """Returns the content of the file at path, closing it afterwards."""
        with open(path, 'r') as handle:
            return handle.read()

    def _get_context_file(self, service_name, cloud):
        """Builds the contextualization script for an agent VM.

        The script is the concatenation of the cloud specific script, the
        agent setup script, a here-document writing the agent configuration
        to $ROOT_DIR/config.cfg and the agent start script.

        @param service_name Selects the service specific agent config and
                            start script, if such files exist
        @param cloud        Name of the file below scripts/cloud to use

        @return The content of the context file as a string
        """
        conpaas_home = self.__config_parser.get('manager', 'CONPAAS_HOME')
        cloud_scripts_dir = conpaas_home + '/scripts/cloud'
        agent_cfg_dir = conpaas_home + '/config/agent'
        agent_scripts_dir = conpaas_home + '/scripts/agent'

        bootstrap = self.__config_parser.get('manager', 'BOOTSTRAP')

        # Get contextualization script for the cloud in which the manager
        # resides
        cloud_script = self._read_file(cloud_scripts_dir + '/' + cloud)

        # Get agent setup file
        agent_setup = Template(
            self._read_file(agent_scripts_dir + '/agent-setup')
        ).safe_substitute(SOURCE=bootstrap)

        # Get agent config file - add to the default one the one specific
        # to the service if it exists
        agent_cfg = Template(
            self._read_file(agent_cfg_dir + '/default-agent.cfg')
        ).safe_substitute(AGENT_TYPE=service_name,
                          MANAGER_IP=get_ip_address('eth0'))

        service_cfg = agent_cfg_dir + '/' + service_name + '-agent.cfg'
        if os.path.isfile(service_cfg):
            agent_cfg += '\n' + self._read_file(service_cfg)

        # Get agent start file - if none for this service, use the default one
        agent_start_path = (agent_scripts_dir + '/' + service_name +
                            '-agent-start')
        if not os.path.isfile(agent_start_path):
            agent_start_path = agent_scripts_dir + '/default-agent-start'
        agent_start = self._read_file(agent_start_path)

        ## Concatenate the files
        context_file = cloud_script + '\n\n'
        context_file += agent_setup + '\n\n'
        context_file += 'cat <<EOF > $ROOT_DIR/config.cfg\n'
        context_file += agent_cfg + '\n' + 'EOF\n\n'
        context_file += agent_start + '\n'

        return context_file

    def __force_terminate_service(self):
        """Kills every node of the service and notifies the front-end."""
        # DO NOT release lock after acquiring it
        # to prevent the creation of more nodes
        self.__force_terminate_lock.acquire()
        self.__logger.debug('OUT OF CREDIT, TERMINATING SERVICE')

        # kill all partially created nodes
        self.__kill_nodesById(self.__partially_created_nodes)

        # kill all created nodes
        self.__kill_nodesById(self.__created_nodes)

        # notify front-end, attempt 10 times until successful
        for _ in range(self._TERMINATE_ATTEMPTS):
            try:
                parsed_url = urlparse.urlparse(self.__fe_terminateUrl)
                _, body = _http_post(parsed_url.hostname,
                                     parsed_url.port or 80,
                                     parsed_url.path,
                                     {'sid': self.__fe_service_id})
                obj = json.loads(body)
                if not obj['error']:
                    break
            except Exception:
                self.__logger.exception('Failed to terminate service')

    def __deduct_credit(self, value):
        """Asks the front-end to decrement the service's credit by value.

        @return True if the front-end answered without reporting an error,
                False otherwise (including when the request itself failed)
        """
        try:
            parsed_url = urlparse.urlparse(self.__fe_creditUrl)
            _, body = _http_post(parsed_url.hostname, parsed_url.port or 80,
                                 parsed_url.path,
                                 {'sid': self.__fe_service_id,
                                  'decrement': value})
            obj = json.loads(body)
            return not obj['error']
        except Exception:
            self.__logger.exception('Failed to deduct credit')
            return False

    def __deduct_and_check_credit(self, value):
        """Deducts value credits; terminates the service if that fails."""
        if not self.__deduct_credit(value):
            self.__force_terminate_service()
|
|
|
419 |
|
|
|
420 |
|
|
|
421 |
class ReservationTimer(Thread):
    """Thread that periodically charges for a group of nodes.

    After an initial delay the callback is invoked, in a thread of its own,
    with the number of nodes currently tracked; this is repeated every
    interval seconds until stop() is called.
    """

    def __init__(self, nodes, delay, callback,
                 reservation_logger, interval=3600):
        """
        @param nodes              Ids of the nodes this timer charges for
        @param delay              Seconds to wait before the first charge
        @param callback           Called as callback(number_of_nodes)
        @param reservation_logger Logger used for the RTIMER messages
        @param interval           Seconds between two consecutive charges
        """
        Thread.__init__(self)
        # Private copy: remove_node() must not modify the caller's list
        self.nodes = list(nodes)
        self.event = Event()
        self.delay = delay
        self.interval = interval
        self.callback = callback
        # guards self.nodes, which is touched by run() and remove_node()
        self.lock = Lock()
        self.reservation_logger = reservation_logger
        self.reservation_logger.debug('RTIMER Creating timer for %s',
                                      str(self.nodes))

    def remove_node(self, node_id):
        """Stops charging for node_id.

        @return The number of nodes still tracked by this timer

        @raise ValueError if node_id is not tracked by this timer
        """
        with self.lock:
            self.nodes.remove(node_id)
            self.reservation_logger.debug('RTIMER removed node %s, '
                                          'updated list %s',
                                          node_id, str(self.nodes))
            return len(self.nodes)

    def run(self):
        """Waits for the initial delay, then charges once per interval."""
        self.event.wait(self.delay)
        while not self.event.is_set():
            # Take a snapshot under the lock so that the logged list and
            # the charged amount always agree, even if remove_node() runs
            # concurrently.
            with self.lock:
                charged_nodes = list(self.nodes)
            list_size = len(charged_nodes)
            self.reservation_logger.debug('RTIMER charging user credit for '
                                          'hour of %d instances %s',
                                          list_size, str(charged_nodes))
            # the callback may block (it talks to the front-end), so it is
            # run in its own thread to keep the charging schedule
            Thread(target=self.callback, args=[list_size]).start()
            self.event.wait(self.interval)

    def stop(self):
        """Makes run() return at its next wake-up; no more charges happen."""
        self.event.set()
|
|
|
457 |
|