In [ ]:
Copied!
! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip
! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip
Configure¶
The cell below creates the examon_pub.conf file, which should be edited to match the server configuration.
In [18]:
Copied!
%%file examon_pub.conf
; Sample examon publisher config file.
;
; The below section collects all the settings related to the
; MQTT transport layer
[MQTT]
; MQTT broker IP address and port
MQTT_BROKER = 127.0.0.1
MQTT_PORT = 1883
; MQTT output topic (optional). This setting is used only with
; the 'json' and 'bulk' MQTT output formats
MQTT_TOPIC =
; To be used when password authentication is enabled (optional)
MQTT_USER =
MQTT_PASSWORD =
; The below section collects all the settings related to the
; KairosDB database
[KairosDB]
; KairosDB server IP address and port
K_SERVERS =
K_PORT =
; To be used when password authentication is enabled (optional)
K_USER =
K_PASSWORD =
; The below section collects all the settings related to the
; ExaMon collector
[Daemon]
; Default sampling interval in seconds (float)
TS = 2
; Path to the log file
LOG_FILENAME = examon_pub.log
; Path to the pid file
PID_FILENAME = examon_pub.pid
%%file examon_pub.conf
; Sample examon publisher config file.
;
; The below section collects all the settings related to the
; MQTT transport layer
[MQTT]
; MQTT broker IP address and port
MQTT_BROKER = 127.0.0.1
MQTT_PORT = 1883
; MQTT output topic (optional). This setting is used only with
; the 'json' and 'bulk' MQTT output formats
MQTT_TOPIC =
; To be used when password authentication is enabled (optional)
MQTT_USER =
MQTT_PASSWORD =
; The below section collects all the settings related to the
; KairosDB database
[KairosDB]
; KairosDB server IP address and port
K_SERVERS =
K_PORT =
; To be used when password authentication is enabled (optional)
K_USER =
K_PASSWORD =
; The below section collects all the settings related to the
; ExaMon collector
[Daemon]
; Default sampling interval in seconds (float)
TS = 2
; Path to the log file
LOG_FILENAME = examon_pub.log
; Path to the pid file
PID_FILENAME = examon_pub.pid
Overwriting examon_pub.conf
Example¶
This is the main file where the publisher is defined. In this example, a dummy Sensor class creates some random data to be published.
In [19]:
Copied!
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy sensor that produces one uniformly distributed random reading.

    The constructor stores its arguments unchanged:
    ``sensor_name`` is reported with every reading, and ``range_min`` /
    ``range_max`` are the bounds handed to ``random.uniform``.
    """

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        self.sensor_name = sensor_name
        self.range_min = range_min
        self.range_max = range_max

    def get_sensor_data(self):
        """Return one reading as a dict with keys ``sensor_name`` and ``value``."""
        return {
            'sensor_name': self.sensor_name,
            'value': random.uniform(self.range_min, self.range_max)
        }

    def read_data(self):
        """Placeholder that does nothing and returns ``None``.

        The data published by this example comes from the module-level
        ``read_data(sr)`` function, which ``worker`` assigns to the reader
        instance.
        """
        pass
def read_data(sr):
    """Sample the sensor once and wrap the reading as an ExaMon metric.

    Args:
        sr: reader object. This function uses ``sr.sensor`` (its
            ``get_sensor_data()`` method and ``sensor_name`` attribute) and
            ``sr.get_tags()``.

    Returns:
        tuple: ``(worker_id, examon_data)``. ``worker_id`` is the sensor name
        (a string useful for debug/log); ``examon_data`` is a one-element
        list holding the metric dict with keys ``name``, ``value``,
        ``timestamp`` and ``tags``.
    """
    # timestamp as integer milliseconds since the epoch
    timestamp = int(time.time() * 1000)
    raw_data = sr.sensor.get_sensor_data()
    # build the examon metric
    metric = {
        'name': raw_data['sensor_name'],
        'value': raw_data['value'],
        'timestamp': timestamp,
        'tags': sr.get_tags(),
    }
    # return format: (worker id, list of metrics)
    worker_id = sr.sensor.sensor_name
    return (worker_id, [metric])
def worker(conf, tags):
    """
    Worker process code.

    Creates a default ``Sensor``, wraps it in a ``SensorReader`` built from
    ``conf``, installs the module-level ``read_data`` callback, applies the
    default ``tags`` and then runs the reader loop.
    """
    # sensor instance
    sensor = Sensor()
    # SensorReader app
    sr = SensorReader(conf, sensor)
    # add read_data callback: assigned on the instance, so it stays a plain
    # function (not a bound method) and takes the reader explicitly as `sr`
    # NOTE(review): presumably SensorReader calls it as read_data(self) -- confirm
    sr.read_data = read_data
    # set the default tags
    sr.add_tags(tags)
    # run the worker loop
    sr.run()
def redact_conf(conf):
    """Return a copy of ``conf`` with non-empty password values masked.

    Any key whose name contains ``PASSWORD`` (case-insensitive), such as
    ``MQTT_PASSWORD`` or ``K_PASSWORD``, has a non-empty value replaced by
    ``'********'`` so credentials are not echoed to stdout or saved in the
    notebook output. Empty values are left as they are, so "not set" stays
    visible.
    """
    return {
        key: ('********' if 'PASSWORD' in str(key).upper() and value else value)
        for key, value in conf.items()
    }


if __name__ == '__main__':
    # start creating an Examon app
    app = ExamonApp()
    app.parse_opt()
    # for checking (passwords are masked; app.conf itself is not modified)
    print("Config:")
    print(json.dumps(redact_conf(app.conf), indent=4))
    # set default metrics tags
    tags = app.examon_tags()
    tags['org'] = 'examon'
    tags['plugin'] = 'examon_pub'
    tags['chnl'] = 'data'
    # add a worker
    app.add_worker(worker, app.conf, tags)
    # run!
    app.run()
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy sensor that produces one uniformly distributed random reading.

    The constructor stores its arguments unchanged:
    ``sensor_name`` is reported with every reading, and ``range_min`` /
    ``range_max`` are the bounds handed to ``random.uniform``.
    """

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        self.sensor_name = sensor_name
        self.range_min = range_min
        self.range_max = range_max

    def get_sensor_data(self):
        """Return one reading as a dict with keys ``sensor_name`` and ``value``."""
        return {
            'sensor_name': self.sensor_name,
            'value': random.uniform(self.range_min, self.range_max)
        }

    def read_data(self):
        """Placeholder that does nothing and returns ``None``.

        The data published by this example comes from the module-level
        ``read_data(sr)`` function, which ``worker`` assigns to the reader
        instance.
        """
        pass
def read_data(sr):
    """Sample the sensor once and wrap the reading as an ExaMon metric.

    Args:
        sr: reader object. This function uses ``sr.sensor`` (its
            ``get_sensor_data()`` method and ``sensor_name`` attribute) and
            ``sr.get_tags()``.

    Returns:
        tuple: ``(worker_id, examon_data)``. ``worker_id`` is the sensor name
        (a string useful for debug/log); ``examon_data`` is a one-element
        list holding the metric dict with keys ``name``, ``value``,
        ``timestamp`` and ``tags``.
    """
    # timestamp as integer milliseconds since the epoch
    timestamp = int(time.time() * 1000)
    raw_data = sr.sensor.get_sensor_data()
    # build the examon metric
    metric = {
        'name': raw_data['sensor_name'],
        'value': raw_data['value'],
        'timestamp': timestamp,
        'tags': sr.get_tags(),
    }
    # return format: (worker id, list of metrics)
    worker_id = sr.sensor.sensor_name
    return (worker_id, [metric])
def worker(conf, tags):
    """
    Worker process code.

    Creates a default ``Sensor``, wraps it in a ``SensorReader`` built from
    ``conf``, installs the module-level ``read_data`` callback, applies the
    default ``tags`` and then runs the reader loop.
    """
    # sensor instance
    sensor = Sensor()
    # SensorReader app
    sr = SensorReader(conf, sensor)
    # add read_data callback: assigned on the instance, so it stays a plain
    # function (not a bound method) and takes the reader explicitly as `sr`
    # NOTE(review): presumably SensorReader calls it as read_data(self) -- confirm
    sr.read_data = read_data
    # set the default tags
    sr.add_tags(tags)
    # run the worker loop
    sr.run()
def redact_conf(conf):
    """Return a copy of ``conf`` with non-empty password values masked.

    Any key whose name contains ``PASSWORD`` (case-insensitive), such as
    ``MQTT_PASSWORD`` or ``K_PASSWORD``, has a non-empty value replaced by
    ``'********'`` so credentials are not echoed to stdout or saved in the
    notebook output. Empty values are left as they are, so "not set" stays
    visible.
    """
    return {
        key: ('********' if 'PASSWORD' in str(key).upper() and value else value)
        for key, value in conf.items()
    }


if __name__ == '__main__':
    # start creating an Examon app
    app = ExamonApp()
    app.parse_opt()
    # for checking (passwords are masked; app.conf itself is not modified)
    print("Config:")
    print(json.dumps(redact_conf(app.conf), indent=4))
    # set default metrics tags
    tags = app.examon_tags()
    tags['org'] = 'examon'
    tags['plugin'] = 'examon_pub'
    tags['chnl'] = 'data'
    # add a worker
    app.add_worker(worker, app.conf, tags)
    # run!
    app.run()
Overwriting examon_pub.py
Execution¶
The Examon publisher created above can be executed from the shell and the default configuration can be changed using the command line parameters.
In [20]:
Copied!
! python examon_pub.py -h
! python examon_pub.py -h
usage: examon_pub.py [-h] [-b MQTT_BROKER] [-p MQTT_PORT] [-t MQTT_TOPIC] [-s TS] [-x PID_FILENAME] [-l LOG_FILENAME] [-d {mqtt,kairosdb}] [-f {csv,json,bulk}] [--compress] [--kairosdb-server K_SERVERS] [--kairosdb-port K_PORT] [--kairosdb-user K_USER] [--kairosdb-password K_PASSWORD] [--logfile-size LOGFILE_SIZE_B] [--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--dry-run] [--mqtt-user MQTT_USER] [--mqtt-password MQTT_PASSWORD] {run,start,restart,stop} positional arguments: {run,start,restart,stop} Run mode optional arguments: -h, --help show this help message and exit -b MQTT_BROKER IP address of the MQTT broker -p MQTT_PORT Port of the MQTT broker -t MQTT_TOPIC MQTT topic -s TS Sampling time (seconds) -x PID_FILENAME pid filename -l LOG_FILENAME log filename -d {mqtt,kairosdb} select where to send data (default: mqtt) -f {csv,json,bulk} MQTT payload format (default: csv) --compress enable payload compression (default: False) --kairosdb-server K_SERVERS kairosdb servers --kairosdb-port K_PORT kairosdb port --kairosdb-user K_USER kairosdb username --kairosdb-password K_PASSWORD kairosdb password --logfile-size LOGFILE_SIZE_B log file size (max) in bytes --loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL} log level --dry-run Data is not sent to the broker if True (default: False) --mqtt-user MQTT_USER MQTT username --mqtt-password MQTT_PASSWORD MQTT password
Dry Run¶
Before the actual execution, it can be useful to do a "dry run" to check the final payload. The MQTT packet (topic, payload) is printed in the log lines tagged "[MqttPub]".
In [21]:
Copied!
! python examon_pub.py run --dry-run --loglevel=DEBUG
! python examon_pub.py run --dry-run --loglevel=DEBUG
Config: { "MQTT_BROKER": "127.0.0.1", "MQTT_PORT": "1883", "MQTT_TOPIC": "", "MQTT_USER": "", "MQTT_PASSWORD": "", "K_SERVERS": "", "K_PORT": "", "K_USER": "", "K_PASSWORD": "", "TS": "2", "LOG_FILENAME": "examon_pub.log", "PID_FILENAME": "examon_pub.pid", "runmode": "run", "OUT_PROTOCOL": "mqtt", "MQTT_FORMAT": "csv", "COMPRESS": false, "LOGFILE_SIZE_B": 5242880, "LOG_LEVEL": "DEBUG", "DRY_RUN": true } Starting jobs... INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connecting to MQTT server: 127.0.0.1:1883 DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT logs: Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'' DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connect rc: 0 INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT started DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT logs: Received CONNACK (0, 0) INFO - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000117 seconds INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connected with result code 0 DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 78.60698026998385;1648830687.054 DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.004744 sec, insert_rate: 210.779637 sens/sec DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - 
examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000082 seconds DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 7.620401543358602;1648830688.004 DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002280 sec, insert_rate: 438.551234 sens/sec DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000079 seconds DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 40.6563350814879;1648830690.006 DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002456 sec, insert_rate: 407.213981 sens/sec DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 
06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000079 seconds DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 1.0554041222391009;1648830692.005 DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002471 sec, insert_rate: 404.621262 sens/sec DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000082 seconds DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 76.4034034158195;1648830694.007 DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002533 sec, insert_rate: 394.795181 sens/sec DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000081 seconds DEBUG - 04/01/2022 06:31:36 
PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 79.65416560731711;1648830696.006 DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002764 sec, insert_rate: 361.765051 sens/sec DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer INFO - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000080 seconds DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 56.91540569779116;1648830698.005 DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002786 sec, insert_rate: 358.886284 sens/sec DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop ^C Interrupted.. Process Process-1:
Run¶
Actual execution
In [22]:
Copied!
! python examon_pub.py run --loglevel=WARNING
! python examon_pub.py run --loglevel=WARNING
Config: { "MQTT_BROKER": "127.0.0.1", "MQTT_PORT": "1883", "MQTT_TOPIC": "", "MQTT_USER": "", "MQTT_PASSWORD": "", "K_SERVERS": "", "K_PORT": "", "K_USER": "", "K_PASSWORD": "", "TS": "2", "LOG_FILENAME": "examon_pub.log", "PID_FILENAME": "examon_pub.pid", "runmode": "run", "OUT_PROTOCOL": "mqtt", "MQTT_FORMAT": "csv", "COMPRESS": false, "LOGFILE_SIZE_B": 5242880, "LOG_LEVEL": "WARNING", "DRY_RUN": false } Starting jobs... ^C Interrupted.. Process Process-1:
Run as a service (daemon mode)¶
In this example, the publisher is executed in daemon mode:
$ python examon_pub.py start --loglevel=WARNING
To stop it:
$ python examon_pub.py stop
Multiple sensors example¶
This example shows how to handle multiple sensors and dynamic tags.
In [23]:
Copied!
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy data source that emulates several sensors with random readings.

    The constructor stores its arguments unchanged:
    ``sensor_name`` is reported with every reading, and ``range_min`` /
    ``range_max`` are the bounds handed to ``random.uniform``.
    """

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        self.sensor_name = sensor_name
        self.range_min = range_min
        self.range_max = range_max

    def get_sensor_data(self, num_sensors=10):
        """Return one random reading per emulated sensor.

        Args:
            num_sensors: number of readings to generate (default 10).

        Returns:
            list: one dict per reading with keys ``sensor_name``, ``id`` and
            ``value``; ``id`` is the reading's index rendered as a string
            (``'0'`` up to ``str(num_sensors - 1)``).
        """
        return [
            {
                'sensor_name': self.sensor_name,
                'id': str(index),
                'value': random.uniform(self.range_min, self.range_max)
            }
            for index in range(num_sensors)
        ]

    def read_data(self):
        """Placeholder that does nothing and returns ``None``.

        The data published by this example comes from the module-level
        ``read_data(sr)`` function, which ``worker`` assigns to the reader
        instance.
        """
        pass
def read_data(sr):
    """Sample all sensors once and wrap each reading as an ExaMon metric.

    Args:
        sr: reader object. This function uses ``sr.sensor`` (its
            ``get_sensor_data()`` method and ``sensor_name`` attribute) and
            ``sr.get_tags()``.

    Returns:
        tuple: ``(worker_id, examon_data)``. ``worker_id`` is the sensor name
        (a string useful for debug/log); ``examon_data`` is a list with one
        metric dict per reading (keys ``name``, ``value``, ``timestamp``,
        ``tags``). All metrics of one call share the same timestamp, and
        each carries its own ``id`` tag.
    """
    # timestamp as integer milliseconds since the epoch
    timestamp = int(time.time() * 1000)
    raw_packet = sr.sensor.get_sensor_data()
    # build the examon metrics
    examon_data = []
    for raw_data in raw_packet:
        # Copy the tags so every metric owns its dict: the per-metric 'id'
        # tag set below must not leak into the other metrics should
        # get_tags() hand back the same dict on every call.
        metric_tags = dict(sr.get_tags())
        # dynamically add new custom tags
        metric_tags['id'] = str(raw_data['id'])
        # build the final packet
        examon_data.append({
            'name': raw_data['sensor_name'],
            'value': raw_data['value'],
            'timestamp': timestamp,
            'tags': metric_tags,
        })
    # worker id (string) useful for debug/log
    worker_id = sr.sensor.sensor_name
    return (worker_id, examon_data)
def worker(conf, tags):
    """
    Worker process code.

    Creates a default ``Sensor``, wraps it in a ``SensorReader`` built from
    ``conf``, installs the module-level ``read_data`` callback, applies the
    default ``tags`` and then runs the reader loop.
    """
    # sensor instance
    sensor = Sensor()
    # SensorReader app
    sr = SensorReader(conf, sensor)
    # add read_data callback: assigned on the instance, so it stays a plain
    # function (not a bound method) and takes the reader explicitly as `sr`
    # NOTE(review): presumably SensorReader calls it as read_data(self) -- confirm
    sr.read_data = read_data
    # set the default tags
    sr.add_tags(tags)
    # run the worker loop
    sr.run()
def redact_conf(conf):
    """Return a copy of ``conf`` with non-empty password values masked.

    Any key whose name contains ``PASSWORD`` (case-insensitive), such as
    ``MQTT_PASSWORD`` or ``K_PASSWORD``, has a non-empty value replaced by
    ``'********'`` so credentials are not echoed to stdout or saved in the
    notebook output. Empty values are left as they are, so "not set" stays
    visible.
    """
    return {
        key: ('********' if 'PASSWORD' in str(key).upper() and value else value)
        for key, value in conf.items()
    }


if __name__ == '__main__':
    # start creating an Examon app
    app = ExamonApp()
    app.parse_opt()
    # for checking (passwords are masked; app.conf itself is not modified)
    print("Config:")
    print(json.dumps(redact_conf(app.conf), indent=4))
    # set default metrics tags
    tags = app.examon_tags()
    tags['org'] = 'examon'
    tags['plugin'] = 'examon_pub'
    tags['chnl'] = 'data'
    # add a worker
    app.add_worker(worker, app.conf, tags)
    # run!
    app.run()
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy data source that emulates several sensors with random readings.

    The constructor stores its arguments unchanged:
    ``sensor_name`` is reported with every reading, and ``range_min`` /
    ``range_max`` are the bounds handed to ``random.uniform``.
    """

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        self.sensor_name = sensor_name
        self.range_min = range_min
        self.range_max = range_max

    def get_sensor_data(self, num_sensors=10):
        """Return one random reading per emulated sensor.

        Args:
            num_sensors: number of readings to generate (default 10).

        Returns:
            list: one dict per reading with keys ``sensor_name``, ``id`` and
            ``value``; ``id`` is the reading's index rendered as a string
            (``'0'`` up to ``str(num_sensors - 1)``).
        """
        return [
            {
                'sensor_name': self.sensor_name,
                'id': str(index),
                'value': random.uniform(self.range_min, self.range_max)
            }
            for index in range(num_sensors)
        ]

    def read_data(self):
        """Placeholder that does nothing and returns ``None``.

        The data published by this example comes from the module-level
        ``read_data(sr)`` function, which ``worker`` assigns to the reader
        instance.
        """
        pass
def read_data(sr):
    """Sample all sensors once and wrap each reading as an ExaMon metric.

    Args:
        sr: reader object. This function uses ``sr.sensor`` (its
            ``get_sensor_data()`` method and ``sensor_name`` attribute) and
            ``sr.get_tags()``.

    Returns:
        tuple: ``(worker_id, examon_data)``. ``worker_id`` is the sensor name
        (a string useful for debug/log); ``examon_data`` is a list with one
        metric dict per reading (keys ``name``, ``value``, ``timestamp``,
        ``tags``). All metrics of one call share the same timestamp, and
        each carries its own ``id`` tag.
    """
    # timestamp as integer milliseconds since the epoch
    timestamp = int(time.time() * 1000)
    raw_packet = sr.sensor.get_sensor_data()
    # build the examon metrics
    examon_data = []
    for raw_data in raw_packet:
        # Copy the tags so every metric owns its dict: the per-metric 'id'
        # tag set below must not leak into the other metrics should
        # get_tags() hand back the same dict on every call.
        metric_tags = dict(sr.get_tags())
        # dynamically add new custom tags
        metric_tags['id'] = str(raw_data['id'])
        # build the final packet
        examon_data.append({
            'name': raw_data['sensor_name'],
            'value': raw_data['value'],
            'timestamp': timestamp,
            'tags': metric_tags,
        })
    # worker id (string) useful for debug/log
    worker_id = sr.sensor.sensor_name
    return (worker_id, examon_data)
def worker(conf, tags):
    """
    Worker process code.

    Creates a default ``Sensor``, wraps it in a ``SensorReader`` built from
    ``conf``, installs the module-level ``read_data`` callback, applies the
    default ``tags`` and then runs the reader loop.
    """
    # sensor instance
    sensor = Sensor()
    # SensorReader app
    sr = SensorReader(conf, sensor)
    # add read_data callback: assigned on the instance, so it stays a plain
    # function (not a bound method) and takes the reader explicitly as `sr`
    # NOTE(review): presumably SensorReader calls it as read_data(self) -- confirm
    sr.read_data = read_data
    # set the default tags
    sr.add_tags(tags)
    # run the worker loop
    sr.run()
def redact_conf(conf):
    """Return a copy of ``conf`` with non-empty password values masked.

    Any key whose name contains ``PASSWORD`` (case-insensitive), such as
    ``MQTT_PASSWORD`` or ``K_PASSWORD``, has a non-empty value replaced by
    ``'********'`` so credentials are not echoed to stdout or saved in the
    notebook output. Empty values are left as they are, so "not set" stays
    visible.
    """
    return {
        key: ('********' if 'PASSWORD' in str(key).upper() and value else value)
        for key, value in conf.items()
    }


if __name__ == '__main__':
    # start creating an Examon app
    app = ExamonApp()
    app.parse_opt()
    # for checking (passwords are masked; app.conf itself is not modified)
    print("Config:")
    print(json.dumps(redact_conf(app.conf), indent=4))
    # set default metrics tags
    tags = app.examon_tags()
    tags['org'] = 'examon'
    tags['plugin'] = 'examon_pub'
    tags['chnl'] = 'data'
    # add a worker
    app.add_worker(worker, app.conf, tags)
    # run!
    app.run()
Overwriting examon_pub.py