OW2 Consortium contrail

Rev

Rev 2394 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2394 aaasz 1
"""
2
Created on November, 2011
3
 
4
   This module contains internals of the ConPaaS MySQL Server. ConPaaS MySQL Server consists of several
5
   nodes with different roles
6
 
7
     * Manager node
8
     * Agent node(s)
9
        * Master
10
        * Slave(s)
11
 
12
   :platform: Linux, Debian
13
   :synopsis: Internals of ConPaaS MySQL Servers.
14
   :moduleauthor: Ales Cernivec <ales.cernivec@xlab.si>
15
 
16
"""
17
 
18
from threading import Thread
19
import time
20
import conpaas
21
 
22
from conpaas.core.http import HttpErrorResponse, HttpJsonResponse
23
from conpaas.core.log import create_logger
24
from conpaas.core.expose import expose
25
from conpaas.core.controller import Controller
26
 
27
from conpaas.services.mysql.agent import client
28
from conpaas.services.mysql.manager.config import Configuration, \
29
                              ManagerException,\
30
                              E_ARGS_UNEXPECTED, ServiceNode, E_UNKNOWN, \
31
                              E_ARGS_MISSING, E_STATE_ERROR, E_ARGS_INVALID
32
 
33
class MySQLManager(object):
34
    """
35
    Initializes :py:attr:`config` using Config and sets :py:attr:`state` to :py:attr:`S_INIT`
36
 
37
    :param conf: Configuration file.
38
    :type conf: str
39
    :type conf: boolean
40
 
41
    """
42
 
43
    # TODO: move this in core !!!!
44
    S_INIT = 'INIT'
45
    S_PROLOGUE = 'PROLOGUE'
46
    S_RUNNING = 'RUNNING'
47
    S_ADAPTING = 'ADAPTING'
48
    S_EPILOGUE = 'EPILOGUE'
49
    S_STOPPED = 'STOPPED'
50
    S_ERROR = 'ERROR'
51
 
52
    def __init__(self, conf, **kwargs):
53
        self.logger = create_logger(__name__)
54
        self.logger.debug("Entering MySQLServerManager initialization")
55
        self.controller = Controller(conf)
56
        self.controller.generate_context('mysql')
57
        self.logfile = conf.get('manager', 'LOG_FILE')
58
        self.state = self.S_INIT
59
        self.config = Configuration(conf)
60
        self.logger.debug("Leaving MySQLServer initialization")
61
 
62
        # The unique id that is used to start the master/slave
63
        self.id = 0
64
 
65
    @expose('POST')
66
    def startup(self, kwargs):
67
        ''' Starts the service - it will start and configure a MySQL master '''
68
 
69
        self.logger.debug("Entering MySQLServerManager startup")
70
        if len(kwargs) != 0:
71
            return HttpErrorResponse(ManagerException \
72
                                      (E_ARGS_UNEXPECTED, \
73
                                       kwargs.keys()).message)
74
 
75
        if self.state != self.S_INIT and self.state != self.S_STOPPED:
76
            return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)
77
 
78
        self.state = self.S_PROLOGUE
79
        Thread(target=self._do_startup, args=[]).start()
80
        return HttpJsonResponse({'state': self.S_PROLOGUE})
81
 
82
    def _do_startup(self):
83
        ''' Starts up the service. The first node will be the MYSQL master.
84
            The next nodes will be slaves to this master. '''
85
 
86
        #TODO: Get any existing configuration (if the service was stopped and restarted)
87
        self.logger.debug('do_startup: Going to request one new node')
2418 aaasz 88
        #TODO: who generates the password? The frontend or te manager?
89
        self.controller.update_context(dict(mysql_username='root', \
90
			                    mysql_password='R00T'))
2394 aaasz 91
        try:
92
            node_instances = self.controller.create_nodes(1,
93
                                                    client.check_agent_process, self.config.AGENT_PORT)
94
        except:
95
            self.logger.exception('do_startup: Failed to request a new node')
96
            self.state = self.S_STOPPED
97
            return
98
        self._start_master(node_instances)
99
        self.config.addMySQLServiceNodes(node_instances, isMaster=True)
100
        self.state = self.S_RUNNING
101
 
102
    def _start_master(self, nodes):
103
        for serviceNode in nodes:
104
            try:
105
                client.create_master(serviceNode.ip, self.config.AGENT_PORT,
106
                                    self._get_server_id())
107
            except client.AgentException:
108
                self.logger.exception('Failed to start MySQL Master at node %s' % str(serviceNode))
109
                self.state = self.S_ERROR
110
                raise
111
 
112
    def _start_slave(self, nodes, master):
113
        for serviceNode in nodes:
114
            try:
115
                self.logger.debug('create_slave for master.ip  = %s' % master)
116
                client.create_slave(master.ip, self.config.AGENT_PORT, \
117
				    self._get_server_id(),\
118
				    serviceNode.ip, self.config.AGENT_PORT)
119
            except client.AgentException:
120
                self.logger.exception('Failed to start MySQL Slave at node %s' % str(serviceNode))
121
                self.state = self.S_ERROR
122
                raise
123
 
124
    @expose('GET')
125
    def list_nodes(self, kwargs):
126
        """
127
        HTTP GET method.
128
        Uses :py:meth:`IaaSClient.listVMs()` to get list of
129
        all Service nodes. For each service node it gets it
130
        checks if it is in servers list. If some of them are missing
131
        they are removed from the list. Returns list of all service nodes.
132
 
133
        :returns: HttpJsonResponse - JSON response with the list of services
134
        :raises: HttpErrorResponse
135
 
136
        """
137
        if len(kwargs) != 0:
138
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
139
        vms = self.controller.list_vms()
140
        vms_mysql = self.config.getMySQLServiceNodes()
141
        for vm in vms_mysql:
142
            if not(vm.vmid in vms.keys()):
143
                self.logger.debug('Removing instance ' + str(vm.vmid) + \
144
                                  ' since it is not in the list returned by the listVMs().')
145
                self.config.removeMySQLServiceNode(vm.vmid)
146
        _nodes = [ serviceNode.vmid for serviceNode in self.config.getMySQLServiceNodes() ]
147
 
148
        return HttpJsonResponse({
149
            'mysql': _nodes
150
            })
151
 
152
    @expose('GET')
153
    def get_node_info(self, kwargs):
154
        """
155
        HTTP GET method. Gets info of a specific node.
156
 
157
        :param param: serviceNodeId is a VMID of an existing service node.
158
        :type param: str
159
        :returns: HttpJsonResponse - JSON response with details about the node.
160
        :raises: ManagerException
161
 
162
        """
163
        if 'serviceNodeId' not in kwargs:
164
            return HttpErrorResponse(ManagerException(E_ARGS_MISSING, 'serviceNodeId').message)
165
        serviceNodeId = kwargs.pop('serviceNodeId')
166
        if len(kwargs) != 0:
167
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
168
        if serviceNodeId not in self.config.serviceNodes:
169
            return HttpErrorResponse(ManagerException(E_ARGS_INVALID , \
170
                                                      "serviceNodeId" ,\
171
                                                      detail='Invalid "serviceNodeId"').message)
172
        serviceNode = self.config.getMySQLNode(serviceNodeId)
173
        return HttpJsonResponse({
174
            'serviceNode': {
175
                            'id': serviceNode.vmid,
176
                            'ip': serviceNode.ip,
177
                            'isMaster': serviceNode.isMaster,
178
                            'isSlave': serviceNode.isSlave
179
                            }
180
            })
181
 
182
    @expose('POST')
183
    def add_nodes(self, kwargs):
184
        """
185
        HTTP POST method. Creates new node and adds it to the list of existing nodes in the manager. Makes internal call to :py:meth:`createServiceNodeThread`.
186
 
187
        :param kwargs: number of nodes to add.
188
        :type param: str
189
        :returns: HttpJsonResponse - JSON response with details about the node.
190
        :raises: ManagerException
191
 
192
        """
193
 
194
        if self.state != self.S_RUNNING:
195
            return HttpErrorResponse('ERROR: Wrong state to add_nodes')
196
        if not 'count' in kwargs:
197
            return HttpErrorResponse('ERROR: Required argument doesn\'t exist')
198
        if not isinstance(kwargs['count'], int):
199
            return HttpErrorResponse('ERROR: Expected an integer value for "count"')
200
        count = int(kwargs.pop('count'))
201
        self.state = self.S_ADAPTING
202
        Thread(target=self._do_add_nodes, args=[count]).start()
203
        return HttpJsonResponse()
204
 
205
    # TODO: also specify the master for which to add slaves
206
    def _do_add_nodes(self, count):
207
        node_instances = self.controller.create_nodes(count, \
208
                                           client.check_agent_process, self.config.AGENT_PORT)
209
        #self.nodes += node_instances
210
        # Get the master
211
        masters = self.config.getMySQLmasters()
212
        # Configure the nodes as slaves
213
 
214
        #TODO: modify this when multiple masters
215
	for master in masters:
216
            self._start_slave(node_instances, master)
217
        self.config.addMySQLServiceNodes(node_instances, isSlave=True)
218
        self.state = self.S_RUNNING
219
 
220
    def _get_server_id(self):
221
        self.id = self.id + 1
222
        return self.id
223
 
224
    @expose('GET')
225
    def get_service_performance(self, kwargs):
226
        ''' HTTP GET method. Placeholder for obtaining performance metrics.
227
 
228
        :param kwargs: Additional parameters.
229
        :type kwargs: dict
230
        :returns:  HttpJsonResponse -- returns metrics
231
 
232
        '''
233
 
234
        if len(kwargs) != 0:
235
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
236
        return HttpJsonResponse({
237
                'request_rate': 0,
238
                'error_rate': 0,
239
                'throughput': 0,
240
                'response_time': 0,
241
        })
242
 
243
    @expose('POST')
244
    def remove_nodes(self, kwargs):
245
        if self.state != self.S_RUNNING:
246
            self.logger.debug('Wrong state to remove nodes')
247
            return HttpErrorResponse('ERROR: Wrong state to remove_nodes')
248
        if not 'count' in kwargs:
249
            return HttpErrorResponse('ERROR: Required argument doesn\'t exist')
250
        if not isinstance(kwargs['count'], int):
251
            return HttpErrorResponse('ERROR: Expected an integer value for "count"')
252
        count = int(kwargs.pop('count'))
253
        if count > len(self.config.getMySQLslaves()):
254
            return HttpErrorResponse('ERROR: Cannot remove so many nodes')
255
        self.state = self.S_ADAPTING
256
        Thread(target=self._do_remove_nodes, args=[count]).start()
257
        return HttpJsonResponse()
258
 
259
    def _do_remove_nodes(self, count):
260
	nodes = self.config.getMySQLslaves()[:count]
261
        self.controller.delete_nodes(nodes)
262
	self.config.remove_nodes(nodes)
263
        self.state = self.S_RUNNING
264
        return HttpJsonResponse()
265
 
266
    @expose('GET')
267
    def get_service_info(self, kwargs):
268
        if len(kwargs) != 0:
269
            return HttpErrorResponse('ERROR: Arguments unexpected')
270
        return HttpJsonResponse({'state': self.state, 'type': 'mysql'})
271
 
272
    @expose('POST')
273
    def shutdown(self, kwargs):
274
        """
275
        HTTP POST method. Shuts down the manager service.
276
 
277
        :returns: HttpJsonResponse - JSON response with details about the status of a manager node: . ManagerException if something went wrong.
278
        :raises: ManagerException
279
 
280
        """
281
        if len(kwargs) != 0:
282
            return HttpErrorResponse(ManagerException(E_ARGS_UNEXPECTED, kwargs.keys()).message)
283
 
284
        if self.state != self.S_RUNNING:
285
            return HttpErrorResponse(ManagerException(E_STATE_ERROR).message)
286
 
287
        self.state = self.S_EPILOGUE
288
        Thread(target=self._do_shutdown, args=[]).start()
289
        return HttpJsonResponse({'state': self.S_EPILOGUE})
290
 
291
 
292
    def _do_shutdown(self):
293
        ''' Shuts down the service. '''
294
        #self._stop_slaves( config.getProxyServiceNodes())
295
        #self._stop_masters(config, config.getWebServiceNodes())
296
        self.controller.delete_nodes(self.config.serviceNodes.values())
297
        self.config.serviceNodes = {}
298
        self.state = self.S_STOPPED
299
 
300
    @expose('GET')
301
    def getLog(self, kwargs):
302
        if len(kwargs) != 0:
303
            return HttpErrorResponse(ManagerException(ManagerException.E_ARGS_UNEXPECTED, kwargs.keys()).message)
304
        try:
305
            fd = open(self.logfile)
306
            ret = ''
307
            s = fd.read()
308
            while s != '':
309
                 ret += s
310
                 s = fd.read()
311
                 if s != '':
312
                     ret += s
313
            return HttpJsonResponse({'log': ret})
314
        except:
315
            return HttpErrorResponse('Failed to read log')
316