# OW2 Consortium contrail
#
# Rev
#
# Rev 2394 | Blame | Compare with Previous | Last modification | View Log | RSS feed

"""
Created on November, 2011

   This module contains internals of the ConPaaS MySQL Server. ConPaaS MySQL Server consists of several
   nodes with different roles
     
     * Manager node
     * Agent node(s)
        * Master
        * Slave(s)

   :platform: Linux, Debian
   :synopsis: Internals of ConPaaS MySQL Servers.
   :moduleauthor: Ales Cernivec <ales.cernivec@xlab.si> 

"""

from threading import Thread
import time
import conpaas

from conpaas.core.http import HttpErrorResponse, HttpJsonResponse
from conpaas.core.log import create_logger
from conpaas.core.expose import expose
from conpaas.core.controller import Controller

from conpaas.services.mysql.agent import client
from conpaas.services.mysql.manager.config import Configuration, \
                              ManagerException,\
                              E_ARGS_UNEXPECTED, ServiceNode, E_UNKNOWN, \
                              E_ARGS_MISSING, E_STATE_ERROR, E_ARGS_INVALID

class MySQLManager(object):
    """
    Manager of the ConPaaS MySQL service.

    Provides the HTTP entry points (methods decorated with ``expose``) used
    to start, scale, inspect and shut down the service, and tracks the
    service life cycle in :py:attr:`state`.  Operations that create or
    delete nodes run in a background thread: the HTTP method validates the
    request, changes :py:attr:`state` and returns immediately; the thread
    sets the final state when it is done.

    :param conf: Manager configuration object.  It must provide
        ``get(section, option)`` (used to read ``manager/LOG_FILE``) and is
        also handed to ``Controller`` and ``Configuration``.
    :param kwargs: Extra keyword arguments; currently unused.

    """

    # TODO: move this in core !!!!
    # Life-cycle states stored in ``self.state``.
    S_INIT = 'INIT'
    S_PROLOGUE = 'PROLOGUE'
    S_RUNNING = 'RUNNING'
    S_ADAPTING = 'ADAPTING'
    S_EPILOGUE = 'EPILOGUE'
    S_STOPPED = 'STOPPED'
    S_ERROR = 'ERROR'

    def __init__(self, conf, **kwargs):
        self.logger = create_logger(__name__)
        self.logger.debug("Entering MySQLServerManager initialization")
        self.controller = Controller(conf)
        self.controller.generate_context('mysql')
        self.logfile = conf.get('manager', 'LOG_FILE')
        self.state = self.S_INIT
        self.config = Configuration(conf)
        self.logger.debug("Leaving MySQLServer initialization")

        # Counter behind _get_server_id(): the last id handed to an agent
        # when starting a master/slave.
        self.id = 0

    @expose('POST')
    def startup(self, kwargs):
        """
        HTTP POST method. Starts the service: a background thread requests
        one node and configures it as the MySQL master.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse with the new state (``PROLOGUE``), or
            HttpErrorResponse on unexpected arguments or when the service
            is neither in ``INIT`` nor in ``STOPPED`` state.

        """
        self.logger.debug("Entering MySQLServerManager startup")
        if len(kwargs) != 0:
            return HttpErrorResponse(
                ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)

        if self.state not in (self.S_INIT, self.S_STOPPED):
            return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)

        self.state = self.S_PROLOGUE
        Thread(target=self._do_startup, args=[]).start()
        return HttpJsonResponse({'state': self.S_PROLOGUE})

    def _do_startup(self):
        """
        Thread body of :py:meth:`startup`.  The first node becomes the
        MySQL master; nodes added later become slaves of this master.

        If the node cannot be created the state falls back to ``STOPPED``.
        If the agent fails to start the master, :py:meth:`_start_master`
        sets ``ERROR`` and re-raises.
        """
        # TODO: Get any existing configuration (if the service was stopped
        # and restarted)
        self.logger.debug('do_startup: Going to request one new node')
        # TODO: who generates the password? The frontend or the manager?
        # NOTE(security): the MySQL root credentials are hard-coded here.
        # They are left unchanged because other components may rely on
        # these exact values; they should come from configuration instead.
        self.controller.update_context(dict(mysql_username='root',
                                            mysql_password='R00T'))
        try:
            node_instances = self.controller.create_nodes(
                1, client.check_agent_process, self.config.AGENT_PORT)
        except Exception:
            # ``except Exception`` rather than a bare ``except`` so that
            # SystemExit / KeyboardInterrupt are not swallowed.
            self.logger.exception('do_startup: Failed to request a new node')
            self.state = self.S_STOPPED
            return
        self._start_master(node_instances)
        self.config.addMySQLServiceNodes(node_instances, isMaster=True)
        self.state = self.S_RUNNING

    def _start_master(self, nodes):
        """
        Asks the agent on every node in ``nodes`` to start a MySQL master,
        each with a fresh server id.

        :raises: client.AgentException - re-raised after logging and after
            setting the state to ``ERROR``.
        """
        for serviceNode in nodes:
            try:
                client.create_master(serviceNode.ip, self.config.AGENT_PORT,
                                     self._get_server_id())
            except client.AgentException:
                self.logger.exception('Failed to start MySQL Master at node %s' % str(serviceNode))
                self.state = self.S_ERROR
                raise

    def _start_slave(self, nodes, master):
        """
        Asks the agent to configure every node in ``nodes`` as a slave of
        ``master``, each with a fresh server id.

        :param nodes: Service nodes to become slaves.
        :param master: Service node of the master (its ``ip`` is used).
        :raises: client.AgentException - re-raised after logging and after
            setting the state to ``ERROR``.
        """
        for serviceNode in nodes:
            try:
                # Log the address the message announces, not the whole
                # node object.
                self.logger.debug('create_slave for master.ip  = %s' % master.ip)
                client.create_slave(master.ip, self.config.AGENT_PORT,
                                    self._get_server_id(),
                                    serviceNode.ip, self.config.AGENT_PORT)
            except client.AgentException:
                self.logger.exception('Failed to start MySQL Slave at node %s' % str(serviceNode))
                self.state = self.S_ERROR
                raise

    @expose('GET')
    def list_nodes(self, kwargs):
        """
        HTTP GET method.
        Asks the controller for the list of VMs and compares it with the
        service nodes known to the configuration.  Service nodes whose VM
        id is not in the controller's list are removed from the
        configuration.  Returns the VM ids of the remaining service nodes.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse - JSON response with the list of node
            ids under the key ``mysql``; HttpErrorResponse on unexpected
            arguments.

        """
        if len(kwargs) != 0:
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
        # Build the id set once instead of calling keys() for every node.
        running_ids = set(self.controller.list_vms().keys())
        # Walk a snapshot: removeMySQLServiceNode() modifies the
        # configuration, which must not happen to the collection being
        # iterated.
        for node in list(self.config.getMySQLServiceNodes()):
            if node.vmid not in running_ids:
                self.logger.debug('Removing instance ' + str(node.vmid) +
                                  ' since it is not in the list returned by the listVMs().')
                self.config.removeMySQLServiceNode(node.vmid)
        _nodes = [serviceNode.vmid
                  for serviceNode in self.config.getMySQLServiceNodes()]

        return HttpJsonResponse({
            'mysql': _nodes
            })

    @expose('GET')
    def get_node_info(self, kwargs):
        """
        HTTP GET method. Gets info of a specific node.

        :param kwargs: Request arguments; must contain exactly
            ``serviceNodeId``, the VMID of an existing service node.
        :type kwargs: dict
        :returns: HttpJsonResponse - JSON response with the id, ip and
            master/slave flags of the node; HttpErrorResponse when the
            argument is missing, unknown, or accompanied by other
            arguments.

        """
        if 'serviceNodeId' not in kwargs:
            return HttpErrorResponse(ManagerException(E_ARGS_MISSING, 'serviceNodeId').message)
        serviceNodeId = kwargs.pop('serviceNodeId')
        if len(kwargs) != 0:
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
        if serviceNodeId not in self.config.serviceNodes:
            return HttpErrorResponse(ManagerException(E_ARGS_INVALID,
                                                      "serviceNodeId",
                                                      detail='Invalid "serviceNodeId"').message)
        serviceNode = self.config.getMySQLNode(serviceNodeId)
        return HttpJsonResponse({
            'serviceNode': {
                            'id': serviceNode.vmid,
                            'ip': serviceNode.ip,
                            'isMaster': serviceNode.isMaster,
                            'isSlave': serviceNode.isSlave
                            }
            })

    @staticmethod
    def _pop_count(kwargs):
        """
        Validates and removes the ``count`` argument shared by
        :py:meth:`add_nodes` and :py:meth:`remove_nodes`.

        ``kwargs`` is only modified when validation succeeds.

        :param kwargs: Request arguments.
        :type kwargs: dict
        :returns: ``(count, None)`` on success, ``(None, error)`` where
            ``error`` is an HttpErrorResponse otherwise.
        """
        if 'count' not in kwargs:
            return None, HttpErrorResponse('ERROR: Required argument doesn\'t exist')
        count = kwargs['count']
        # bool is a subclass of int; True/False is not a node count.
        if isinstance(count, bool) or not isinstance(count, int):
            return None, HttpErrorResponse('ERROR: Expected an integer value for "count"')
        # A negative count would turn ``slaves[:count]`` in
        # _do_remove_nodes into "all but the last |count| slaves".
        if count < 0:
            return None, HttpErrorResponse('ERROR: Expected a non-negative value for "count"')
        kwargs.pop('count')
        return count, None

    @expose('POST')
    def add_nodes(self, kwargs):
        """
        HTTP POST method. Requests ``count`` new nodes and configures them
        as slaves.  The work is done by :py:meth:`_do_add_nodes` in a
        background thread; the state is ``ADAPTING`` meanwhile.

        :param kwargs: Request arguments; ``count`` is the number of nodes
            to add and must be a non-negative integer.
        :type kwargs: dict
        :returns: Empty HttpJsonResponse once the thread is started;
            HttpErrorResponse when the service is not ``RUNNING`` or
            ``count`` is missing or invalid.

        """
        if self.state != self.S_RUNNING:
            return HttpErrorResponse('ERROR: Wrong state to add_nodes')
        count, error = self._pop_count(kwargs)
        if error is not None:
            return error
        self.state = self.S_ADAPTING
        Thread(target=self._do_add_nodes, args=[count]).start()
        return HttpJsonResponse()

    # TODO: also specify the master for which to add slaves
    def _do_add_nodes(self, count):
        """
        Thread body of :py:meth:`add_nodes`: creates ``count`` nodes,
        configures them as slaves and registers them in the configuration.

        If the nodes cannot be created the state goes back to ``RUNNING``
        (nothing was added).  If an agent fails to start a slave,
        :py:meth:`_start_slave` sets ``ERROR`` and re-raises.
        """
        try:
            node_instances = self.controller.create_nodes(
                count, client.check_agent_process, self.config.AGENT_PORT)
        except Exception:
            # Same policy as _do_startup: log, restore the previous
            # stable state and give up, instead of leaving the service
            # stuck in ADAPTING.
            self.logger.exception('do_add_nodes: Failed to request new nodes')
            self.state = self.S_RUNNING
            return

        # TODO: modify this when multiple masters
        for master in self.config.getMySQLmasters():
            self._start_slave(node_instances, master)
        self.config.addMySQLServiceNodes(node_instances, isSlave=True)
        self.state = self.S_RUNNING

    def _get_server_id(self):
        """Returns the next server id (1, 2, 3, ...) for a master/slave."""
        self.id += 1
        return self.id

    @expose('GET')
    def get_service_performance(self, kwargs):
        """
        HTTP GET method. Placeholder for obtaining performance metrics.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse with the metrics, all currently fixed
            to 0; HttpErrorResponse on unexpected arguments.

        """
        if len(kwargs) != 0:
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
        return HttpJsonResponse({
                'request_rate': 0,
                'error_rate': 0,
                'throughput': 0,
                'response_time': 0,
        })

    @expose('POST')
    def remove_nodes(self, kwargs):
        """
        HTTP POST method. Removes ``count`` slave nodes.  The work is done
        by :py:meth:`_do_remove_nodes` in a background thread; the state
        is ``ADAPTING`` meanwhile.

        :param kwargs: Request arguments; ``count`` is the number of
            slaves to remove and must be a non-negative integer not larger
            than the number of slaves.
        :type kwargs: dict
        :returns: Empty HttpJsonResponse once the thread is started;
            HttpErrorResponse when the service is not ``RUNNING`` or
            ``count`` is missing, invalid or too large.

        """
        if self.state != self.S_RUNNING:
            self.logger.debug('Wrong state to remove nodes')
            return HttpErrorResponse('ERROR: Wrong state to remove_nodes')
        count, error = self._pop_count(kwargs)
        if error is not None:
            return error
        if count > len(self.config.getMySQLslaves()):
            return HttpErrorResponse('ERROR: Cannot remove so many nodes')
        self.state = self.S_ADAPTING
        Thread(target=self._do_remove_nodes, args=[count]).start()
        return HttpJsonResponse()

    def _do_remove_nodes(self, count):
        """
        Thread body of :py:meth:`remove_nodes`: deletes the first ``count``
        slaves and removes them from the configuration.

        On failure the error is logged, the state is set to ``ERROR`` and
        the exception is re-raised.
        """
        nodes = self.config.getMySQLslaves()[:count]
        try:
            self.controller.delete_nodes(nodes)
            self.config.remove_nodes(nodes)
        except Exception:
            # Do not leave the service silently stuck in ADAPTING.
            self.logger.exception('do_remove_nodes: Failed to remove nodes')
            self.state = self.S_ERROR
            raise
        self.state = self.S_RUNNING
        # Kept for compatibility; discarded when run as a thread target.
        return HttpJsonResponse()

    @expose('GET')
    def get_service_info(self, kwargs):
        """
        HTTP GET method. Reports the current state and the service type.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse with ``state`` and ``type``;
            HttpErrorResponse on unexpected arguments.

        """
        if len(kwargs) != 0:
            return HttpErrorResponse('ERROR: Arguments unexpected')
        return HttpJsonResponse({'state': self.state, 'type': 'mysql'})

    @expose('POST')
    def shutdown(self, kwargs):
        """
        HTTP POST method. Shuts down the service.  The work is done by
        :py:meth:`_do_shutdown` in a background thread; the state is
        ``EPILOGUE`` meanwhile.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse with the new state (``EPILOGUE``);
            HttpErrorResponse on unexpected arguments or when the service
            is not ``RUNNING``.

        """
        if len(kwargs) != 0:
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)

        if self.state != self.S_RUNNING:
            return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)

        self.state = self.S_EPILOGUE
        Thread(target=self._do_shutdown, args=[]).start()
        return HttpJsonResponse({'state': self.S_EPILOGUE})

    def _do_shutdown(self):
        """
        Thread body of :py:meth:`shutdown`: deletes every service node and
        clears the node table.

        The nodes are deleted directly; no stop request is sent to the
        agents first.  On failure the error is logged, the state is set to
        ``ERROR`` and the exception is re-raised.
        """
        try:
            self.controller.delete_nodes(self.config.serviceNodes.values())
        except Exception:
            # Do not leave the service silently stuck in EPILOGUE.
            self.logger.exception('do_shutdown: Failed to delete nodes')
            self.state = self.S_ERROR
            raise
        self.config.serviceNodes = {}
        self.state = self.S_STOPPED

    @expose('GET')
    def getLog(self, kwargs):
        """
        HTTP GET method. Returns the content of the manager log file.

        :param kwargs: Request arguments; must be empty.
        :type kwargs: dict
        :returns: HttpJsonResponse with the file content under ``log``;
            HttpErrorResponse on unexpected arguments or when the file
            cannot be read.

        """
        if len(kwargs) != 0:
            # Use the imported error code, as every other method does.
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
        try:
            # ``with`` closes the file even when read() fails; a single
            # read() already returns the whole content.
            with open(self.logfile) as fd:
                content = fd.read()
        except Exception:
            self.logger.exception('Failed to read log file %s' % self.logfile)
            return HttpErrorResponse('Failed to read log')
        return HttpJsonResponse({'log': content})