OW2 Consortium contrail

Compare Revisions

Ignore whitespace Rev 3042 → Rev 3043

/trunk/conpaas/conpaas-services/src/conpaas/services/scalaris/manager/manager.py
248,10 → 248,21
if count > len(self.nodes) - 1:
self.state = self.S_RUNNING
return HttpErrorResponse('ERROR: Cannot delete so many workers')
for i in range(0, count):
self.controller.delete_nodes([self.nodes.pop(1)])
self.state = self.S_RUNNING
return HttpJsonResponse()
if count == len(self.nodes):
self.logger.info('killing the nodes')
for i in range(0, count):
self.controller.delete_nodes([self.nodes.pop(1)])
self.state = self.S_RUNNING
return HttpJsonResponse()
else:
for i in range(0, count):
node = self.nodes.pop(1)
self.logger.info('graceful leave on %s', node)
res = client.graceful_leave(node.ip, 5555)
self.logger.info('graceful leave: %s', res)
self.controller.delete_nodes([node])
self.state = self.S_RUNNING
return HttpJsonResponse()
 
@expose('GET')
def getLog(self, kwargs):
/trunk/conpaas/conpaas-services/src/conpaas/services/scalaris/agent/scalarisclient.py
File deleted
/trunk/conpaas/conpaas-services/src/conpaas/services/scalaris/agent/agent.py
52,7 → 52,7
import subprocess
import commands
from os.path import join
from conpaas.services.scalaris.agent import scalarisclient
from conpaas.services.scalaris.agent import scalaris
 
ETC='/etc/scalaris/'
 
120,6 → 120,14
self.logger.info('Agent is running')
return HttpJsonResponse()
 
@expose('POST')
def graceful_leave(self, kwargs):
self.logger.info('called graceful_leave')
vm = scalaris.ScalarisVM()
res = vm.shutdownVM()
self.logger.info('called shutdownVM %s', res)
return HttpJsonResponse(res)
 
def _write_config(self, known_hosts, mgmt_server):
tmpl = open(self.config_template).read()
conf_fd = open(self.config_file, 'w')
/trunk/conpaas/conpaas-services/src/conpaas/services/scalaris/agent/client.py
62,3 → 62,7
def startup(host, port, ip):
method = 'startup'
return _check(_jsonrpc_post(host, port, '/', method, {'ip': ip}))
 
def graceful_leave(host, port):
method = 'graceful_leave'
return _check(_jsonrpc_post(host, port, '/', method, {}))
/trunk/conpaas/conpaas-services/src/conpaas/services/scalaris/agent/scalaris.py
New file
0,0 → 1,1477
#!/usr/bin/python
# Copyright 2011 Zuse Institute Berlin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
import httplib, urlparse, base64, urllib
import os, threading, numbers
from datetime import datetime, timedelta
try: import simplejson as json
except ImportError: import json
 
if 'SCALARIS_JSON_URL' in os.environ and os.environ['SCALARIS_JSON_URL'] != '':
DEFAULT_URL = os.environ['SCALARIS_JSON_URL']
else:
DEFAULT_URL = 'http://localhost:8000'
"""default URL and port to a scalaris node"""
DEFAULT_TIMEOUT = 5
"""socket timeout in seconds"""
DEFAULT_PATH = '/jsonrpc.yaws'
"""path to the json rpc page"""
 
class JSONConnection(object):
"""
Abstracts connections to scalaris using JSON
"""
def __init__(self, url = DEFAULT_URL, timeout = DEFAULT_TIMEOUT):
"""
Creates a JSON connection to the given URL using the given TCP timeout
"""
try:
uri = urlparse.urlparse(url)
self._conn = httplib.HTTPConnection(uri.hostname, uri.port,
timeout = timeout)
except Exception as instance:
raise ConnectionError(instance)
 
def callp(self, path, function, params, retry_if_bad_status = True):
return self.call(function, params, path = path, retry_if_bad_status = retry_if_bad_status)
 
def call(self, function, params, path = DEFAULT_PATH, retry_if_bad_status = True):
"""
Calls the given function with the given parameters via the JSON
interface of scalaris.
"""
params2 = {'jsonrpc': '2.0',
'method': function,
'params': params,
'id': 0}
try:
data = None
response = None
# use compact JSON encoding:
params_json = json.dumps(params2, separators=(',',':'))
headers = {"Content-type": "application/json; charset=utf-8"}
# note: we need to quote, e.g. if a '%' is in the value string:
self._conn.request("POST", path, urllib.quote(params_json), headers)
response = self._conn.getresponse()
#print response.status, response.reason
data = response.read().decode('utf-8')
if (response.status < 200 or response.status >= 300):
raise ConnectionError(data, response = response)
response_json = json.loads(data)
return response_json['result']
except httplib.BadStatusLine as instance:
#print 'HTTP STATUS:', response.status, response.reason, params_json
self.close()
if retry_if_bad_status:
return self.call(function, params, path = path, retry_if_bad_status = False)
else:
raise ConnectionError(data, response = response, error = instance)
except ConnectionError:
#print 'HTTP STATUS:', response.status, response.reason, params_json
self.close()
raise
except Exception as instance:
#print 'HTTP STATUS:', response.status, response.reason, params_json
self.close()
raise ConnectionError(data, response = response, error = instance)
 
@staticmethod
def encode_value(value):
"""
Encodes the value to the form required by the scalaris JSON API
"""
if isinstance(value, bytearray):
return {'type': 'as_bin', 'value': (base64.b64encode(bytes(value))).decode('ascii')}
else:
return {'type': 'as_is', 'value': value}
 
@staticmethod
def decode_value(value):
"""
Decodes the value from the scalaris JSON API form to a native type
"""
if ('type' not in value) or ('value' not in value):
raise UnknownError(value)
if value['type'] == 'as_bin':
return bytearray(base64.b64decode(value['value'].encode('ascii')))
else:
return value['value']
# result: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout'}
@staticmethod
def check_fail_abort(result):
"""
Processes the result of some Scalaris operation and raises a
TimeoutError if found.
"""
if result == {'status': 'fail', 'reason': 'timeout'}:
raise TimeoutError(result)
# result: {'status': 'ok', 'value': xxx} or
# {'status': 'fail', 'reason': 'timeout' or 'not_found'}
@staticmethod
def process_result_read(result):
"""
Processes the result of a read operation.
Returns the read value on success.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and len(result) == 2:
if result['status'] == 'ok' and 'value' in result:
return JSONConnection.decode_value(result['value'])
elif result['status'] == 'fail' and 'reason' in result:
if result['reason'] == 'timeout':
raise TimeoutError(result)
elif result['reason'] == 'not_found':
raise NotFoundError(result)
raise UnknownError(result)
# result: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout'}
@staticmethod
def process_result_write(result):
"""
Processes the result of a write operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict):
if result == {'status': 'ok'}:
return None
elif result == {'status': 'fail', 'reason': 'timeout'}:
raise TimeoutError(result)
raise UnknownError(result)
# result: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'abort', 'keys': <list>} or
# {'status': 'fail', 'reason': 'timeout'}
@staticmethod
def process_result_commit(result):
"""
Processes the result of a commit operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result:
if result == {'status': 'ok'}:
return None
elif result['status'] == 'fail' and 'reason' in result:
if len(result) == 2 and result['reason'] == 'timeout':
raise TimeoutError(result)
elif len(result) == 3 and result['reason'] == 'abort' and 'keys' in result:
raise AbortError(result, result['keys'])
raise UnknownError(result)
# results: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout' or 'not_a_list'} or
@staticmethod
def process_result_add_del_on_list(result):
"""
Processes the result of a add_del_on_list operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result:
if result == {'status': 'ok'}:
return None
elif result['status'] == 'fail' and 'reason' in result:
if len(result) == 2:
if result['reason'] == 'timeout':
raise TimeoutError(result)
elif result['reason'] == 'not_a_list':
raise NotAListError(result)
raise UnknownError(result)
# results: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout' or 'not_a_number'} or
@staticmethod
def process_result_add_on_nr(result):
"""
Processes the result of a add_on_nr operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result:
if result == {'status': 'ok'}:
return None
elif result['status'] == 'fail' and 'reason' in result:
if len(result) == 2:
if result['reason'] == 'timeout':
raise TimeoutError(result)
elif result['reason'] == 'not_a_number':
raise NotANumberError(result)
raise UnknownError(result)
# results: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout' or 'not_found'} or
# {'status': 'fail', 'reason': 'key_changed', 'value': xxx}
@staticmethod
def process_result_test_and_set(result):
"""
Processes the result of a test_and_set operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result:
if result == {'status': 'ok'}:
return None
elif result['status'] == 'fail' and 'reason' in result:
if len(result) == 2:
if result['reason'] == 'timeout':
raise TimeoutError(result)
elif result['reason'] == 'not_found':
raise NotFoundError(result)
elif result['reason'] == 'key_changed' and 'value' in result and len(result) == 3:
raise KeyChangedError(result, JSONConnection.decode_value(result['value']))
raise UnknownError(result)
# results: {'status': 'ok'}
@staticmethod
def process_result_publish(result):
"""
Processes the result of a publish operation.
Raises the appropriate exception if the operation failed.
"""
if result == {'status': 'ok'}:
return None
raise UnknownError(result)
# results: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'timeout' or 'abort'}
@staticmethod
def process_result_subscribe(result):
"""
Processes the result of a subscribe operation.
Raises the appropriate exception if the operation failed.
"""
JSONConnection.process_result_commit(result)
# results: {'status': 'ok'} or
# {'status': 'fail', 'reason': 'abort', 'keys': <list>} or
# {'status': 'fail', 'reason': 'timeout' or 'not_found'}
@staticmethod
def process_result_unsubscribe(result):
"""
Processes the result of a unsubscribe operation.
Raises the appropriate exception if the operation failed.
"""
if result == {'status': 'ok'}:
return None
elif isinstance(result, dict) and 'status' in result:
if result['status'] == 'fail' and 'reason' in result:
if len(result) == 2:
if result['reason'] == 'timeout':
raise TimeoutError(result)
elif result['reason'] == 'not_found':
raise NotFoundError(result)
elif len(result) == 3 and result['reason'] == 'abort' and 'keys' in result:
raise AbortError(result, result['keys'])
raise UnknownError(result)
# results: [urls=str()]
@staticmethod
def process_result_get_subscribers(result):
"""
Processes the result of a get_subscribers operation.
Returns the list of subscribers on success.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, list):
return result
raise UnknownError(result)
 
# results: {'ok': xxx, 'results': ['ok' or 'locks_set' or 'undef']} or
# {'failure': 'timeout', 'ok': xxx, 'results': ['ok' or 'locks_set' or 'undef']}
@staticmethod
def process_result_delete(result):
"""
Processes the result of a delete operation.
Returns the tuple
(<success (True | 'timeout')>, <number of deleted items>, <detailed results>) on success.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'ok' in result and 'results' in result:
if 'failure' not in result:
return (True, result['ok'], result['results'])
elif result['failure'] == 'timeout':
return ('timeout', result['ok'], result['results'])
raise UnknownError(result)
# results: ['ok' or 'locks_set' or 'undef']
@staticmethod
def create_delete_result(result):
"""
Creates a new DeleteResult from the given result list.
"""
ok = 0
locks_set = 0
undefined = 0
if isinstance(result, list):
for element in result:
if element == 'ok':
ok += 1
elif element == 'locks_set':
locks_set += 1
elif element == 'undef':
undefined += 1
else:
raise UnknownError('Unknown reason ' + element + 'in ' + result)
return DeleteResult(ok, locks_set, undefined)
raise UnknownError('Unknown result ' + result)
 
# results: {'tlog': xxx,
# 'results': [{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
# {'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}]}
@staticmethod
def process_result_req_list_t(result):
"""
Processes the result of a req_list operation of the Transaction class.
Returns the tuple (<tlog>, <result>) on success.
Raises the appropriate exception if the operation failed.
"""
if 'tlog' not in result or 'results' not in result or \
not isinstance(result['results'], list):
raise UnknownError(result)
return (result['tlog'], result['results'])
 
# results: [{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
# {'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}]
@staticmethod
def process_result_req_list_tso(result):
"""
Processes the result of a req_list operation of the TransactionSingleOp class.
Returns <result> on success.
Raises the appropriate exception if the operation failed.
"""
if not isinstance(result, list):
raise UnknownError(result)
return result
# results: {'status': 'ok', 'value': xxx}
@staticmethod
def process_result_vm_get_version(result):
"""
Processes the result of a api_vm/get_version operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'value' in result:
if result['status'] == 'ok':
return result['value']
raise UnknownError(result)
# value: {'scalaris_version': xxx,
# 'erlang_version': xxx,
# 'mem_total': xxx,
# 'uptime': xxx,
# 'erlang_node': xxx,
# 'ip': xxx,
# 'port': xxx,
# 'yaws_port': xxx}
# results: {'status': 'ok', 'value': <value>
@staticmethod
def process_result_vm_get_info(result):
"""
Processes the result of a api_vm/get_info operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'value' in result:
value = result['value']
if result['status'] == 'ok' and \
'scalaris_version' in value and 'erlang_version' in value and \
'mem_total' in value and 'uptime' in value and \
'erlang_node' in value and 'ip' in value and \
'port' in value and 'yaws_port' in value:
try:
return ScalarisVM.GetInfoResult(value['scalaris_version'],
value['erlang_version'],
int(value['mem_total']),
int(value['uptime']),
value['erlang_node'],
value['ip'],
int(value['port']),
int(value['yaws_port']))
except:
pass
raise UnknownError(result)
# results: {'status': 'ok', 'value': xxx}
@staticmethod
def process_result_vm_get_number_of_nodes(result):
"""
Processes the result of a api_vm/number_of_nodes operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'value' in result:
if result['status'] == 'ok':
try:
return int(result['value'])
except:
pass
raise UnknownError(result)
# results: {'status': 'ok', 'value': [xxx]}
@staticmethod
def process_result_vm_get_nodes(result):
"""
Processes the result of a api_vm/get_nodes operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'value' in result:
if result['status'] == 'ok' and isinstance(result['value'], list):
return result['value']
raise UnknownError(result)
# results: {'status': 'ok', 'ok': [xxx], 'failed': [xxx]}
@staticmethod
def process_result_vm_add_nodes(result):
"""
Processes the result of a api_vm/add_nodes operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'ok' in result and 'failed' in result:
if result['status'] == 'ok' and isinstance(result['ok'], list) and isinstance(result['failed'], list):
return (result['ok'], result['failed'])
raise UnknownError(result)
# results: {'status': 'ok' | 'not_found'}
@staticmethod
def process_result_vm_delete_node(result):
"""
Processes the result of a api_vm/shutdown_node and api_vm/kill_node operations.
Raises the appropriate exception if the operation failed.
"""
if result == {'status': 'ok'}:
return True
if result == {'status': 'not_found'}:
return False
raise UnknownError(result)
# results: {'status': 'ok', 'ok': [xxx]}
@staticmethod
def process_result_vm_delete_nodes(result):
"""
Processes the result of a api_vm/shutdown_nodes and api_vm/kill_nodes operations.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'ok' in result and \
result['status'] == 'ok' and isinstance(result['ok'], list):
return result['ok']
raise UnknownError(result)
# results: {'status': 'ok', 'ok': [xxx], 'not_found': [xxx]}
@staticmethod
def process_result_vm_delete_nodes_by_name(result):
"""
Processes the result of a api_vm/shutdown_nodes_by_name and api_vm/kill_nodes_by_name operations.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'ok' in result and 'not_found' in result:
if result['status'] == 'ok' and isinstance(result['ok'], list) and isinstance(result['not_found'], list):
return (result['ok'], result['not_found'])
raise UnknownError(result)
# results: {'status': 'ok'}
@staticmethod
def process_result_vm_delete_vm(result):
"""
Processes the result of a api_vm/shutdown_vm and api_vm/kill_vm operations.
Raises the appropriate exception if the operation failed.
"""
if result == {'status': 'ok'}:
return None
raise UnknownError(result)
# VM: {'erlang_node': xxx,
# 'ip': xxx,
# 'port': xxx,
# 'yaws_port': xxx}
# results: {'status': 'ok', 'value': [<VM>]
@staticmethod
def process_result_vm_get_other_vms(result):
"""
Processes the result of a api_vm/get_other_vms operation.
Raises the appropriate exception if the operation failed.
"""
if isinstance(result, dict) and 'status' in result and 'value' in result:
value = result['value']
if result['status'] == 'ok' and isinstance(value, list):
vms = []
try:
for vm in value:
if 'erlang_node' in vm and 'ip' in vm and \
'port' in vm and 'yaws_port' in vm:
vms.append('http://' + vm['ip'] + ':' + str(int(vm['yaws_port'])))
else:
raise UnknownError(result)
return vms
except:
pass
raise UnknownError(result)
# result: 'ok'
@staticmethod
def process_result_nop(result):
"""
Processes the result of a nop operation.
Raises the appropriate exception if the operation failed.
"""
if result != 'ok':
raise UnknownError(result)
@staticmethod
def new_req_list_t(other = None):
"""
Returns a new ReqList object allowing multiple parallel requests for
the Transaction class.
"""
return _JSONReqListTransaction(other)
@staticmethod
def new_req_list_tso(other = None):
"""
Returns a new ReqList object allowing multiple parallel requests for
the TransactionSingleOp class.
"""
return _JSONReqListTransactionSingleOp(other)
def close(self):
self._conn.close()
 
class ScalarisError(Exception):
"""Base class for errors in the scalaris package."""
 
class AbortError(ScalarisError):
"""
Exception that is thrown if a the commit of a write operation on a scalaris
ring fails.
"""
def __init__(self, raw_result, failed_keys):
self.raw_result = raw_result
self.failed_keys = failed_keys
def __str__(self):
return repr(self.raw_result)
 
class ConnectionError(ScalarisError):
"""
Exception that is thrown if an operation on a scalaris ring fails because
a connection does not exist or has been disconnected.
"""
def __init__(self, raw_result, response = None, error = None):
self.raw_result = raw_result
self.response = response
self.error = error
def __str__(self):
result_str = ''
if self.response is not None:
result_str += 'status: ' + str(self.response.status)
result_str += ', reason: ' + self.response.reason + '\n'
if self.error is not None:
result_str += 'error: ' + repr(self.error) + '\n'
result_str += 'data: ' + repr(self.raw_result)
return result_str
 
class KeyChangedError(ScalarisError):
"""
Exception that is thrown if a test_and_set operation on a scalaris ring
fails because the old value did not match the expected value.
"""
def __init__(self, raw_result, old_value):
self.raw_result = raw_result
self.old_value = old_value
def __str__(self):
return repr(self.raw_result) + ', old value: ' + repr(self.old_value)
 
class NodeNotFoundError(ScalarisError):
"""
Exception that is thrown if a delete operation on a scalaris ring fails
because no scalaris node was found.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class NotFoundError(ScalarisError):
"""
Exception that is thrown if a read operation on a scalaris ring fails
because the key did not exist before.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class NotAListError(ScalarisError):
"""
Exception that is thrown if a add_del_on_list operation on a scalaris ring
fails because the participating values are not lists.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class NotANumberError(ScalarisError):
"""
Exception that is thrown if a add_del_on_list operation on a scalaris ring
fails because the participating values are not numbers.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class TimeoutError(ScalarisError):
"""
Exception that is thrown if a read or write operation on a scalaris ring
fails due to a timeout.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class UnknownError(ScalarisError):
"""
Generic exception that is thrown during operations on a scalaris ring, e.g.
if an unknown result has been returned.
"""
def __init__(self, raw_result):
self.raw_result = raw_result
def __str__(self):
return repr(self.raw_result)
 
class DeleteResult(object):
"""
Stores the result of a delete operation.
"""
def __init__(self, ok, locks_set, undefined):
self.ok = ok
self.locks_set = locks_set
self.undefined = undefined
 
class ConnectionPool(object):
"""
Implements a simple (thread-safe) connection pool for Scalaris connections.
"""
def __init__(self, max_connections):
"""
Create a new connection pool with the given maximum number of connections.
"""
self._max_connections = max_connections
self._available_conns = []
self._checked_out_sema = threading.BoundedSemaphore(value=max_connections)
self._wait_cond = threading.Condition()
def _new_connection(self):
"""
Creates a new connection for the pool. Override this to use some other
connection class than JSONConnection.
"""
return JSONConnection()
def _get_connection(self):
"""
Gets a connection from the pool. Creates a new connection if necessary.
Returns <tt>None</tt> if the maximum number of connections has already
been hit.
"""
conn = None
if self._max_connections == 0:
conn = self._new_connection()
elif self._checked_out_sema.acquire(False):
try:
conn = self._available_conns.pop(0)
except IndexError:
conn = self._new_connection()
return conn
def get_connection(self, timeout = None):
"""
Tries to get a valid connection from the pool waiting at most
the given timeout. If timeout is an integer, it will be interpreted as
a number of milliseconds. Alternatively, timeout can be given as a
datetime.timedelta. Creates a new connection if necessary
and the maximum number of connections has not been hit yet.
If the timeout is hit and no connection is available, <tt>None</tt> is
returned.
"""
if timeout == None:
return self._get_connection()
else:
if isinstance(timeout, numbers.Integral ):
timeout = timedelta(milliseconds=timeout)
start = datetime.now()
while True:
conn = self._get_connection()
if not conn is None:
return conn
self._wait_cond.wait(timeout.microseconds / 1000.0)
end = datetime.now()
if end - start > timeout:
return None
def release_connection(self, connection):
"""
Puts the given connection back into the pool.
"""
self._available_conns.append(connection)
self._checked_out_sema.release()
self._wait_cond.notify_all()
def close_all(self):
"""
Close all connections to scalaris.
"""
for conn in self._available_conns:
conn.close()
self._available_conns = []
 
class TransactionSingleOp(object):
"""
Single write or read operations on scalaris.
"""
def __init__(self, conn = None):
"""
Create a new object using the given connection
"""
if conn is None:
conn = JSONConnection()
self._conn = conn
def new_req_list(self, other = None):
"""
Returns a new ReqList object allowing multiple parallel requests.
"""
return self._conn.new_req_list_tso(other)
def req_list(self, reqlist):
"""
Issues multiple parallel requests to scalaris; each will be committed.
Request lists can be created using new_req_list().
The returned list has the following form:
[{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
{'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}].
Elements of this list can be processed with process_result_read() and
process_result_write().
"""
result = self._conn.callp('/api/tx.yaws', 'req_list_commit_each', [reqlist.get_requests()])
result = self._conn.process_result_req_list_tso(result)
return result
def process_result_read(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a read operation.
Returns the read value on success.
Raises the appropriate exceptions if a failure occurred during the
operation.
Beware: lists of (small) integers may be (falsely) returned as a string -
use str_to_list() to convert such strings.
"""
return self._conn.process_result_read(result)
 
def process_result_write(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a write operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.check_fail_abort(result)
return self._conn.process_result_write(result)
 
def process_result_add_del_on_list(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a add_del_on_list operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.check_fail_abort(result)
self._conn.process_result_add_del_on_list(result)
 
def process_result_add_on_nr(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a add_on_nr operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.check_fail_abort(result)
self._conn.process_result_add_on_nr(result)
 
def process_result_test_and_set(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a test_and_set operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.check_fail_abort(result)
self._conn.process_result_test_and_set(result)
 
def read(self, key):
"""
Read the value at key.
Beware: lists of (small) integers may be (falsely) returned as a string -
use str_to_list() to convert such strings.
"""
result = self._conn.callp('/api/tx.yaws', 'read', [key])
return self._conn.process_result_read(result)
 
def write(self, key, value):
"""
Write the value to key.
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/tx.yaws', 'write', [key, value])
self._conn.check_fail_abort(result)
self._conn.process_result_write(result)
def add_del_on_list(self, key, to_add, to_remove):
"""
Changes the list stored at the given key, i.e. first adds all items in
to_add then removes all items in to_remove.
Both, to_add and to_remove, must be lists.
Assumes en empty list if no value exists at key.
"""
result = self._conn.callp('/api/tx.yaws', 'add_del_on_list', [key, to_add, to_remove])
self._conn.check_fail_abort(result)
self._conn.process_result_add_del_on_list(result)
def add_on_nr(self, key, to_add):
"""
Changes the number stored at the given key, i.e. adds some value.
Assumes 0 if no value exists at key.
"""
result = self._conn.callp('/api/tx.yaws', 'add_on_nr', [key, to_add])
self._conn.check_fail_abort(result)
self._conn.process_result_add_on_nr(result)
def test_and_set(self, key, old_value, new_value):
"""
Atomic test and set, i.e. if the old value at key is old_value, then
write new_value.
"""
old_value = self._conn.encode_value(old_value)
new_value = self._conn.encode_value(new_value)
result = self._conn.callp('/api/tx.yaws', 'test_and_set', [key, old_value, new_value])
self._conn.check_fail_abort(result)
self._conn.process_result_test_and_set(result)
 
def nop(self, value):
"""
No operation (may be used for measuring the JSON overhead).
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/tx.yaws', 'nop', [value])
self._conn.process_result_nop(result)
def close_connection(self):
"""
Close the connection to scalaris
(it will automatically be re-opened on the next request).
"""
self._conn.close()
 
class Transaction(object):
"""
Write or read operations on scalaris inside a transaction.
"""
def __init__(self, conn = None):
"""
Create a new object using the given connection
"""
if conn is None:
conn = JSONConnection()
self._conn = conn
self._tlog = None
def new_req_list(self, other = None):
"""
Returns a new ReqList object allowing multiple parallel requests.
"""
return self._conn.new_req_list_t(other)
def req_list(self, reqlist):
"""
Issues multiple parallel requests to scalaris.
Request lists can be created using new_req_list().
The returned list has the following form:
[{'status': 'ok'} or {'status': 'ok', 'value': xxx} or
{'status': 'fail', 'reason': 'timeout' or 'abort' or 'not_found'}].
Elements of this list can be processed with process_result_read() and
process_result_write().
A commit (at the end of the request list) will be automatically checked
for its success.
"""
if self._tlog is None:
result = self._conn.callp('/api/tx.yaws', 'req_list', [reqlist.get_requests()])
else:
result = self._conn.callp('/api/tx.yaws', 'req_list', [self._tlog, reqlist.get_requests()])
(tlog, result) = self._conn.process_result_req_list_t(result)
self._tlog = tlog
if reqlist.is_commit():
self._process_result_commit(result[-1])
# transaction was successful: reset transaction log
self._tlog = None
return result
def process_result_read(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a read operation.
Returns the read value on success.
Raises the appropriate exceptions if a failure occurred during the
operation.
Beware: lists of (small) integers may be (falsely) returned as a string -
use str_to_list() to convert such strings.
"""
return self._conn.process_result_read(result)
 
def process_result_write(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a write operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
return self._conn.process_result_write(result)
 
def process_result_add_del_on_list(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a add_del_on_list operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.process_result_add_del_on_list(result)
 
def process_result_add_on_nr(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a add_on_nr operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.process_result_add_on_nr(result)
 
def process_result_test_and_set(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a test_and_set operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
self._conn.process_result_test_and_set(result)
 
def _process_result_commit(self, result):
"""
Processes a result element from the list returned by req_list() which
originated from a commit operation.
Raises the appropriate exceptions if a failure occurred during the
operation.
"""
return self._conn.process_result_commit(result)
def commit(self):
"""
Issues a commit operation to scalaris validating the previously
created operations inside the transaction.
"""
result = self.req_list(self.new_req_list().add_commit())[0]
self._process_result_commit(result)
# reset tlog (minor optimization which is not done in req_list):
self._tlog = None
def abort(self):
"""
Aborts all previously created operations inside the transaction.
"""
self._tlog = None
def read(self, key):
"""
Issues a read operation to scalaris, adds it to the current
transaction and returns the result.
Beware: lists of (small) integers may be (falsely) returned as a string -
use str_to_list() to convert such strings.
"""
result = self.req_list(self.new_req_list().add_read(key))[0]
return self.process_result_read(result)
def write(self, key, value):
"""
Issues a write operation to scalaris and adds it to the current
transaction.
"""
result = self.req_list(self.new_req_list().add_write(key, value))[0]
self.process_result_write(result)
def add_del_on_list(self, key, to_add, to_remove):
"""
Issues a add_del_on_list operation to scalaris and adds it to the
current transaction.
Changes the list stored at the given key, i.e. first adds all items in
to_add then removes all items in to_remove.
Both, to_add and to_remove, must be lists.
Assumes en empty list if no value exists at key.
"""
result = self.req_list(self.new_req_list().add_add_del_on_list(key, to_add, to_remove))[0]
self.process_result_add_del_on_list(result)
def add_on_nr(self, key, to_add):
"""
Issues a add_on_nr operation to scalaris and adds it to the
current transaction.
Changes the number stored at the given key, i.e. adds some value.
Assumes 0 if no value exists at key.
"""
result = self.req_list(self.new_req_list().add_add_on_nr(key, to_add))[0]
self.process_result_add_on_nr(result)
def test_and_set(self, key, old_value, new_value):
"""
Issues a test_and_set operation to scalaris and adds it to the
current transaction.
Atomic test and set, i.e. if the old value at key is old_value, then
write new_value.
"""
result = self.req_list(self.new_req_list().add_test_and_set(key, old_value, new_value))[0]
self.process_result_test_and_set(result)
 
def nop(self, value):
"""
No operation (may be used for measuring the JSON overhead).
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/tx.yaws', 'nop', [value])
self._conn.process_result_nop(result)
def close_connection(self):
"""
Close the connection to scalaris
(it will automatically be re-opened on the next request).
"""
self._conn.close()
 
class _JSONReqList(object):
"""
Generic request list.
"""
def __init__(self, other = None):
"""
Create a new object using a JSON connection.
"""
self._requests = []
self._is_commit = False
if other is not None:
self.extend(other)
def add_read(self, key):
"""
Adds a read operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("No further request supported after a commit!")
self._requests.append({'read': key})
return self
def add_write(self, key, value):
"""
Adds a write operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("No further request supported after a commit!")
self._requests.append({'write': {key: JSONConnection.encode_value(value)}})
return self
def add_add_del_on_list(self, key, to_add, to_remove):
"""
Adds a add_del_on_list operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("No further request supported after a commit!")
self._requests.append({'add_del_on_list': {'key': key, 'add': to_add, 'del': to_remove}})
return self
def add_add_on_nr(self, key, to_add):
"""
Adds a add_on_nr operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("No further request supported after a commit!")
self._requests.append({'add_on_nr': {key: to_add}})
return self
def add_test_and_set(self, key, old_value, new_value):
"""
Adds a test_and_set operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("No further request supported after a commit!")
self._requests.append({'test_and_set': {'key': key, 'old': old_value, 'new': new_value}})
return self
def add_commit(self):
"""
Adds a commit operation to the request list.
"""
if (self._is_commit):
raise RuntimeError("Only one commit per request list allowed!")
self._requests.append({'commit': ''})
self._is_commit = True
return self
def get_requests(self):
"""
Gets the collected requests.
"""
return self._requests
 
def is_commit(self):
"""
Returns whether the transactions contains a commit or not.
"""
return self._is_commit
 
def is_empty(self):
"""
Checks whether the request list is empty.
"""
return self._requests == []
 
def size(self):
"""
Gets the number of requests in the list.
"""
return len(self._requests)
 
def extend(self, other):
"""
Adds all requests of the other request list to the end of this list.
"""
self._requests.extend(other._requests)
return self
 
class _JSONReqListTransaction(_JSONReqList):
"""
Request list for use with Transaction.req_list().
"""
def __init__(self, other = None):
_JSONReqList.__init__(self, other)
 
class _JSONReqListTransactionSingleOp(_JSONReqList):
"""
Request list for use with TransactionSingleOp.req_list() which does not
support commits.
"""
def __init__(self, other = None):
_JSONReqList.__init__(self, other)
def add_commit(self):
"""
Adds a commit operation to the request list.
"""
raise RuntimeError("No commit allowed in TransactionSingleOp.req_list()!")
 
class PubSub(object):
"""
Publish and subscribe methods accessing scalaris' pubsub system
"""
def __init__(self, conn = None):
"""
Create a new object using the given connection.
"""
if conn is None:
conn = JSONConnection()
self._conn = conn
 
def publish(self, topic, content):
"""
Publishes content under topic.
"""
# note: do NOT encode the content, this is not decoded on the erlang side!
# (only strings are allowed anyway)
# content = self._conn.encode_value(content)
result = self._conn.callp('/api/pubsub.yaws', 'publish', [topic, content])
self._conn.process_result_publish(result)
 
def subscribe(self, topic, url):
"""
Subscribes url for topic.
"""
# note: do NOT encode the URL, this is not decoded on the erlang side!
# (only strings are allowed anyway)
# url = self._conn.encode_value(url)
result = self._conn.callp('/api/pubsub.yaws', 'subscribe', [topic, url])
self._conn.process_result_subscribe(result)
 
def unsubscribe(self, topic, url):
"""
Unsubscribes url from topic.
"""
# note: do NOT encode the URL, this is not decoded on the erlang side!
# (only strings are allowed anyway)
# url = self._conn.encode_value(url)
result = self._conn.callp('/api/pubsub.yaws', 'unsubscribe', [topic, url])
self._conn.process_result_unsubscribe(result)
 
def get_subscribers(self, topic):
"""
Gets the list of all subscribers to topic.
"""
result = self._conn.callp('/api/pubsub.yaws', 'get_subscribers', [topic])
return self._conn.process_result_get_subscribers(result)
 
def nop(self, value):
"""
No operation (may be used for measuring the JSON overhead).
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/pubsub.yaws', 'nop', [value])
self._conn.process_result_nop(result)
def close_connection(self):
"""
Close the connection to scalaris
(it will automatically be re-opened on the next request).
"""
self._conn.close()
 
class ReplicatedDHT(object):
"""
Non-transactional operations on the replicated DHT of scalaris
"""
def __init__(self, conn = None):
"""
Create a new object using the given connection.
"""
if conn is None:
conn = JSONConnection()
self._conn = conn
 
# returns the number of successfully deleted items
# use get_last_delete_result() to get more details
def delete(self, key, timeout = 2000):
"""
Tries to delete the value at the given key.
WARNING: This function can lead to inconsistent data (e.g. deleted items
can re-appear). Also when re-creating an item the version before the
delete can re-appear.
"""
result = self._conn.callp('/api/rdht.yaws', 'delete', [key, timeout])
(success, ok, results) = self._conn.process_result_delete(result)
self._lastDeleteResult = results
if success == True:
return ok
elif success == 'timeout':
raise TimeoutError(result)
else:
raise UnknownError(result)
 
def get_last_delete_result(self):
"""
Returns the result of the last call to delete().
NOTE: This function traverses the result list returned by scalaris and
therefore takes some time to process. It is advised to store the returned
result object once generated.
"""
return self._conn.create_delete_result(self._lastDeleteResult)
 
def nop(self, value):
"""
No operation (may be used for measuring the JSON overhead).
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/rdht.yaws', 'nop', [value])
self._conn.process_result_nop(result)
def close_connection(self):
"""
Close the connection to scalaris
(it will automatically be re-opened on the next request).
"""
self._conn.close()
 
class ScalarisVM(object):
"""
Provides methods to interact with a specific Scalaris (Erlang) VM.
"""
class GetInfoResult(object):
def __init__(self, scalarisVersion, erlangVersion, memTotal, uptime,
erlangNode, ip, port, yawsPort):
self.scalarisVersion = scalarisVersion
self.erlangVersion = erlangVersion
self.memTotal = memTotal
self.uptime = uptime
self.erlangNode = erlangNode
self.ip = ip
self.port = port
self.yawsPort = yawsPort
def __init__(self, conn = None):
"""
Create a new object using the given connection.
"""
if conn is None:
conn = JSONConnection()
self._conn = conn
 
def getVersion(self):
"""
Gets the version of the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'get_version', [])
return self._conn.process_result_vm_get_version(result)
 
def getInfo(self):
"""
Gets some information about the VM and Scalaris.
"""
result = self._conn.callp('/api/vm.yaws', 'get_info', [])
return self._conn.process_result_vm_get_info(result)
 
def getNumberOfNodes(self):
"""
Gets the number of nodes in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'number_of_nodes', [])
return self._conn.process_result_vm_get_number_of_nodes(result)
 
def getNodes(self):
"""
Gets the names of the nodes in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'get_nodes', [])
return self._conn.process_result_vm_get_nodes(result)
 
def addNodes(self, number):
"""
Adds Scalaris nodes to the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'add_nodes', [number])
return self._conn.process_result_vm_add_nodes(result)
 
def shutdownNode(self, name):
"""
Shuts down the given node (graceful leave) in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'shutdown_node', [name])
return self._conn.process_result_vm_delete_node(result)
 
def killNode(self, name):
"""
Kills the given node in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'kill_node', [name])
return self._conn.process_result_vm_delete_node(result)
 
def shutdownNodes(self, number):
"""
Shuts down the given number of nodes (graceful leave) in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'shutdown_nodes', [number])
return self._conn.process_result_vm_delete_nodes(result)
 
def killNodes(self, number):
"""
Kills the given number of nodes in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'kill_nodes', [number])
return self._conn.process_result_vm_delete_nodes(result)
 
def shutdownNodesByName(self, names):
"""
Shuts down the given nodes (graceful leave) in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'shutdown_nodes_by_name', [names])
return self._conn.process_result_vm_delete_nodes(result)
 
def killNodesByName(self, names):
"""
Kills the given nodes in the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'kill_nodes_by_name', [names])
return self._conn.process_result_vm_delete_nodes(result)
 
def getOtherVMs(self, maxVMs):
"""
Retrieves additional nodes from the Scalaris VM of the current
connection for use as URLs in JSONConnection.
"""
if maxVMs <= 0:
raise ValueError("max must be an integer > 0")
result = self._conn.callp('/api/vm.yaws', 'get_other_vms', [maxVMs])
return self._conn.process_result_vm_get_other_vms(result)
 
def shutdownVM(self):
"""
Tells the Scalaris VM of the current connection to shut down gracefully.
"""
result = self._conn.callp('/api/vm.yaws', 'shutdown_vm', [])
return self._conn.process_result_vm_delete_vm(result)
 
def killVM(self):
"""
Kills the Scalaris VM of the current connection.
"""
result = self._conn.callp('/api/vm.yaws', 'kill_vm', [])
return self._conn.process_result_vm_delete_vm(result)
 
def nop(self, value):
"""
No operation (may be used for measuring the JSON overhead).
"""
value = self._conn.encode_value(value)
result = self._conn.callp('/api/pubsub.yaws', 'nop', [value])
self._conn.process_result_nop(result)
def close_connection(self):
"""
Close the connection to scalaris
(it will automatically be re-opened on the next request).
"""
self._conn.close()
 
def str_to_list(value):
"""
Converts a string to a list of integers.
If the expected value of a read operation is a list, the returned value
could be (mistakenly) a string if it is a list of integers.
"""
if (isinstance(value, str) or isinstance(value, unicode)):
chars = list(value)
return [ord(char) for char in chars]
else:
return value
Property changes:
Added: svn:executable
+ *