Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# -*- coding: utf-8 -*-
Define a node for the cpu with 3 values : temperature, frequency and voltage
http://www.maketecheasier.com/finding-raspberry-pi-system-information/
"""
This file is part of Janitoo.
Janitoo is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
Janitoo is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with Janitoo. If not, see <http://www.gnu.org/licenses/>.
"""
# Set default logging handler to avoid "No handler found" warnings.
############################################################## #Check that we are in sync with the official command classes #Must be implemented for non-regression
##############################################################
"""The Rdd cache thread
Implement a cache.
""" """Initialise the cache thread
Manage a cache for the rrd.
A timer in a separate thread will pickle the cache to disk every 30 seconds.
Another thread will update the rrd every hour.
:param options: The options used to start the worker. :type clientid: str """ #~ self._cache_rrd_ttl = 60*60 """ We have a datasource with a unique_name haddnode but at component start, we don't have it. We receive data from mqtt with params : haddvalue, value_uuid, value_index at etime
We should update the rrd with : rtime:val1:val2:val3 ...
We should be able to flush an rrd file for graphing
cache { 'rrd_0001' : { 'step':300, 'values' : {'epoch_time' : { 0 : 'U', ...}}, 'labels':{0:...}}
When updating a value, we can find the latest series of values by filtering on epoch_time
We should look at the cache to find rrd_files that need a new series of values, i.e. (now - last_epoch) > step. In this case, we add a new series with 'U' as values.
hadd value_uuid value_index """ """ """
dirmask=RRD_DIR, filemask='%(rrd_file)s.rrd'): """ """
"""Launch before entering the run loop. The node manager is available. """
"""Launch after finishing the run loop. The node manager is still available. """ #~ self.flush_all() self._mqttc.join()
"""Launch after finishing the run loop. The node manager is still available. """ #Check for data that need a rotation #~ print "epochs : ", epochs #~ print "condition : ", epochs[0] + self._cache_rrd_ttl, etime #We should rotate the values #We should flush to the rrd self.flush(key) except: logger.exception("[%s] - Exception when rotating %s in cache", self.__class__.__name__, key) except: logger.exception("[%s] - Exception when rotating in cache", self.__class__.__name__)
"""On value
Do not lock, as this doesn't add new values to the dict. Should be OK using keys instead of an iterator.
:param client: the Client instance that is calling the callback. :type client: paho.mqtt.client.Client :param userdata: user data of any type and can be set when creating a new client instance or with user_data_set(userdata). :type userdata: all :param message: The message variable is a MQTTMessage that describes all of the message parameters. :type message: paho.mqtt.client.MQTTMessage """ #logger.debug("[%s] - on_value %s", self.__class__.__name__, message.payload) #~ logger.debug("[%s] - update_last %s,%s,%s : %s", self.__class__.__name__, hadd, value_uuid, value_index) self.update_last(hadd, uuid, index, data[nval][kval]['data']) except: logger.exception("[%s] - Exception in on_value", self.__class__.__name__)
""" Create an in dex of keys :ret: a list of tuple () of values in cache """ self._cache[rrd]['uuids'][index], \ self._cache[rrd]['indexes'][index]) ) except: logger.exception('[%s] - Exception in create_store_index : rrd= %s, index = %s', self.__class__.__name__, rrd, index) except: logger.exception('[%s] - Exception in create_store_index : rrd = %s', self.__class__.__name__, rrd) #~ logger.debug("[%s] - create_store_index %s", self.__class__.__name__, ret)
""" An helper to find :ret: """ #~ logger.debug("[%s] - update_last %s,%s,%s : %s", self.__class__.__name__, hadd, value_uuid, value_index, data) ret = [] rrds = self._cache.keys() for rrd in rrds: indexes = self._cache[rrd]['indexes'].keys() for index in indexes: if self._cache[rrd]['hadds'][index]==hadd and \ self._cache[rrd]['uuids'][index]==value_uuid and \ self._cache[rrd]['indexes'][index]==value_index: epochs = sorted(self._cache[rrd]['values'].keys()) if len(epochs) == 0: logger.warning("[%s] - Can't update value. No epoch found for %s", self.__class__.__name__, rrd) else: if data is None: data = 'U' self._cache[rrd]['values'][epochs[-1]][index] = data self._cache[rrd]['last_update'] = datetime.datetime.now() logger.debug("[%s] - Value updated in store %s,%s,%s : %s", self.__class__.__name__, hadd, value_uuid, value_index, data)
"""Rotate via a separate thread in a timer """ return False etime = (datetime.datetime.now() - self.epoch).total_seconds()
"""Rotate """ etime = (datetime.datetime.now() - self.epoch).total_seconds() return False except: logger.exception("[%s] - Exception when rotating %s in cache", self.__class__.__name__, rrd_file) finally:
"""Run the loop """ #~ self.boot() except: logger.exception('[%s] - Exception in pre_loop', self.__class__.__name__) self._stopevent.set() except: logger.exception('[%s] - Exception in post_loop', self.__class__.__name__)
"""Get and create the direcotry if needed """
"""Get and create the direcotry if needed """
""" """
"""Restore data from disk using pickle """
"""Restore data from disk using pickle """
""" """ logger.debug("Can't retrieve rrd_file %s", rrd_file) return None
""" """ logger.debug("Can't retrieve rrd_file %s", rrd_file) return None
""" """ if rrd_file in self._cache and index in self._cache[rrd_file]["labels"]: return self._cache[rrd_file]["labels"][index] except: logger.exception("[%s] - Can't retrieve add_ctrl, add_node from hadd %s", self.__class__.__name__, hadd) return None
"""Dump data to disk using pickle """ except: logger.exception("[%s] - Exception when dumping data to file", self.__class__.__name__) finally:
"""Restore data from disk using pickle """ logger.exception("[%s] - Exception when restoring data from dump", self.__class__.__name__) finally:
""" """
"""Remove dead entries from cache """ self.stop_timer_dead() logger.debug("[%s] - Remove dead entries in cache", self.__class__.__name__) try: now = datetime.datetime.now() dead_time = now - datetime.timedelta(seconds=self._cache_dead_ttl) for key in self._cache.keys(): self._lock.acquire() if 'last_update' not in self._cache[key]: self._cache[key]['last_update'] = now try: if key in self._cache and self._cache[key]['last_update'] < dead_time: logger.debug("[%s] - Remove dead entries in cache : %s", self.__class__.__name__, key) self.remove_rrd_from_list(key) del self._cache[key] except: logger.exception("[%s] - Exception when removing dead entry %s in cache", self.__class__.__name__, key) finally: self._lock.release() except: logger.exception("[%s] - Exception when removing dead entries", self.__class__.__name__) self.start_timer_dead()
""" """
""" """
"""Dump cache to file using pickle """
""" """
"""Flush all data via a separate thread in a timer """ if hadd is None or value_uuid is None or value_index is None: return False th = threading.Timer(self._thread_delay, self.flush_all, args=(hadd, value_uuid, value_index)) th.start()
"""Flush all data to rrd files and remove them from cache """ rrds = self._cache.keys() for rrd in rrds: try: self.flush(rrd) except: logger.exception("[%s] - Exception in flush_all : rrd = %s", self.__class__.__name__, rrd)
"""Flush data from in cache from a value via a separate thread in a timer """ if hadd is None or value_uuid is None or value_index is None: return False th = threading.Timer(self._thread_delay, self.flush, args=(rrd_file)) th.start()
"""Flush data from a value to rrd file and remove them from cache """ if rrd_file is None or rrd_file not in self._cache: return False self._lock.acquire() logger.info("[%s] - Flush rrd_file %s", self.__class__.__name__, rrd_file) try: rrd_dict = self._cache[rrd_file] epochs = sorted(rrd_dict['values'].keys()) rrd_data = [self.get_rrd_filename(rrd_file)] last_epoch = epochs[-1] for epoch in epochs: try: rrd_line = "" if epoch != last_epoch: #We must let the last serie in cache #Otherwise we could raise : # error: /tmp/janitoo/home/public/datarrd_store/rrd/open_files.rrd: illegal attempt to update using time 1443837167 when last # update time is 1443837240 (minimum one second step) rrd_line = '%s' %(epoch) for key_idx in rrd_dict['values'][epoch]: try: val = 'U' if rrd_dict['values'][epoch][key_idx] is not None: val = rrd_dict['values'][epoch][key_idx] rrd_line = '%s:%s' %(rrd_line, val) except: rrd_line = '%s:%s' %(rrd_line, 'U') logger.exception("[%s] - Exception when flushing cache for %s epoch %s:%s", self.__class__.__name__, rrd_file, epoch, key_idx) del self._cache[rrd_file]['values'][epoch] if rrd_line != "": rrd_data.append(rrd_line) except: logger.exception("[%s] - Exception when flushing cache for %s epoch %s", self.__class__.__name__, rrd_file, epoch) if len (rrd_data) > 1: rrdtool.update(rrd_data) except: logger.exception("[%s] - Exception when flushing cache for %s", self.__class__.__name__, rrd_file) finally: self._lock.release()
"""Retrieve the number of values cached """
"""Retrieve the number of series of values cached """
"""Return a list of tuples (hadd, value_uuid, value_index) of values in timeout. They must be flush to disk """ return 0
"""Remove an entry in cache and its rrd file """ if rrd_file not in self._cache: logger.warning("[%s] - Remove a non existent entry [%s] from cache ", self.__class__.__name__, rrd_file) if len(self._cache[rrd_file]['values']) > 0: logger.warning("[%s] - Remove a non empty entry [%s] from cache : %s ", self.__class__.__name__, rrd_file, self._cache[rrd_file]) self._lock.acquire() try: filename = self.get_rrd_filename(rrd_file) if os.path.exists(filename) == True: os.remove(filename) if rrd_file is not None and rrd_file in self._cache: del self._cache[rrd_file] except: logger.exception("[%s] - Exception when removing config", self.__class__.__name__) finally: self._lock.release()
"""Add a config via a separate thread in a timer """
""" ret = rrdtool.create("example.rrd", "--step", "1800", "--start", '0', "DS:metric1:GAUGE:2000:U:U", "DS:metric2:GAUGE:2000:U:U", "RRA:AVERAGE:0.5:1:600", "RRA:AVERAGE:0.5:6:700", "RRA:AVERAGE:0.5:24:775", "RRA:AVERAGE:0.5:288:797", "RRA:MAX:0.5:1:600", "RRA:MAX:0.5:6:700", "RRA:MAX:0.5:24:775", "RRA:MAX:0.5:444:797")
Let’s consider all the lines in detail. The first line includes the name of the RRD database (“example.rrd”; you can use any path you want here), the step of parameter checking (30 minutes in our case), and the start point (0 or N means ‘now’). ‘DS’ in lines 4-5 means Data Source; these lines declare our two metrics. ‘2000’ means that RRD can wait for 2000 seconds to get new values before it considers them unknown (that is why we use 2000, which is 200 seconds more than our 30-minute interval). The last two parameters – ‘U:U’ – stand for the min and max values of each metric (‘unknown’ in our case). Lines 6-13 describe what types of gathered values RRD should store in its database. It’s pretty self-describing (average and max values). The numbers given describe how many values RRD should keep. Since it can be confusing I will omit the explanation, but note that these values were chosen to be compatible with MRTG (actually, that’s not quite true since we use 1800-second periods and not 5 minutes, so you might need to modify them (if you also don’t use a 5-minute period) or keep them like I did).
GAUGE, COUNTER, DERIVE or ABSOLUTE """ #print "add_config", rrd_file, config logger.warning("[%s] - Can't add %s in cache", self.__class__.__name__, rrd_file) except: logger.exception("[%s] - Exception when adding config in cache", self.__class__.__name__) finally: #print "rrd_sources :", rrd_sources rrd_sources, "RRA:AVERAGE:0.5:1:1440", "RRA:AVERAGE:0.5:12:1440", "RRA:AVERAGE:0.5:144:1440", "RRA:AVERAGE:0.5:288:1440", "RRA:MAX:0.5:1:1440", "RRA:MAX:0.5:12:1440", "RRA:MAX:0.5:144:1440", "RRA:MAX:0.5:288:1440", "RRA:MIN:0.5:1:1440", "RRA:MIN:0.5:12:1440", "RRA:MIN:0.5:144:1440", "RRA:MIN:0.5:288:1440") except: logger.exception("[%s] - Exception when creating rrd file %s", self.__class__.__name__, rrd_file)
"""Add the rrd_file to index.txt """
"""Remove the rrd from index.txt """ filename = self.get_list_filename() rrd_list = [] if os.path.exists(filename) == True: with open(filename) as file: # Use file to refer to the file object data = file.read() rrd_list = data.split("|") if rrd_file not in rrd_list: return rrd_list.remove(rrd_file) line = '|'.join(rrd_list) with open(filename, "w") as file: file.write(line)
"""A pseudo-bus to manage RRDTools """ """ :param int bus_id: the SMBus id (see Raspberry Pi documentation) :param kwargs: parameters transmitted to :py:class:`smbus.SMBus` initializer """
node_uuid=self.uuid, help='The number of values in the store', label='#Values', get_data_cb=self.get_count_values, genre=0x01, )
node_uuid=self.uuid, help='The number of series of values in the store', label='#Series', get_data_cb=self.get_count_series, genre=0x01, )
node_uuid=self.uuid, help='The cache_rrd_ttl', label='Rrd ttl', default=60*60, )
node_uuid=self.uuid, help='The cache_pickle_ttl', label='Pickle ttl', default=60, )
node_uuid=self.uuid, help='The cache_dead_ttl', label='Cache dead ttl', default=60*60*24, )
node_uuid=self.uuid, help='The action on the store', label='Actions', list_items=['flush'], set_data_cb=self.set_action, is_writeonly = True, cmd_class = COMMAND_CONTROLLER, genre=0x01, )
"""Check that the component is 'available'
""" #~ print "it's me %s : %s" % (self.values['upsname'].data, self._ups_stats_last) if self.store is not None: return self.store.is_alive() return False
""" """ return 0
""" """ return 0
"""Act on the server """ params = {} if data == "flush": if self.store is not None: self.store.flush()
cache_pickle_ttl=self.values["cache_pickle_ttl"].data, cache_dead_ttl=self.values["cache_dead_ttl"].data)
"""Return the name of the package. Needed to publish static files
**MUST** be copy paste in every extension that publish statics files """ return __package__ |