Coverage for janitoo.server : 46%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# -*- coding: utf-8 -*-
- we must add a method to reload a thread (a key from entry point) : install new thread, update a thread
"""
This file is part of Janitoo.
Janitoo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Janitoo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with Janitoo. If not, see <http://www.gnu.org/licenses/>.
"""
# Set default logging handler to avoid "No handler found" warnings.
"""The Janitoo base Server
""" """Init the server. Must be called at the begin of the children class. """ #Need more tests
""" """ except: pass
"""Start the server. Must be called at the end of the children class. """
"""Before enterig the loop """ try: self.loop_sleep = int(loop_sleep) except: logger.info("[%s] - Can't set loop_sleep from configuration file. Using default valuse %s", self.__class__.__name__, self.loop_sleep) try: self.gc_delay = int(gc_delay) except: logger.info("[%s] - Can't set gc_delay from configuration file. Using default valuse %s", self.__class__.__name__, self.gc_delay) self.gc_next_run = datetime.datetime.now() + datetime.timedelta(seconds=self.gc_delay) try: self.slow_start = int(slow_start) except: logger.info("[%s] - Can't set slow_start from configuration file. Using default valuse %s", self.__class__.__name__, self.slow_start) except: logger.exception("[%s] - Exception when loading thread from entry_point : %s", self.__class__.__name__, entry.name) except: logger.exception("[%s] - Exception when loading thread from entry_point : %s", self.__class__.__name__, entry.name) logger.error("[%s] - Can't find a thread to launch in the config file", self.__class__.__name__) raise JanitooException(message="Can't find a thread to launch in the config file")
"""After the loop """
"""Run the loop """ gc.collect() self.gc_next_run = datetime.datetime.now() + datetime.timedelta(seconds=self.gc_delay)
"""Stop the server. Must be called at begin if overloaded in the children class """
"""Reload the threads """
"""Find a thread using its oid (section) """ return None logger.error("[%s] - Found multiple threads with same section %s : %s", self.__class__.__name__, section, ths) return None
"""Reload the server """ #~ self.stop() #~ while len(self._threads)>0: #~ self._stopevent.wait(self.loop_sleep*10) #~ self._stopevent.wait(1.5) #~ self.start()
"""Flush the server's data to disk """ pass
"""Needed to publish static files """ return resource_filename(Requirement.parse(self.get_package_name().split('.')[0]), path)
"""Return the name of the package. Needed to publish static files
**MUST** be copy paste in every extension that publish statics files """ return __package__
"""#DEPRECATED Return the egg path of the module. Must be redefined in server class. """ raise JanitooNotImplemented('_get_egg_path not implemnted')
"""Catch SIGTERM signal """ print('TERM signal received : %s' % (signal)) self.stop() #~ sys.exit(0)
"""Catch SIGHUP signal """ print('HUP signal received : %s' % (signal)) logger.warning('[%s] - HUP signal received : %s', self.__class__.__name__, signal) self.reload() #~ sys.exit(0)
"""Catch SIGUSR1 signal The server must flush its data to disk The mosquitto broker use it to persist its database to disk. """ logger.warning('[%s] - USR1 signal received : %s', self.__class__.__name__, signal) #~ sys.exit(0)
############################################################## #Check that we are in sync with the official command classes #Must be implemented for non-regression
##############################################################
"""A node dedicated for a special thread/server like the the DHCP server or the listener thread in the webapp """ self.mqtt_controller = None self._controller = None self.heartbeat_controller_timer = None self._requests = {'request_info_nodes' : self.request_info_nodes, 'request_info_users' : self.request_info_users, 'request_info_configs' : self.request_info_configs, 'request_info_systems' : self.request_info_systems, 'request_info_basics' : self.request_info_basics, 'request_info_commands' : self.request_info_commands } self.uuid = self.options.get_option(self.section, 'uuid') self.uuid = muuid.uuid1() self.options.set_option(self.section, 'uuid', self.uuid) self.polls = {}
"""Stop the controller timer """ if self.heartbeat_controller_timer is not None: self.heartbeat_controller_timer.cancel() self.heartbeat_controller_timer = None
"""Start the controller tier """ self.stop_controller_timer() self.heartbeat_controller_timer = threading.Timer(self._controller.heartbeat+5, self.heartbeat_controller) self.heartbeat_controller_timer.start()
"""Stop the controller """ logger.info("[%s] - Stop the controller", self.__class__.__name__) if self.mqtt_controller is not None: self.mqtt_controller.unsubscribe(topic=TOPIC_NODES_REQUEST%(self._controller.hadd)) self.mqtt_controller.stop() if self.mqtt_controller.is_alive(): try: self.mqtt_controller.join() except: logger.exception("[%s] - Catched exception", self.__class__.__name__) self.mqtt_controller = None self.mqtt_controller = None
"""Start the controller """ logger.info("[%s] - Start the controller", self.__class__.__name__) cmd_classes = kwargs.pop('cmd_classes', []) if not COMMAND_CONTROLLER in cmd_classes: cmd_classes.append(COMMAND_CONTROLLER) self._controller.add_internal_system_values() self._controller.add_internal_config_values() self._controller.hadd_get(section, None) self._controller.load_system_from_local() self.mqtt_controller = MQTTClient(options=options.data) self.mqtt_controller.connect() logger.debug(u"[%s] - Subscribe to topic %s", self.__class__.__name__, TOPIC_NODES_REQUEST%(self._controller.hadd)) self.mqtt_controller.subscribe(topic=TOPIC_NODES_REQUEST%(self._controller.hadd), callback=self.on_controller_request) self.mqtt_controller.start()
"""Start the controller """ pass
"""Send a add_ctrl:-1 heartbeat message. It will ping all devices managed by this controller. """ logger.debug(u"[%s] - Send heartbeat for controller", self.__class__.__name__) if self.heartbeat_controller_timer is not None: #The manager is started self.heartbeat_controller_timer.cancel() if self.mqtt_controller is not None: self.heartbeat_controller_timer = threading.Timer(self._controller.heartbeat, self.heartbeat_controller) self.heartbeat_controller_timer.start() if self._controller.hadd is not None and self.mqtt_controller is not None: #~ print self.nodes[node].hadd add_ctrl, add_node = self._controller.split_hadd() values = [ k for k in self._controller.values if self._controller.values[k].is_polled ] for value in values: self.publish_poll(self.mqtt_controller, self._controller.values[value])
""" """ if value.uuid in self.polls: #~ value.is_polled= False del self.polls[value.uuid]
""" """ if value.poll_delay == 0: self.remove_poll(value) return if value.uuid not in self.polls or timeout: if timeout is None: timeout = self.config_timeout self.polls[value.uuid] = {'next_run':datetime.datetime.now()+datetime.timedelta(seconds=timeout), 'value':value} else: self.polls[value.uuid]['next_run'] = datetime.datetime.now()+datetime.timedelta(seconds=value.poll_delay) value.is_polled= True
""" """ node = self._controller genres = {1:'basic', 2:'user', 3:'config', } if value.genre in genres: else: genre = "user" mqttc.publish_value(node.hadd, value, genre)
"""Return the controller hadd""" if self._controller is None: return None return self._controller.hadd
"""Return the controller""" return self._controller
"""On request
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("on_request receive message %s", message.payload) try: data = json_loads(message.payload) #~ print data['uuid'] #We should check what value is requested #{'hadd', 'cmd_class', 'type'='list', 'genre'='0x04', 'data'='node|value|config', 'uuid'='request_info'} if data['cmd_class'] == COMMAND_DISCOVERY: if data['genre'] == 0x04: if data['uuid'] in self._requests: resp = {} resp.update(data) try: if message.topic.find('broadcast') != -1: topic = "/broadcast/reply/%s" % data['reply_hadd'] self._requests[data['uuid']](topic, resp) else: topic = "/nodes/%s/reply" % data['reply_hadd'] self._requests[data['uuid']](topic, resp) return except: logger.exception(u"[%s] - Exception when running on_request method", self.__class__.__name__) return except: logger.exception("[%s] - Exception in on_request", self.__class__.__name__)
""" """ resp['data'] = self._controller.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ resp['data'] = {} for kvalue in self._controller.values.keys(): value = self._controller.values[kvalue] if value.genre == 0x02: if value.hadd not in resp['data']: resp['data'][value.hadd] = {} resp['data'][value.hadd][value.uuid] = value.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ resp['data'] = {} for kvalue in self._controller.values.keys(): value = self._controller.values[kvalue] if value.genre == 0x03: if value.hadd not in resp['data']: resp['data'][value.hadd] = {} resp['data'][value.hadd][value.uuid] = value.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ resp['data'] = {} for kvalue in self._controller.values.keys(): value = self._controller.values[kvalue] if value.genre == 0x01: if value.hadd not in resp['data']: resp['data'][value.hadd] = {} resp['data'][value.hadd][value.uuid] = value.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ resp['data'] = {} for kvalue in self._controller.values.keys(): value = self._controller.values[kvalue] if value.genre == 0x04: if value.hadd not in resp['data']: resp['data'][value.hadd] = {} resp['data'][value.hadd][value.uuid] = value.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ resp['data'] = {} for kvalue in self._controller.values.keys(): value = self._controller.values[kvalue] if value.genre == 0x05: if value.hadd not in resp['data']: resp['data'][value.hadd] = {} resp['data'][value.hadd][value.uuid] = value.to_dict() msg = json_dumps(resp) self.publish_request(reply_topic, msg)
""" """ self.mqtt_controller.publish(topic=reply_topic, payload=msg)
|