contrail
Rev 2394 | Blame | Compare with Previous | Last modification | View Log | RSS feed
"""
Created on November, 2011
This module contains internals of the ConPaaS MySQL Server. ConPaaS MySQL Server consists of several
nodes with different roles
* Manager node
* Agent node(s)
* Master
* Slave(s)
:platform: Linux, Debian
:synopsis: Internals of ConPaaS MySQL Servers.
:moduleauthor: Ales Cernivec <ales.cernivec@xlab.si>
"""
from threading import Thread
import time
import conpaas
from conpaas.core.http import HttpErrorResponse, HttpJsonResponse
from conpaas.core.log import create_logger
from conpaas.core.expose import expose
from conpaas.core.controller import Controller
from conpaas.services.mysql.agent import client
from conpaas.services.mysql.manager.config import Configuration, \
ManagerException,\
E_ARGS_UNEXPECTED, ServiceNode, E_UNKNOWN, \
E_ARGS_MISSING, E_STATE_ERROR, E_ARGS_INVALID
class MySQLManager(object):
"""
Initializes :py:attr:`config` using Config and sets :py:attr:`state` to :py:attr:`S_INIT`
:param conf: Configuration file.
:type conf: str
:type conf: boolean
"""
# TODO: move this in core !!!!
S_INIT = 'INIT'
S_PROLOGUE = 'PROLOGUE'
S_RUNNING = 'RUNNING'
S_ADAPTING = 'ADAPTING'
S_EPILOGUE = 'EPILOGUE'
S_STOPPED = 'STOPPED'
S_ERROR = 'ERROR'
def __init__(self, conf, **kwargs):
self.logger = create_logger(__name__)
self.logger.debug("Entering MySQLServerManager initialization")
self.controller = Controller(conf)
self.controller.generate_context('mysql')
self.logfile = conf.get('manager', 'LOG_FILE')
self.state = self.S_INIT
self.config = Configuration(conf)
self.logger.debug("Leaving MySQLServer initialization")
# The unique id that is used to start the master/slave
self.id = 0
@expose('POST')
def startup(self, kwargs):
''' Starts the service - it will start and configure a MySQL master '''
self.logger.debug("Entering MySQLServerManager startup")
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException \
(E_ARGS_UNEXPECTED, \
kwargs.keys()).message)
if self.state != self.S_INIT and self.state != self.S_STOPPED:
return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)
self.state = self.S_PROLOGUE
Thread(target=self._do_startup, args=[]).start()
return HttpJsonResponse({'state': self.S_PROLOGUE})
def _do_startup(self):
''' Starts up the service. The first node will be the MYSQL master.
The next nodes will be slaves to this master. '''
#TODO: Get any existing configuration (if the service was stopped and restarted)
self.logger.debug('do_startup: Going to request one new node')
#TODO: who generates the password? The frontend or te manager?
self.controller.update_context(dict(mysql_username='root', \
mysql_password='R00T'))
try:
node_instances = self.controller.create_nodes(1,
client.check_agent_process, self.config.AGENT_PORT)
except:
self.logger.exception('do_startup: Failed to request a new node')
self.state = self.S_STOPPED
return
self._start_master(node_instances)
self.config.addMySQLServiceNodes(node_instances, isMaster=True)
self.state = self.S_RUNNING
def _start_master(self, nodes):
for serviceNode in nodes:
try:
client.create_master(serviceNode.ip, self.config.AGENT_PORT,
self._get_server_id())
except client.AgentException:
self.logger.exception('Failed to start MySQL Master at node %s' % str(serviceNode))
self.state = self.S_ERROR
raise
def _start_slave(self, nodes, master):
for serviceNode in nodes:
try:
self.logger.debug('create_slave for master.ip = %s' % master)
client.create_slave(master.ip, self.config.AGENT_PORT, \
self._get_server_id(),\
serviceNode.ip, self.config.AGENT_PORT)
except client.AgentException:
self.logger.exception('Failed to start MySQL Slave at node %s' % str(serviceNode))
self.state = self.S_ERROR
raise
@expose('GET')
def list_nodes(self, kwargs):
"""
HTTP GET method.
Uses :py:meth:`IaaSClient.listVMs()` to get list of
all Service nodes. For each service node it gets it
checks if it is in servers list. If some of them are missing
they are removed from the list. Returns list of all service nodes.
:returns: HttpJsonResponse - JSON response with the list of services
:raises: HttpErrorResponse
"""
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
vms = self.controller.list_vms()
vms_mysql = self.config.getMySQLServiceNodes()
for vm in vms_mysql:
if not(vm.vmid in vms.keys()):
self.logger.debug('Removing instance ' + str(vm.vmid) + \
' since it is not in the list returned by the listVMs().')
self.config.removeMySQLServiceNode(vm.vmid)
_nodes = [ serviceNode.vmid for serviceNode in self.config.getMySQLServiceNodes() ]
return HttpJsonResponse({
'mysql': _nodes
})
@expose('GET')
def get_node_info(self, kwargs):
"""
HTTP GET method. Gets info of a specific node.
:param param: serviceNodeId is a VMID of an existing service node.
:type param: str
:returns: HttpJsonResponse - JSON response with details about the node.
:raises: ManagerException
"""
if 'serviceNodeId' not in kwargs:
return HttpErrorResponse(ManagerException(E_ARGS_MISSING, 'serviceNodeId').message)
serviceNodeId = kwargs.pop('serviceNodeId')
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
if serviceNodeId not in self.config.serviceNodes:
return HttpErrorResponse(ManagerException(E_ARGS_INVALID , \
"serviceNodeId" ,\
detail='Invalid "serviceNodeId"').message)
serviceNode = self.config.getMySQLNode(serviceNodeId)
return HttpJsonResponse({
'serviceNode': {
'id': serviceNode.vmid,
'ip': serviceNode.ip,
'isMaster': serviceNode.isMaster,
'isSlave': serviceNode.isSlave
}
})
@expose('POST')
def add_nodes(self, kwargs):
"""
HTTP POST method. Creates new node and adds it to the list of existing nodes in the manager. Makes internal call to :py:meth:`createServiceNodeThread`.
:param kwargs: number of nodes to add.
:type param: str
:returns: HttpJsonResponse - JSON response with details about the node.
:raises: ManagerException
"""
if self.state != self.S_RUNNING:
return HttpErrorResponse('ERROR: Wrong state to add_nodes')
if not 'count' in kwargs:
return HttpErrorResponse('ERROR: Required argument doesn\'t exist')
if not isinstance(kwargs['count'], int):
return HttpErrorResponse('ERROR: Expected an integer value for "count"')
count = int(kwargs.pop('count'))
self.state = self.S_ADAPTING
Thread(target=self._do_add_nodes, args=[count]).start()
return HttpJsonResponse()
# TODO: also specify the master for which to add slaves
def _do_add_nodes(self, count):
node_instances = self.controller.create_nodes(count, \
client.check_agent_process, self.config.AGENT_PORT)
#self.nodes += node_instances
# Get the master
masters = self.config.getMySQLmasters()
# Configure the nodes as slaves
#TODO: modify this when multiple masters
for master in masters:
self._start_slave(node_instances, master)
self.config.addMySQLServiceNodes(node_instances, isSlave=True)
self.state = self.S_RUNNING
def _get_server_id(self):
self.id = self.id + 1
return self.id
@expose('GET')
def get_service_performance(self, kwargs):
''' HTTP GET method. Placeholder for obtaining performance metrics.
:param kwargs: Additional parameters.
:type kwargs: dict
:returns: HttpJsonResponse -- returns metrics
'''
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
return HttpJsonResponse({
'request_rate': 0,
'error_rate': 0,
'throughput': 0,
'response_time': 0,
})
@expose('POST')
def remove_nodes(self, kwargs):
if self.state != self.S_RUNNING:
self.logger.debug('Wrong state to remove nodes')
return HttpErrorResponse('ERROR: Wrong state to remove_nodes')
if not 'count' in kwargs:
return HttpErrorResponse('ERROR: Required argument doesn\'t exist')
if not isinstance(kwargs['count'], int):
return HttpErrorResponse('ERROR: Expected an integer value for "count"')
count = int(kwargs.pop('count'))
if count > len(self.config.getMySQLslaves()):
return HttpErrorResponse('ERROR: Cannot remove so many nodes')
self.state = self.S_ADAPTING
Thread(target=self._do_remove_nodes, args=[count]).start()
return HttpJsonResponse()
def _do_remove_nodes(self, count):
nodes = self.config.getMySQLslaves()[:count]
self.controller.delete_nodes(nodes)
self.config.remove_nodes(nodes)
self.state = self.S_RUNNING
return HttpJsonResponse()
@expose('GET')
def get_service_info(self, kwargs):
if len(kwargs) != 0:
return HttpErrorResponse('ERROR: Arguments unexpected')
return HttpJsonResponse({'state': self.state, 'type': 'mysql'})
@expose('POST')
def shutdown(self, kwargs):
"""
HTTP POST method. Shuts down the manager service.
:returns: HttpJsonResponse - JSON response with details about the status of a manager node: . ManagerException if something went wrong.
:raises: ManagerException
"""
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
if self.state != self.S_RUNNING:
return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)
self.state = self.S_EPILOGUE
Thread(target=self._do_shutdown, args=[]).start()
return HttpJsonResponse({'state': self.S_EPILOGUE})
def _do_shutdown(self):
''' Shuts down the service. '''
#self._stop_slaves( config.getProxyServiceNodes())
#self._stop_masters(config, config.getWebServiceNodes())
self.controller.delete_nodes(self.config.serviceNodes.values())
self.config.serviceNodes = {}
self.state = self.S_STOPPED
@expose('GET')
def getLog(self, kwargs):
if len(kwargs) != 0:
return HttpErrorResponse(ManagerException(ManagerException.E_ARGS_UNEXPECTED, kwargs.keys()).message)
try:
fd = open(self.logfile)
ret = ''
s = fd.read()
while s != '':
ret += s
s = fd.read()
if s != '':
ret += s
return HttpJsonResponse({'log': ret})
except:
return HttpErrorResponse('Failed to read log')