Coverage for janitoo.dhcp : 19%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# -*- coding: utf-8 -*-
TODO : - currently the HEARTBEAT state only exists for the primary because it is used for the discover. - it must be implemented for the secondary : via /resolv ? /dhcp/?
"""
This file is part of Janitoo.
Janitoo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Janitoo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with Janitoo. If not, see <http://www.gnu.org/licenses/>.
"""
# Set default logging handler to avoid "No handler found" warnings.
############################################################## #Check that we are in sync with the official command classes #Must be implemented for non-regression
##############################################################
#https://github.com/tyarkoni/transitions
'ONLINE' : 5, #The machine is online 'BOOT' : 4, #The machine is booting 'CONFIG' : 4, #The machine is configuring 'PENDING' : 3, #The machine did not send a ping 'FAILED' : 2, 'DEAD' : 1, 'OFFLINE' : 0, #The machine is offline. 'UNKNOWN' : -1, #Hummm ... we don' know. }
""" """ self.entries = {}
"""Load the cache from db
""" #Initialise the cache if query is not None: self.entries = {} data = query.all() for line in data: self.update(line.add_ctrl, line.add_node, state=line.state, hearbeat=line.heartbeat, last_seen=line.last_seen)
"""Flush the cache to db. Remmove failed name from cache
:param session: the session to use to communicate with db. May be a scoped_session if used in a separate tread. If None, use the common session. :type session: sqlalchemy session """ if query is not None: data = query.all() for line in data: if line.add_ctrl in self.entries and line.add_node in self.entries[line.add_ctrl]: line.state = self.entries[line.add_ctrl][line.add_node]['state'] line.last_seen = self.entries[line.add_ctrl][line.add_node]['last_seen'] #Remove failed nodes from cache for ctrl in self.entries.keys(): for node in self.entries[ctrl].keys(): if self.entries[ctrl][node]['state'] == 'dead': self.remove(ctrl, node)
"""Update an entry in cache
:param add_ctrl: the controller part of the address :type add_ctrl: Integer :param add_node: the node part of the address. 0 for controller, -1 for all nodes managed by controller. :type add_node: Integer :param state: the state of the node. :type state: String :param last_seen: the last time the node have been seen. :type last_seen: datetime """ #~ print "update heartbeat" #~ print add_ctrl, add_node, state if last_seen is None: last_seen=datetime.datetime.now() #Create/Update an entry in cache #~ print "heartbeat update %s, %s, %s : %s" % (add_ctrl, add_node, heartbeat, last_seen) if add_ctrl not in self.entries: self.entries[add_ctrl] = {} nodes = [] if add_node == -1: for nds in self.entries[add_ctrl]: nodes.append(nds) else: nodes.append(add_node) #print nodes for nd in nodes: if nd not in self.entries[add_ctrl]: self.entries[add_ctrl][nd] = {} self.entries[add_ctrl][nd]["state"] = state.upper() self.entries[add_ctrl][nd]["heartbeat"] = heartbeat * 1.1 #print "Update state here" self.entries[add_ctrl][nd]["last_seen"] = last_seen if state == 'ONLINE' or state == 'BOOT': #Reset the counter self.entries[add_ctrl][nd]["count"] = 0
"""Return the state of an entry in cache
:param add_ctrl: the controller part of the address :type add_ctrl: Integer :param add_node: the node part of the address. 0 for controller, -1 for all nodes managed by controller. :type add_node: Integer """ #Create/Update an entry in cache if add_ctrl not in self.entries or add_node not in self.entries[add_ctrl]: return 'OFFLINE' return self.entries[add_ctrl][add_node]['state']
"""Chech if an entry is in cache
:param add_ctrl: the controller part of the address :type add_ctrl: Integer :param add_node: the node part of the address. 0 for controller, -1 for all nodes managed by controller. :type add_node: Integer """ #Create/Update an entry in cache if add_ctrl not in self.entries: return False if add_node == -1: add_node = 0 if add_node in self.entries[add_ctrl]: return True return False
"""Remove an entry fom cache
:param add_ctrl: the controller part of the address :type add_ctrl: Integer :param add_node: the node part of the address. 0 for controller, -1 for all nodes managed by controller. :type add_node: Integer """ #Remove an antry from cache and clean tree in needed if add_ctrl not in self.entries: return if add_node in self.entries[add_ctrl]: del self.entries[add_ctrl][add_node] if len(self.entries[add_ctrl]) == 0 or add_node == -1: del self.entries[add_ctrl]
"""Number of entries in the cache
""" return len(self.entries)
"""Check the states of the machine. Must be called in a timer Called in a separate thread. Must use a scoped_session.
:param session: the session to use to communicate with db. May be a scoped_session if used in a separate tread. If None, use the common session. :type session: sqlalchemy session """ #~ print self.entries #~ print "Check heartbeat" now = datetime.datetime.now() lleases = list() for ctrl in self.entries.keys(): for node in self.entries[ctrl].keys(): if (now - self.entries[ctrl][node]['last_seen']).total_seconds() > self.entries[ctrl][node]['heartbeat'] \ and self.entries[ctrl][node]['state'] in ['ONLINE', 'BOOT', 'CONFIG', 'PENDING', 'FAILED', 'DEAD', 'OFFLINE']: #~ print "add heartbeat %s,%s : %s" % (ctrl, node, self.entries[ctrl][node]['last_seen']) lleases.append((ctrl, node)) for ctrl, node in lleases: if self.entries[ctrl][node]['state'] == 'FAILED' \ and (now - self.entries[ctrl][node]['last_seen']).total_seconds() > heartbeat_dead: self.entries[ctrl][node]['state'] = 'DEAD' self.entries[ctrl][node]['count'] = 0 else : #~ print self.entries[ctrl][node] if "count" not in self.entries[ctrl][node]: self.entries[ctrl][node]['count'] = 1 else: self.entries[ctrl][node]['count'] += 1 if self.entries[ctrl][node]['count'] >= heartbeat_count: #The count is reached #We need to change the state if self.entries[ctrl][node]['state'] == 'ONLINE': self.entries[ctrl][node]['state'] = 'PENDING' self.entries[ctrl][node]['count'] = 0 elif self.entries[ctrl][node]['state'] == 'BOOT': self.entries[ctrl][node]['state'] = 'PENDING' self.entries[ctrl][node]['count'] = 0 elif self.entries[ctrl][node]['state'] == 'PENDING': self.entries[ctrl][node]['state'] = 'FAILED' self.entries[ctrl][node]['count'] = 0 else: lleases.remove((ctrl, node)) #~ print heartbeat_count #~ print self.entries[ctrl][node] #~ print self.entries #~ print lleases return lleases
""" """ """ """
#Try to decode payload as json #And retrieve add_ctrl and add_node from it logger.warning("mqtt_on_heartbeat receive a request with no %s", ffield) except ValueError: logger.exception("mqtt_on_heartbeat can't convert add_ctrl %s to integer", data['add_ctrl']) return None, None, None except TypeError: logger.exception("mqtt_on_heartbeat can't convert add_ctrl %s to integer", data['add_ctrl']) except ValueError: logger.exception("mqtt_on_heartbeat can't convert add_node %s to integer", data['add_node']) return None, None, None except TypeError: logger.exception("mqtt_on_heartbeat can't convert add_node %s to integer", data['add_node']) except ValueError: #Try to retrieve adds from topic #hadd = message.topic.split('/')[-1] hadd = self.message.topic.replace("/dhcp/heartbeat/","") #print "hadd %s" % hadd try: add_ctrl,add_node = hadd_split(hadd) if add_ctrl is None: return None, None, None state = self.message.payload except ValueError: logger.exception("get_heartbeat exception") return None, None, None except TypeError: logger.exception("get_heartbeat exception")
""" """ if thread_event.is_set(): return mqttc = MQTTClient(options=options.data) mqttc.connect() mqttc.start() max_in_a_loop = 10 time_to_sleep = 0.06 if hadd is None or hadd=='': topic = TOPIC_RESOLV_BROADCAST else: topic = TOPIC_RESOLV_REPLY % hadd if data is None or len(data) == 0: #We send the response as is msg = json_dumps(resp) mqttc.publish(topic=topic, payload=msg) else : #~ print "data : %s"%data while len(data)>0: in_loop = 0 for key in data.keys(): if key in ["nodes", "systems", "configs", "commands", "basics", "users"]: resp['data'] = {} for kkey in data[key].keys(): #~ print "data 2: %s"%data #print "in_loop : %s"%in_loop if in_loop < max_in_a_loop: resp['data'][kkey] = data[key][kkey] del data[key][kkey] in_loop +=1 else: break if len(data[key]) == 0: del data[key] if len(resp['data'])==0: break #~ print "resp = %s"%resp msg = json_dumps(resp) mqttc.publish(topic=topic, payload=msg) thread_event.wait(time_to_sleep) mqttc.stop() if mqttc.is_alive(): try: mqttc.join() except: logger.exception("Catched exception") mqttc = None
"""The network manager : handle all nodes, values, ...
Will be used in dhcp server and web server.
Should work in 2 modes : - resolv : use the dhcp to retrieve information : nodes, values, configs, ... - listen to node updates sent by the dhcp (node_update, node_added, node_remove, ...) - broadcast : to retrieve information from nodes : in case of a dhcp failure or for the dhcp itself - heartbeat listening : when we receive a heartbeat from an unknown node, we send it standard system queries - send notifications for node_added, node_update_info, node_update_config, ...
Currently, we depend on socket/flask to emit updates for the listener. This should be removed for the merge with the dhcp manager. We can achieve it with callbacks or class inheritance.
Same problem for the lease manager
heartbeat : heartbeat will be managed inside this class. No more dependency on /dhcp => new topic /heartbeat
Starting as master is simple
We should implement startup as a state machine : master : broadcast -> heartbeat secondary : resolv and if needed -> failover
State machines :
- fsm_network : a global one to manage primary, secondary and fail over states
"""
State(name='STOPPED', on_enter=['stop_resolv_heartbeat','stop_resolv_discover', 'stop_resolv_request', 'stop_nodes_discover', 'stop_heartbeat', 'stop_heartbeat_discover', 'stop_dispatch_heartbeat', 'unset_failed', 'stop_broadcast_nodes_discover', 'stop_broadcast_configs_discover', 'stop_broadcast_systems_discover', 'stop_broadcast_basics_discover', 'stop_broadcast_users_discover']), State(name='STARTED', on_enter=['start_values_listener', 'fsm_on_started'], on_exit=['stop_values_listener', 'stop_resolv_discover']), #~ State(name='BROADCAST_START', on_exit=['start_broadcast_discover']), #~ State(name='BROADCAST_STOP', on_exit=['stop_broadcast_discover']), State(name='BROADCAST_NODES', on_enter=['start_nodes_discover','start_broadcast_nodes_discover'], on_exit=['stop_broadcast_nodes_discover']), State(name='BROADCAST_SYSTEMS', on_enter=['start_broadcast_systems_discover'], on_exit=['stop_broadcast_systems_discover']), State(name='BROADCAST_CONFIGS', on_enter=['start_broadcast_configs_discover'], on_exit=['stop_broadcast_configs_discover']), State(name='BROADCAST_COMMANDS', on_enter=['start_broadcast_commands_discover'], on_exit=['stop_broadcast_commands_discover']), State(name='BROADCAST_BASICS', on_enter=['start_broadcast_basics_discover'], on_exit=['stop_broadcast_basics_discover']), State(name='BROADCAST_USERS', on_enter=['start_broadcast_users_discover'], on_exit=['stop_broadcast_users_discover', 'stop_broadcast_discover']), State(name='RESOLV', on_enter=['start_resolv_discover', 'start_resolv_heartbeat']), State(name='HEARTBEAT_DISCOVER', on_enter=['start_heartbeat_discover']), State(name='HEARTBEAT', on_enter=['start_heartbeat']), State(name='DISPATCH', on_enter=['start_dispatch_heartbeat', 'start_resolv_request'], on_exit=['stop_resolv_request']), ]
'STOPPED' : "Network is stopped", 'STARTED' : "Network is started", 'BROADCAST_NODES' : "Broadcasting nodes", 'BROADCAST_USERS' : "Broadcasting user values", 'BROADCAST_CONFIGS' : "Broadcasting config values", 'BROADCAST_SYSTEMS' : "Broadcasting system values", 'BROADCAST_BASICS' : "Broadcasting basic values", 'BROADCAST_COMMANDS' : "Broadcasting command values", 'RESOLV' : "Resolving nodes", 'HEARTBEAT_DISCOVER' : "Start headbeart discovering", 'HEARTBEAT' : "Start headbeart listening", 'DISPATCH' : "Start headbeart dispatch", }
"""The states of the network """
""" """ self.options = options self._stopevent = stopevent self.home_id = kwargs.get('home_id', "Unknown") self.is_primary = kwargs.get('is_primary', True) self.do_heartbeat_dispatch = kwargs.get('do_heartbeat_dispatch', True) self.is_secondary = kwargs.get('is_secondary', False) self._is_failed = kwargs.get('is_failed', False) self.broadcast_timeout = kwargs.get('broadcast_timeout', 10) self.resolv_timeout = kwargs.get('resolv_timeout', 15) self.request_timeout = kwargs.get('request_timeout', 10) self._test = kwargs.get('test', False) """For tests only"""
self.state = 'STOPPED' self.fsm_network = None self.nodes = {} self.users = {} self.configs = {} self.basics = {} self.systems = {} self.commands = {} self._lock = threading.Lock() self.broadcast_mqttc = None self.broadcast_nodes_timer = None self.broadcast_configs_timer = None self.broadcast_systems_timer = None self.broadcast_users_timer = None self.broadcast_basics_timer = None self.broadcast_commands_timer = None self.heartbeat_discover_mqttc = None self.nodes_mqttc = None self.resolv_mqttc = None self.resolv_timeout_timer = None self.resolv_heartbeat_mqttc = None self.resolv_heartbeat_timer = None self.dispatch_heartbeat_mqttc = None self.dispatch_heartbeat_timer = None self.values_mqttc = None self.resolv_request_mqttc = None self.heartbeat_mqttc = None
self.dbcon = None self.hadds = {} self.heartbeat_cache = None
#~ self._replies = {'request_info_nodes' : self.add_nodes, 'request_info_users' : self.add_users, 'request_info_configs' : self.add_configs, #~ 'request_info_systems' : self.add_systems, 'request_info_basics' : self.add_basics, 'request_info_commands' : self.add_commands }
""" """ self.stop()
def is_failed(self):
    """Tell whether the network is currently flagged as failed.

    :returns: the value of the internal ``_is_failed`` flag.
    """
    failed_flag = self._is_failed
    return failed_flag
def is_stopped(self):
    """Return True if the network is stopped.

    The network is reported as stopped when no state machine is attached,
    i.e. when ``self.fsm_network`` is None.

    :returns: True when ``self.fsm_network`` is None, False otherwise.
    """
    # Identity test on purpose: ``== None`` delegates to the object's
    # __eq__ and could wrongly report "stopped" for an attached machine.
    return self.fsm_network is None
def is_started(self):
    """Report whether the network has reached the started state.

    :returns: True when ``self.state`` equals ``"STARTED"``, False otherwise.
    """
    current_state = self.state
    return current_state == "STARTED"
""" """ self._is_failed = True self.emit_network()
""" """ self._is_failed = False self.emit_network()
"""Start the network """ self.heartbeat_cache = CacheManager() self.loop_sleep = loop_sleep options = self.options.get_options('network') self.from_dict(options) if self.is_primary and self.is_secondary: raise RuntimeError("Can't start in both modes : primary or secondary") logger.debug("Start network with options %s" % options) self.heartbeat_cache.start(None) self.emit_nodes() self.emit_network() if self.fsm_network is None: self.fsm_network = Machine(model=self, states=self.fsm_network_states, initial='STOPPED') self.fsm_network.add_ordered_transitions() #~ self.fsm_network.add_transition('fsm_network_start', 'STOPPED', 'BROADCAST_START', conditions=['is_primary']) self.fsm_network.add_transition('fsm_network_start', 'STOPPED', 'BROADCAST_NODES', conditions=['fsm_is_primary']) self.fsm_network.add_transition('fsm_network_start', 'STOPPED', 'RESOLV', conditions=['fsm_is_secondary']) #~ self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_START', 'BROADCAST_NODES') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_NODES', 'BROADCAST_SYSTEMS') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_SYSTEMS', 'BROADCAST_COMMANDS') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_COMMANDS', 'BROADCAST_CONFIGS') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_CONFIGS', 'BROADCAST_BASICS') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_BASICS', 'BROADCAST_USERS') #~ self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_CONFIGS', 'BROADCAST_STOP') self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_USERS', 'HEARTBEAT_DISCOVER') #~ self.fsm_network.add_transition('fsm_network_next', 'BROADCAST_STOP', 'HEARTBEAT_DISCOVER') self.fsm_network.add_transition('fsm_network_next', 'HEARTBEAT_DISCOVER', 'DISPATCH', conditions=['fsm_do_heartbeat_dispatch']) self.fsm_network.add_transition('fsm_network_next', 'HEARTBEAT_DISCOVER', 'STARTED') 
self.fsm_network.add_transition('fsm_network_next', 'DISPATCH', 'STARTED') self.fsm_network.add_transition('fsm_network_next', 'RESOLV', 'HEARTBEAT') self.fsm_network.add_transition('fsm_network_next', 'HEARTBEAT', 'STARTED') self.fsm_network.add_transition('fsm_network_fail', 'RESOLV', 'BROADCAST_NODES', before = ['set_failed']) self.fsm_network.add_transition('fsm_network_recover', 'STARTED', 'RESOLV', before = ['stop_heartbeat_discover', 'stop_dispatch_heartbeat', 'unset_failed']) self.fsm_network.add_transition('fsm_network_stop', '*', 'STOPPED', after = ['delete_fsm']) self.state = 'STOPPED'
"""Delete the fsm """ self.fsm_network = None
"""Stop the network """ self.stop_resolv_heartbeat_timer() self.stop_dispatch_heartbeat_timer() self.fsm_network_stop() self._lock.acquire() try: self.nodes = {} self.hadds = {} self.configs = {} self.users = {} self.bascis = {} self.systems = {} except: logger.exception("Exception in network stop") finally: if self.heartbeat_cache is not None: self.heartbeat_cache.flush(None) self.heartbeat_cache = None self.emit_nodes()
""" """
""" """
""" """ return self.do_heartbeat_dispatch
""" """ logger.debug("fsm_network : %s", 'start_broadcast_nodes_discover') if self._test: print "start_broadcast_nodes_discover" else: if self.broadcast_mqttc is None: self.broadcast_mqttc = MQTTClient(options=self.options.data) self.broadcast_mqttc.connect() self.broadcast_mqttc.subscribe(topic=TOPIC_BROADCAST_REPLY%self.hadds[0], callback=self.on_reply) self.broadcast_mqttc.start() self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_nodes_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_nodes_discover) self.broadcast_nodes_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_nodes_discover') if self._test: print "stop_nodes_broadcast_discover" else: if self.broadcast_nodes_timer is not None: self.broadcast_nodes_timer.cancel() self.broadcast_nodes_timer = None self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_discover') if self._test: print "stop_broadcast_discover" else: if self.broadcast_mqttc is not None: self.broadcast_mqttc.unsubscribe(topic=TOPIC_BROADCAST_REPLY%self.hadds[0]) self.broadcast_mqttc.stop() if self.broadcast_mqttc.is_alive(): try: except: logger.exception("Catched exception") self.broadcast_mqttc = None self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_nodes_discover') self.emit_network() self.emit_nodes() if not self.is_started and not self.is_stopped: self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_broadcast_users_discover') if self._test: print "start_broadcast_users_discover" else: self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_users_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_users_discover) self.broadcast_users_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_users_discover') if self._test: print "stop_broadcast_users_discover" else: if self.broadcast_users_timer is not None: self.broadcast_users_timer.cancel() self.broadcast_users_timer = None self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_users_discover') self.emit_network() self.emit_users() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_broadcast_configs_discover') if self._test: print "start_broadcast_configs_discover" else: self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_configs_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_configs_discover) self.broadcast_configs_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_configs_discover') print "stop_broadcast_configs_discover" else: if self.broadcast_configs_timer is not None: self.broadcast_configs_timer.cancel() self.broadcast_configs_timer = None
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_configs_discover') self.emit_network() self.emit_configs() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_broadcast_basics_discover') if self._test: print "start_broadcast_basics_discover" else: self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_basics_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_basics_discover) self.broadcast_basics_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_basics_discover') print "stop_broadcast_basics_discover" else: if self.broadcast_basics_timer is not None: self.broadcast_basics_timer.cancel() self.broadcast_basics_timer = None
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_basics_discover') self.emit_network() self.emit_basics() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_broadcast_systems_discover') if self._test: print "start_broadcast_systems_discover" else: self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_systems_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_systems_discover) self.broadcast_systems_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_systems_discover') print "stop_broadcast_systems_discover" else: if self.broadcast_systems_timer is not None: self.broadcast_systems_timer.cancel() self.broadcast_systems_timer = None
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_systems_discover') self.emit_network() self.emit_systems() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_broadcast_commands_discover') if self._test: print "start_broadcast_commands_discover" else: self.broadcast_mqttc.publish(TOPIC_BROADCAST_REQUEST, json_dumps(msg)) self.broadcast_commands_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_commands_discover) self.broadcast_commands_timer.start() self.emit_network() self.emit_nodes()
""" """ logger.debug("fsm_network : %s", 'stop_broadcast_commands_discover') print "stop_broadcast_commands_discover" else: if self.broadcast_commands_timer is not None: self.broadcast_commands_timer.cancel() self.broadcast_commands_timer = None
""" """ logger.debug("fsm_network : %s", 'finish_broadcast_commands_discover') self.emit_network() self.emit_commands() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'fsm_on_started') self.emit_all()
#~ def resolv_mqttc_on_connect(self, client, userdata, flags, rc): """Called when the broker responds to our connection request.
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param flags: flags is a dict that contains response flags from the broker: flags['session present'] - this flag is useful for clients that are using clean session set to 0 only. If a client with clean session=0, that reconnects to a broker that it has previously connected to, this flag indicates whether the broker still has the session information for the client. If 1, the session still exists. :type flags: dict :param rc: the value of rc determines success or not: 0: Connection successful 1: Connection refused - incorrect protocol version 2: Connection refused - invalid client identifier 3: Connection refused - server unavailable 4: Connection refused - bad username or password 5: Connection refused - not authorised 6-255: Currently unused. :type rc: in """ #~ logger.debug("fsm_network : %s", 'resolv_mqttc_on_connect') #~ if self.resolv_timeout_timer is not None: #~ self.resolv_timeout_timer.cancel() #~ self.resolv_timeout_timer = None #~ self.emit_network() #~ self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_resolv_discover') if self._test: print "start_resolv_discover" else: if self.resolv_mqttc is None: self.resolv_mqttc = MQTTClient(options=self.options.data) self.resolv_mqttc.connect() self.resolv_mqttc.subscribe(topic="%s#"%TOPIC_RESOLV) self.resolv_mqttc.add_topic(topic=TOPIC_RESOLV_REPLY%self.hadds[0], callback=self.on_resolv_reply) self.resolv_mqttc.add_topic(topic=TOPIC_RESOLV_BROADCAST+'#', callback=self.on_resolv_reply) self.resolv_mqttc.start() try: self._stopevent.wait(0.5) except: logger.exception('Catched exception') #~ self.resolv_mqttc.on_connect = self.resolv_mqttc_on_connect th = threading.Timer(self.request_timeout/4, self.request_resolv_nodes) th.start() th = threading.Timer(self.request_timeout/2, self.request_resolv_systems) th.start() th = threading.Timer(self.request_timeout, self.request_resolv_configs) th.start() th = threading.Timer(self.request_timeout + self.request_timeout/4, self.request_resolv_basics) th.start() th = threading.Timer(self.request_timeout + self.request_timeout/2, self.request_resolv_users) th.start() th = threading.Timer(self.request_timeout * 2, self.request_resolv_commands) th.start() if self.resolv_timeout_timer is not None: self.resolv_timeout_timer = None self.resolv_timeout_timer = threading.Timer(self.resolv_timeout, self.finish_resolv_discover) self.resolv_timeout_timer.start() self.emit_network() self.emit_nodes()
"""This function is called when we did nod receive informations on /dhcp/resolv defore timeout. The dhcp server must have send its 'online' status ... so he his dead fallback to fail mode """ self.resolv_timeout_timer = None if not self.is_failed: logger.warning("The network switch to failed mode") self.fsm_network_fail() self.emit_network()
""" """ logger.debug("fsm_network : %s", 'stop_resolv_discover') if self._test: print "stop_resolv_discover" else: if self.resolv_timeout_timer is not None: self.resolv_timeout_timer.cancel() self.resolv_timeout_timer = None if self.resolv_mqttc is not None: self.resolv_mqttc.remove_topic(topic=TOPIC_RESOLV_REPLY%self.hadds[0]) self.resolv_mqttc.remove_topic(topic=TOPIC_RESOLV_BROADCAST+'#') self.resolv_mqttc.unsubscribe(topic="%s#"%TOPIC_RESOLV) self.resolv_mqttc.stop() try: self.resolv_mqttc.join() except: logger.exception("Catched exception") self.resolv_mqttc = None
""" """ logger.debug("fsm_network : %s", 'start_resolv_request') if self._test: print "start_resolv_request" else: if self.resolv_request_mqttc is None: self.resolv_request_mqttc = MQTTClient(options=self.options.data) self.resolv_request_mqttc.connect() self.resolv_request_mqttc.subscribe(topic="%s#"%TOPIC_RESOLV_REQUEST, callback=self.on_resolv_request) self.resolv_request_mqttc.start()
""" """ logger.debug("fsm_network : %s", 'stop_resolv_request') if self._test: print "stop_resolv_request" else: if self.resolv_request_mqttc is not None: self.resolv_request_mqttc.unsubscribe(topic="%s#"%TOPIC_RESOLV_REQUEST) try: self.resolv_request_mqttc.join() except: logger.exception("Catched exception") self.resolv_request_mqttc = None
""" """ logger.debug("fsm_network : %s", 'start_resolv_heartbeat') if self._test: print "start_resolv_heartbeat" else: self.resolv_heartbeat_mqttc = MQTTClient(options=self.options.data) self.resolv_heartbeat_mqttc.connect() self.resolv_heartbeat_mqttc.subscribe(topic="%sheartbeat"%TOPIC_RESOLV, callback=self.on_resolv_heartbeat) self.resolv_heartbeat_mqttc.start() self.stop_resolv_heartbeat_timer()
""" """ logger.debug("fsm_network : %s", 'start_resolv_heartbeat_timer') print "start_resolv_heartbeat_timer" else: self.stop_resolv_heartbeat_timer() self.resolv_heartbeat_timer = threading.Timer(self.resolv_timeout, self.finish_resolv_heartbeat_timer) self.resolv_heartbeat_timer.start()
"""This function is called when we did nod receive informations on /dhcp/resolv defore timeout. The dhcp server must have send its 'online' status ... so he his dead fallback to fail mode """ logger.debug("fsm_network : %s", 'finish_resolv_heartbeat_timer') if not self.is_failed and self.is_started: logger.warning("The network switch to failed mode") self.fsm_network_fail() self.emit_network()
""" """ logger.debug("fsm_network : %s", 'stop_resolv_heartbeat') if self._test: print "stop_resolv_heartbeat" else: self.stop_resolv_heartbeat_timer() if self.resolv_heartbeat_mqttc is not None: self.resolv_heartbeat_mqttc.unsubscribe(topic="%sheartbeat"%TOPIC_RESOLV) try: self.resolv_heartbeat_mqttc.join() except: logger.exception("Catched exception") self.resolv_heartbeat_mqttc = None
""" """ logger.debug("fsm_network : %s", 'stop_resolv_heartbeat_timer') print "stop_resolv_heartbeat_timer" else: if self.resolv_heartbeat_timer is not None: self.resolv_heartbeat_timer.cancel() self.resolv_heartbeat_timer = None
""" """ logger.debug("fsm_network : %s", 'stop_nodes_discover') if self._test: print "stop_nodes_discover" else: if self.nodes_mqttc is not None: self.nodes_mqttc.unsubscribe(topic='/nodes/%s/reply/'%self.hadds[0]) try: self.nodes_mqttc.join() except: logger.exception("Catched exception") self.nodes_mqttc = None
""" """ logger.debug("fsm_network : %s", 'start_nodes_discover') if self._test: print "start_nodes_discover" else: if self.nodes_mqttc is None: self.nodes_mqttc.connect() self.nodes_mqttc.subscribe(topic='/nodes/%s/reply/'%self.hadds[0], callback=self.on_reply) self.nodes_mqttc.start() self.emit_nodes() self.emit_network()
""" """ logger.debug("fsm_network : %s", 'start_heartbeat_discover') if self._test: print "start_heartbeat_discover" else: if self.heartbeat_discover_mqttc is None: self.heartbeat_discover_mqttc.connect() self.heartbeat_discover_mqttc.subscribe(topic='/dhcp/heartbeat/#', callback=self.on_heartbeat_discover) self.heartbeat_discover_mqttc.start() self.emit_network() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'stop_heartbeat_discover') if self._test: print "stop_heartbeat_discover" else: if self.heartbeat_discover_mqttc is not None: self.heartbeat_discover_mqttc.unsubscribe(topic='/dhcp/heartbeat/#') self.heartbeat_discover_mqttc.stop() self.heartbeat_discover_mqttc.join() except: logger.exception("Catched exception") self.heartbeat_discover_mqttc = None self.emit_network()
""" """ logger.debug("fsm_network : %s", 'start_heartbeat') if self._test: print "start_heartbeat" else: if self.heartbeat_mqttc is None: self.heartbeat_mqttc.connect() self.heartbeat_mqttc.subscribe(topic='/dhcp/heartbeat/#', callback=self.on_heartbeat) self.heartbeat_mqttc.start() self.emit_network() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'stop_heartbeat') if self._test: print "stop_heartbeat" else: if self.heartbeat_mqttc is not None: self.heartbeat_mqttc.unsubscribe(topic='/dhcp/heartbeat/#') self.heartbeat_mqttc.stop() self.heartbeat_mqttc.join() except: logger.exception("Catched exception") self.heartbeat_mqttc = None self.emit_network()
""" """ logger.debug("fsm_network : %s", 'start_dispatch_heartbeat') if self._test: print "start_dispatch_heartbeat" else: if self.dispatch_heartbeat_mqttc is None: self.dispatch_heartbeat_mqttc = MQTTClient(options=self.options.data) #~ self.dispatch_heartbeat_mqttc.subscribe(topic="%sheartbeat"%TOPIC_RESOLV, callback=self.on_dispatch_heartbeat) self.dispatch_heartbeat_mqttc.start() self.start_dispatch_heartbeat_timer() self.emit_network() self.fsm_network_next()
""" """ logger.debug("fsm_network : %s", 'start_dispatch_heartbeat_timer') print "start_dispatch_heartbeat_timer" else: self.stop_dispatch_heartbeat_timer() self.dispatch_heartbeat_timer = threading.Timer(1, self.finish_dispatch_heartbeat_timer) self.dispatch_heartbeat_timer.start()
"""This function is called when we did nod receive informations on /dhcp/resolv defore timeout. The dhcp server must have send its 'online' status ... so he his dead fallback to fail mode """ logger.debug("fsm_network : %s", 'finish_dispatch_heartbeat_timer') self.stop_dispatch_heartbeat_timer() self.start_dispatch_heartbeat_timer() timeouts = self.heartbeat_cache.check_heartbeats() for add_ctrl, add_node in timeouts: #~ print add_ctrl, add_node #~ print msg self.dispatch_heartbeat_mqttc.publish_heartbeat_msg(msg) #~ self.dispatch_heartbeat_mqttc.publish_heartbeat_resolv_msg(msg) if self._stopevent is not None: self._stopevent.wait(0.02)
""" """ logger.debug("fsm_network : %s", 'stop_dispatch_heartbeat') if self._test: print "stop_dispatch_heartbeat" else: self.stop_dispatch_heartbeat_timer() if self.dispatch_heartbeat_mqttc is not None: try: self.dispatch_heartbeat_mqttc.join() except: logger.exception("Catched exception") self.dispatch_heartbeat_mqttc = None
""" """ logger.debug("fsm_network : %s", 'stop_dispatch_heartbeat_timer') print "stop_dispatch_heartbeat_timer" else: if self.dispatch_heartbeat_timer is not None: self.dispatch_heartbeat_timer.cancel() self.dispatch_heartbeat_timer = None
""" """ logger.debug("fsm_network : %s", 'start_values_listener') if self._test: print "start_values_listener" else: self.values_mqttc = MQTTClient(options=self.options.data) self.values_mqttc.connect() self.values_mqttc.subscribe(topic='/values/#', callback=self.on_value) self.values_mqttc.start() self.emit_network()
""" """ logger.debug("fsm_network : %s", 'stop_values_listener') if self._test: print "stop_values_listener" else: if self.values_mqttc is not None: self.values_mqttc.unsubscribe(topic='/values/#') self.values_mqttc.stop() self.values_mqttc.join() except: logger.exception("Catched exception") self.values_mqttc = None self.emit_network()
"""On value
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_value %s", self.__class__.__name__, message.payload) try: mdata = json_loads(message.payload) #~ print mdata if 'genre' in mdata: data = {0:mdata} else: data = mdata logger.debug("[%s] - on_value 2 %s", self.__class__.__name__, data) for key in data.keys(): #~ print data if data[key]['genre'] == 0x01: self.add_basics(data) self.emit_basics() elif data[key]['genre'] == 0x02: self.add_users(data) self.emit_users() elif data[key]['genre'] == 0x03: self.add_configs(data) self.emit_configs() elif data[key]['genre'] == 0x04: self.add_systems(data) self.emit_systems() elif data[key]['genre'] == 0x05: self.emit_commands() else : logger.warning("Unknown genre in value %s", data) except: logger.exception("Exception in on_value")
"""On dispatch request
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_resolv_request %s", self.__class__.__name__, message.payload) try: data = json_loads(message.payload) #~ print data['uuid'] #We should check what value is requested #{'hadd', 'cmd_class', 'type'='list', 'genre'='0x04', 'data'='node|value|config', 'uuid'='request_info'} #print self.systems if data['cmd_class'] == COMMAND_DISCOVERY: if data['genre'] == 0x04: resp = {} resp.update(data) #~ print "data uuid %s" %data['uuid'] data_to_send = {'nodes':{}, 'systems':{}, 'configs':{}, 'commands':{}, 'basics':{}, 'users':{}} if data['uuid'] == "request_info_nodes": #~ print " self.nodes %s"%self.nodes for knode in self.nodes.keys(): #~ print " self.nodes.keys %s"%self.nodes.keys() data_to_send['nodes'][knode] = self.nodes[knode] #~ print "data_to_send %s"%data_to_send elif data['uuid'] == "request_info_configs": for knode in self.configs.keys(): #~ print knode #~ print knode for kvalue in self.configs[knode].keys(): #~ print kvalue value = self.configs[knode][kvalue] #~ print value if value['genre'] == 0x03: if not value['hadd'] in data_to_send['configs']: data_to_send['configs'][value['hadd']] = {} data_to_send['configs'][value['hadd']][value['uuid']] = value elif data['uuid'] == "request_info_systems": for knode in self.systems.keys(): #~ print knode for kvalue in self.systems[knode].keys(): #~ print kvalue value = self.systems[knode][kvalue] #~ print value if value['genre'] == 0x04: if not value['hadd'] in data_to_send['systems']: data_to_send['systems'][value['hadd']] = {} data_to_send['systems'][value['hadd']][value['uuid']] = value elif data['uuid'] == 
"request_info_commands": for knode in self.commands.keys(): #~ print knode for kvalue in self.commands[knode].keys(): #~ print kvalue value = self.commands[knode][kvalue] #~ print value if value['genre'] == 0x05: if not value['hadd'] in data_to_send['commands']: data_to_send['commands'][value['hadd']] = {} data_to_send['commands'][value['hadd']][value['uuid']] = value elif data['uuid'] == "request_info_users": for knode in self.users.keys(): #~ print knode for kvalue in self.users[knode].keys(): #~ print kvalue value = self.users[knode][kvalue] #~ print value if value['genre'] == 0x02: if not value['hadd'] in data_to_send['users']: data_to_send['users'][value['hadd']] = {} data_to_send['users'][value['hadd']][value['uuid']] = value elif data['uuid'] == "request_info_basics": for knode in self.basics.keys(): #~ print knode for kvalue in self.basics[knode].keys(): #~ print kvalue value = self.basics[knode][kvalue] #~ print value if value['genre'] == 0x01: if not value['hadd'] in data_to_send['basics']: data_to_send['basics'][value['hadd']] = {} data_to_send['basics'][value['hadd']][value['uuid']] = value else: logger.warning("Can't find % in %s", data['uuid'],'on_resolv_request') return #~ print "final data_to_send %s"%data_to_send try: thread = threading.Thread(target = threaded_send_resolv, args = (self._stopevent, self.options, data['reply_hadd'], resp, data_to_send)) except: logger.exception("Exception when running on_request method") return except: logger.exception("Exception in on_resolv_request")
"""On reply
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_reply %s", self.__class__.__name__, message.payload) try: data = json_loads(message.payload) #We should check what value is requested #{'hadd', 'cmd_class', 'type'='list', 'genre'='0x04', 'data'='node|value|config', 'uuid'='request_info'} if data['cmd_class'] == COMMAND_DISCOVERY: if data['genre'] == 0x04: logger.warning("Data in %s : %s",data['uuid'], data) if len(data['data']) == 0: return if data['uuid'] == "request_info_nodes": self.add_nodes(data['data']) elif data['uuid'] == "request_info_configs": if 'genre' in data['data']: data = {0:data['data']} else: data = data['data'] #~ print "data", data for key in data.keys(): self.add_configs(data[key]) elif data['uuid'] == "request_info_systems": self.add_systems(data['data']) elif data['uuid'] == "request_info_commands": self.add_commands(data['data']) elif data['uuid'] == "request_info_users": if 'genre' in data['data']: data = {0:data['data']} else: data = data['data'] #~ print "data", data for key in data.keys(): self.add_users(data[key]) elif data['uuid'] == "request_info_basics": if 'genre' in data['data']: data = {0:data['data']} else: data = data['data'] #~ print "data", data for key in data.keys(): self.add_basics(data[key]) else: logger.warning("Unknown value% in %s", data['uuid'],'on_reply') return thread = threading.Thread(target = threaded_send_resolv, args = (self._stopevent, self.options, None, data, None)) thread.start()
except: logger.exception("Exception in on_reply")
"""On resolv
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_resolv %s", self.__class__.__name__, message.payload) self.start_resolv_heartbeat_timer() if self.is_failed and self.is_started: logger.warning("The network switch to normal mode") self.fsm_network_recover() if self.resolv_timeout_timer is not None: self.resolv_timeout_timer = None self.fsm_network_next() if message.topic == "/dhcp/resolv/heartbeat": logger.debug("on_resolv : %s", 'receive heartbeat') return
"""On resolv
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_resolv_reply %s", self.__class__.__name__, message.payload) self.start_resolv_heartbeat_timer()
"""Emit a network state event """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """
"""Emit a nodes state event """ pass
"""Emit a node state event nodes : a single node or a dict of nodes """ pass
"""Emit all events nodes : a single node or a dict of nodes """ self.emit_configs() self.emit_systems() self.emit_commands() self.emit_configs() self.emit_basics()
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_nodes', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_users', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_systems', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_basics', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_configs', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.heartbeat_discover_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_commands', 'reply_hadd':self.hadds[0]} self.heartbeat_discover_mqttc.publish(TOPIC_NODES_REQUEST%hadd, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_nodes', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_users', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_systems', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_basics', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_configs', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
""" """ if self.resolv_mqttc is None: return msg = { 'cmd_class': COMMAND_DISCOVERY, 'genre':0x04, 'uuid':'request_info_commands', 'reply_hadd':self.hadds[0]} self.resolv_mqttc.publish(TOPIC_RESOLV_REQUEST, json_dumps(msg))
"""Boot the node manager """ self.hadds = hadds self.start(loop_sleep=loop_sleep)
"""on_heartbeat
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_heartbeat %s", self.__class__.__name__, message.payload) hb = HeartbeatMessage(message) add_ctrl, add_node, state = hb.get_heartbeat() if add_ctrl is None or add_node is None: return hadd = HADD % (add_ctrl, add_node) #~ print "!!!!!!!!!!!!!!!!!!! On heartbeat", hadd #~ print self.nodes if hadd not in self.nodes: return if hadd in self.nodes: #~ if hadd in self.nodes and state != self.heartbeat_cache.get_state(add_ctrl, add_node): node.update(self.nodes[hadd]) node['state'] = state if state is not None else 'PENDING' self.emit_node(node) #~ print " node : %s" % self.nodes[hadd] self.heartbeat_cache.update(add_ctrl, add_node, state=state, heartbeat=self.nodes[hadd]['heartbeat'])
"""on_heartbeat_discover
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ logger.debug("[%s] - on_heartbeat_discover %s", self.__class__.__name__, message.payload) hb = HeartbeatMessage(message) add_ctrl, add_node, state = hb.get_heartbeat() self.incoming_hearbeat(add_ctrl, add_node, state)
""" """ #print add_ctrl, add_node, state if add_ctrl == None: return if add_node == -1: hadd = HADD % (add_ctrl, 0) else: hadd = HADD % (add_ctrl, add_node) #Check if we already know this entry if self.heartbeat_cache.has_entry(add_ctrl, add_node) == False: #NO. So we ask from some info logger.debug("heartbeat from an unknown device %s,%s,%s", add_ctrl, add_node, state) th = threading.Timer(self.request_timeout/3, self.request_node_nodes, [hadd]) th.start() th = threading.Timer(self.request_timeout/2, self.request_node_systems, [hadd]) th.start() th = threading.Timer(self.request_timeout, self.request_node_configs, [hadd]) th.start() th = threading.Timer(self.request_timeout+self.request_timeout/4, self.request_node_basics, [hadd]) th.start() th = threading.Timer(self.request_timeout+self.request_timeout/3, self.request_node_users, [hadd]) th.start() th = threading.Timer(self.request_timeout+self.request_timeout/2, self.request_node_commands, [hadd]) th.start() else : #~ print " node : %s" % self.nodes[hadd] if state != self.heartbeat_cache.get_state(add_ctrl, add_node): node.update(self.nodes[hadd]) node['state'] = state if state != None else 'PENDING' #~ print " node : %s" % node self.emit_node(node) self.heartbeat_cache.update(add_ctrl, add_node, state=state, heartbeat=self.nodes[hadd]['heartbeat'])
""" """ initial_startup = False do_emit = False self._lock.acquire() try: if self.broadcast_nodes_timer != None: #This is the initial startup. initial_startup = True self.broadcast_nodes_timer.cancel() self.broadcast_nodes_timer = None self.broadcast_nodes_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_nodes_discover) self.broadcast_nodes_timer.start() if 'hadd' in data: data = {'0':data} for knode in data.keys(): self.nodes[data[knode]['hadd']] = {} self.nodes[data[knode]['hadd']].update(JNTNode().to_dict()) self.nodes[data[knode]['hadd']].update(data[knode]) self.nodes[data[knode]['hadd']].update(data[knode]) add_ctrl, add_node = hadd_split(data[knode]['hadd']) if self.heartbeat_cache.has_entry(add_ctrl, add_node) == False: do_emit = True #~ print self.nodes[data[knode]['hadd']] #~ print self.nodes[data[knode]['hadd']]['heartbeat'] self.heartbeat_cache.update(add_ctrl, add_node, heartbeat=self.nodes[data[knode]['hadd']]['heartbeat']) data[knode]['state'] = 'PENDING' self.emit_node(data) except: logger.exception("Exception in add_nodes") finally: self._lock.release()
""" """ self._lock.acquire() try: if self.broadcast_users_timer != None: self.broadcast_users_timer.cancel() self.broadcast_users_timer = None self.broadcast_users_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_users_discover) self.broadcast_users_timer.start() if 'uuid' in data: data = {'0': {'0':data}} elif 'uuid' in data[data.keys()[0]]: data = {'0':data} #~ print "ddddaaaaaaaaaaaaaaaaaaaata : %s" % data for nval in data: for kval in data[nval]: hadd = data[nval][kval]['hadd'] uuid = data[nval][kval]['uuid'] #~ print "haaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaadddddddddd", hadd, uuid index = 0 if 'index' in data[nval][kval]: index = data[nval][kval]['index'] if hadd not in self.users: self.users[hadd] = {} if uuid not in self.users[hadd]: self.users[hadd][uuid] = {} self.users[hadd][uuid][index] = data[nval][kval] #~ print 'add_users', self.users[hadd][uuid][index] #~ print "seeeeeeeeeeeeeeeeeeeeeeeeeeeeeeelf.users" #~ print self.users except: logger.exception("Exception in add_users") finally: self._lock.release()
""" """ self._lock.acquire() try: if self.broadcast_configs_timer != None: self.broadcast_configs_timer.cancel() self.broadcast_configs_timer = None self.broadcast_configs_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_configs_discover) self.broadcast_configs_timer.start() if 'uuid' in data: data = {'0': {'0':data}} elif 'uuid' in data[data.keys()[0]]: data = {'0':data} for nval in data: for kval in data[nval]: #~ print "haaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaadddddddddd", data[nval][kval] hadd = data[nval][kval]['hadd'] uuid = data[nval][kval]['uuid'] #~ print "haaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaadddddddddd", hadd, uuid index = 0 if 'index' in data[nval][kval]: index = data[nval][kval]['index'] if hadd not in self.configs: self.configs[hadd] = {} if uuid not in self.configs[hadd]: self.configs[hadd][uuid] = {} self.configs[hadd][uuid][index] = data[nval][kval] #~ for nval in data: #~ for kval in data[nval]: #~ if data[nval][kval]['hadd'] not in self.configs: #~ self.configs[data[nval][kval]['hadd']] = {} #~ self.configs[data[nval][kval]['hadd']][data[nval][kval]['uuid']] = data[nval][kval] #~ print "add_configs self.configs ", self.configs except: logger.exception("Exception in add_configs") finally: self._lock.release()
""" """ self._lock.acquire() try: if self.broadcast_basics_timer != None: self.broadcast_basics_timer.cancel() self.broadcast_basics_timer = None self.broadcast_basics_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_basics_discover) self.broadcast_basics_timer.start() if 'uuid' in data: data = {'0': {'0':data}} elif 'uuid' in data[data.keys()[0]]: data = {'0':data} #~ print "ddddaaaaaaaaaaaaaaaaaaaata : %s" % data for nval in data: for kval in data[nval]: hadd = data[nval][kval]['hadd'] uuid = data[nval][kval]['uuid'] #~ print "haaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaadddddddddd", hadd, uuid index = 0 if 'index' in data[nval][kval]: index = data[nval][kval]['index'] if hadd not in self.basics: self.basics[hadd] = {} if uuid not in self.basics[hadd]: self.basics[hadd][uuid] = {} self.basics[hadd][uuid][index] = data[nval][kval] #~ print 'add_basics', self.basics[hadd][uuid][index] #~ print "seeeeeeeeeeeeeeeeeeeeeeeeeeeeeeelf.basics" #~ print self.basics except: logger.exception("Exception in add_basics") finally: self._lock.release()
""" """ self._lock.acquire() try: if self.broadcast_systems_timer != None: self.broadcast_systems_timer.cancel() self.broadcast_systems_timer = None self.broadcast_systems_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_systems_discover) self.broadcast_systems_timer.start() if 'uuid' in data: data = {'0': {'0':data}} elif 'uuid' in data[data.keys()[0]]: data = {'0':data} #print "ddddaaaaaaaaaaaaaaaaaaaata : %s" % data for nval in data: for kval in data[nval]: if data[nval][kval]['hadd'] not in self.systems: self.systems[data[nval][kval]['hadd']] = {} self.systems[data[nval][kval]['hadd']][data[nval][kval]['uuid']] = data[nval][kval] data[nval][kval]['node_uuid'] = self.nodes[data[nval][kval]['hadd']] except: logger.exception("Exception in add_systems") finally: self._lock.release()
""" """ self._lock.acquire() try: if self.broadcast_commands_timer != None: self.broadcast_commands_timer.cancel() self.broadcast_commands_timer = None self.broadcast_commands_timer = threading.Timer(self.broadcast_timeout, self.finish_broadcast_commands_discover) self.broadcast_commands_timer.start() if 'uuid' in data: data = {'0': {'0':data}} elif 'uuid' in data[data.keys()[0]]: data = {'0':data} for nval in data: for kval in data[nval]: if data[nval][kval]['hadd'] not in self.commands: self.commands[data[nval][kval]['hadd']] = {} self.commands[data[nval][kval]['hadd']][data[nval][kval]['uuid']] = data[nval][kval] data[nval][kval]['node_uuid'] = self.nodes[data[nval][kval]['hadd']] except: logger.exception("Exception in add_commands") finally: self._lock.release()
""" """ to_polls = [] keys = self.polls.keys() for key in keys: if self.polls[key]['next_run'] < datetime.datetime.now(): to_polls.append(self.polls[key]['value']) if len(to_polls)>0: logger.debug('Found polls in timeout : %s', to_polls) for value in to_polls: self.publish_poll(mqttc, value, self._stopevent) self._stopevent.wait(0.05) try: except ValueError: sleep = 0 if sleep<0: sleep=0.1 self._stopevent.wait(sleep)
def nodes_count(self):
    """Return the number of entries in ``self.nodes``.

    :rtype: int
    """
    return len(self.nodes)
def users_count(self):
    """Return the number of entries in ``self.users``.

    :rtype: int
    """
    return len(self.users)
def configs_count(self):
    """Return the number of entries in ``self.configs``.

    :rtype: int
    """
    return len(self.configs)
def systems_count(self):
    """Return the number of entries in ``self.systems``.

    :rtype: int
    """
    return len(self.systems)
def commands_count(self):
    """Return the number of entries in ``self.commands``.

    :rtype: int
    """
    return len(self.commands)
def basics_count(self):
    """Return the number of entries in ``self.basics``.

    :rtype: int
    """
    return len(self.basics)
def state_str(self):
    """Return the label registered in ``self.states_str`` for the current state.

    The current state is read from ``self.state`` and used as the lookup
    key, so an unregistered state raises ``KeyError``.

    :rtype: str
    """
    return self.states_str[self.state]
def kvals(self):
    """Return the key/value pairs stored in the database for this object.

    Rows are selected from the table named after the object's class and
    filtered on ``self.object_id``.

    :returns: a dict mapping every stored key to its value, or None when
              ``self.dbcon`` is None.
    :rtype: dict or None
    """
    if self.dbcon is None:
        return None
    # NOTE(review): the guard tests self.dbcon while the cursor is taken from
    # self.network.dbcon - confirm both refer to the same connection.
    cur = self.network.dbcon.cursor()
    cur.execute("SELECT key,value FROM %s WHERE object_id=%s"%(self.__class__.__name__, self.object_id))
    res = {}
    # fetchone() hands back None once the result set is exhausted, so keep
    # pulling rows until that sentinel shows up.
    for row in iter(cur.fetchone, None):
        res[row[0]] = row[1]
    return res
def kvals(self, kvs):
    """Store key/value pairs in the database for this object.

    For every key the existing row is deleted first; a new row is then
    inserted unless the value is None, so passing None removes the key.
    All changes are committed together once every key has been processed.

    :param kvs: The key/values to store in db. Setting a value to None will remove it.
    :type kvs: dict
    :returns: False when ``self.dbcon`` is None, True otherwise (including
              when there is nothing to store).
    :rtype: bool
    """
    if self.dbcon is None:
        return False
    if not kvs:
        return True
    table = self.__class__.__name__
    cur = self.network.dbcon.cursor()
    # NOTE(review): the statements are assembled with string formatting, so
    # keys and values must come from a trusted source; switch to bound
    # parameters once the driver's paramstyle is confirmed.
    for key, value in kvs.items():
        delete_sql = "DELETE FROM %s WHERE object_id=%s and key='%s'" % (table, self.object_id, key)
        logger.debug("%s", delete_sql)
        # Drop the previous row so the insert below cannot create a duplicate
        # and so that a None value really removes the key.
        cur.execute(delete_sql)
        if value is not None:
            insert_sql = "INSERT INTO %s(object_id, 'key', 'value') VALUES (%s,'%s','%s');" % (table, self.object_id, key, value)
            logger.debug("%s", insert_sql)
            cur.execute(insert_sql)
    self.network.dbcon.commit()
    return True
"""Update internal dict from adict """ for field in ['is_primary','do_heartbeat_dispatch','is_secondary','is_failed','is_stopped','is_started']: if field in adict: try: if type(adict[field]) == type(''): adict[field] = string_to_bool(adict[field]) except ValueError: logger.exception("Error in from_dict") for field in ['broadcast_timeout','resolv_timeout','request_timeout']: if field in adict: try: except ValueError: logger.exception("Error in fron_dict") if 'resolv_timeout' in adict and 'request_timeout' in adict: assert (adict['resolv_timeout'] > adict['request_timeout']),"request_timeout must be smaller than resolv_timeout"
"""Retrieves scenes on the network """
"""Emit a scene state event """ pass
"""Emit a scene state event nodes : a single scene or a dict of scene """
"""Retrieves scenarios on the network """
"""Emit a scenario state event """ pass
"""Emit a scenario state event nodes : a single scenario or a dict of scenario """
"""Retrieves crons on the network """
"""Emit a cron state event """ pass
"""Emit a cron state event nodes : a single cron or a dict of cron """ pass
"""Check the states of the machine. Must be called in a timer Called in a separate thread. Must use a scoped_session.
:param session: the session to use to communicate with db. May be a scoped_session if used in a separate tread. If None, use the common session. :type session: sqlalchemy session """ now = datetime.datetime.now() lleases = list() for ctrl in entries.keys(): for node in entries[ctrl].keys(): if (now - entries[ctrl][node]['last_seen']).total_seconds() > heartbeat_timeout \ and entries[ctrl][node]['state'] in ['online', 'boot', 'pending', 'failed']: lleases.append((ctrl, node)) for ctrl, node in lleases: if entries[ctrl][node]['state'] == 'failed' \ and (now - entries[ctrl][node]['last_seen']).total_seconds() > heartbeat_dead: entries[ctrl][node]['state'] = 'dead' entries[ctrl][node]['count'] = 0 else : if "count" not in entries[ctrl][node]: entries[ctrl][node]['count'] = 1 else: entries[ctrl][node]['count'] += 1 if entries[ctrl][node]['count'] >= heartbeat_count: #The count is reached #We need to change the state if entries[ctrl][node]['state'] == 'online': entries[ctrl][node]['state'] = 'pending' entries[ctrl][node]['count'] = 0 elif entries[ctrl][node]['state'] == 'boot': entries[ctrl][node]['state'] = 'pending' entries[ctrl][node]['count'] = 0 elif entries[ctrl][node]['state'] == 'pending': entries[ctrl][node]['state'] = 'failed' entries[ctrl][node]['count'] = 0
|