In [ ]:
Copied!
! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip
! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip
Configure¶
The cell below creates the examon_pub.conf file, which should then be edited to match the server configuration.
In [18]:
Copied!
%%file examon_pub.conf
; Sample examon publisher config file.
;
; The below section collects all the settings related to the
; MQTT transport layer
[MQTT]
; MQTT broker IP address and port
MQTT_BROKER = 127.0.0.1
MQTT_PORT = 1883
; MQTT output topic (optional). This setting is used only with
; the 'json' and 'bulk' MQTT output formats
MQTT_TOPIC =
; To be used when password authentication is enabled (optional)
MQTT_USER =
MQTT_PASSWORD =
; The below section collects all the settings related to the
; KairosDB database
[KairosDB]
; KairosDB server IP address and port
K_SERVERS =
K_PORT =
; To be used when password authentication is enabled (optional)
K_USER =
K_PASSWORD =
; The below section collects all the settings related to the
; ExaMon collector
[Daemon]
; Default sampling interval in seconds (float)
TS = 2
; Path to the log file
LOG_FILENAME = examon_pub.log
; Path to the pid file
PID_FILENAME = examon_pub.pid
%%file examon_pub.conf
; Sample examon publisher config file.
;
; The below section collects all the settings related to the
; MQTT transport layer
[MQTT]
; MQTT broker IP address and port
MQTT_BROKER = 127.0.0.1
MQTT_PORT = 1883
; MQTT output topic (optional). This setting is used only with
; the 'json' and 'bulk' MQTT output formats
MQTT_TOPIC =
; To be used when password authentication is enabled (optional)
MQTT_USER =
MQTT_PASSWORD =
; The below section collects all the settings related to the
; KairosDB database
[KairosDB]
; KairosDB server IP address and port
K_SERVERS =
K_PORT =
; To be used when password authentication is enabled (optional)
K_USER =
K_PASSWORD =
; The below section collects all the settings related to the
; ExaMon collector
[Daemon]
; Default sampling interval in seconds (float)
TS = 2
; Path to the log file
LOG_FILENAME = examon_pub.log
; Path to the pid file
PID_FILENAME = examon_pub.pid
Overwriting examon_pub.conf
Example¶
This is the main file where the publisher is defined. In this example, a dummy Sensor class creates some random data to be published.
In [19]:
Copied!
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy sensor that yields one uniformly distributed random reading per call."""

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        # Name attached to every reading, plus the bounds passed to random.uniform().
        self.sensor_name, self.range_min, self.range_max = sensor_name, range_min, range_max

    def get_sensor_data(self):
        """Return a dict holding the sensor name and a freshly drawn random value."""
        reading = random.uniform(self.range_min, self.range_max)
        return dict(sensor_name=self.sensor_name, value=reading)

    def read_data(self):
        """Placeholder method: does nothing and returns None."""
        return None
def read_data(sr):
    """Sample ``sr.sensor`` once and wrap the reading as an ExaMon metric.

    Returns a ``(worker_id, examon_data)`` tuple: ``worker_id`` is the sensor
    name (a string useful for debug/log) and ``examon_data`` is a one-element
    list of metric dicts with the keys name, value, timestamp and tags.
    """
    # The timestamp (milliseconds since the epoch) is taken before the sensor is read.
    now_ms = int(time.time() * 1000)
    sample = sr.sensor.get_sensor_data()

    metric = {
        'name': sample['sensor_name'],
        'value': sample['value'],
        'timestamp': now_ms,
        'tags': sr.get_tags(),
    }
    return (sr.sensor.sensor_name, [metric])
def worker(conf, tags):
    """Worker process code.

    Wires a ``Sensor`` into a ``SensorReader`` built from ``conf``, installs
    ``read_data`` as its callback, sets ``tags`` as the default tags and then
    runs the reader's worker loop.
    """
    reader = SensorReader(conf, Sensor())
    # The reader calls this function to obtain the metrics to publish.
    reader.read_data = read_data
    reader.add_tags(tags)
    reader.run()
if __name__ == '__main__':
    # Create the Examon application and let it parse its options.
    app = ExamonApp()
    app.parse_opt()

    # Echo the resulting configuration so it can be checked.
    print("Config:")
    print(json.dumps(app.conf, indent=4))

    # Default metric tags; they are inserted in this exact order.
    tags = app.examon_tags()
    for tag_key, tag_value in (('org', 'examon'),
                               ('plugin', 'examon_pub'),
                               ('chnl', 'data')):
        tags[tag_key] = tag_value

    # Register the single worker, then start the application.
    app.add_worker(worker, app.conf, tags)
    app.run()
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy sensor that yields one uniformly distributed random reading per call."""

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        # Name attached to every reading, plus the bounds passed to random.uniform().
        self.sensor_name, self.range_min, self.range_max = sensor_name, range_min, range_max

    def get_sensor_data(self):
        """Return a dict holding the sensor name and a freshly drawn random value."""
        reading = random.uniform(self.range_min, self.range_max)
        return dict(sensor_name=self.sensor_name, value=reading)

    def read_data(self):
        """Placeholder method: does nothing and returns None."""
        return None
def read_data(sr):
    """Sample ``sr.sensor`` once and wrap the reading as an ExaMon metric.

    Returns a ``(worker_id, examon_data)`` tuple: ``worker_id`` is the sensor
    name (a string useful for debug/log) and ``examon_data`` is a one-element
    list of metric dicts with the keys name, value, timestamp and tags.
    """
    # The timestamp (milliseconds since the epoch) is taken before the sensor is read.
    now_ms = int(time.time() * 1000)
    sample = sr.sensor.get_sensor_data()

    metric = {
        'name': sample['sensor_name'],
        'value': sample['value'],
        'timestamp': now_ms,
        'tags': sr.get_tags(),
    }
    return (sr.sensor.sensor_name, [metric])
def worker(conf, tags):
    """Worker process code.

    Wires a ``Sensor`` into a ``SensorReader`` built from ``conf``, installs
    ``read_data`` as its callback, sets ``tags`` as the default tags and then
    runs the reader's worker loop.
    """
    reader = SensorReader(conf, Sensor())
    # The reader calls this function to obtain the metrics to publish.
    reader.read_data = read_data
    reader.add_tags(tags)
    reader.run()
if __name__ == '__main__':
    # Create the Examon application and let it parse its options.
    app = ExamonApp()
    app.parse_opt()

    # Echo the resulting configuration so it can be checked.
    print("Config:")
    print(json.dumps(app.conf, indent=4))

    # Default metric tags; they are inserted in this exact order.
    tags = app.examon_tags()
    for tag_key, tag_value in (('org', 'examon'),
                               ('plugin', 'examon_pub'),
                               ('chnl', 'data')):
        tags[tag_key] = tag_value

    # Register the single worker, then start the application.
    app.add_worker(worker, app.conf, tags)
    app.run()
Overwriting examon_pub.py
Execution¶
The Examon publisher created above can be executed from the shell, and the default configuration can be overridden using command-line parameters.
In [20]:
Copied!
! python examon_pub.py -h
! python examon_pub.py -h
usage: examon_pub.py [-h] [-b MQTT_BROKER] [-p MQTT_PORT] [-t MQTT_TOPIC]
[-s TS] [-x PID_FILENAME] [-l LOG_FILENAME]
[-d {mqtt,kairosdb}] [-f {csv,json,bulk}] [--compress]
[--kairosdb-server K_SERVERS] [--kairosdb-port K_PORT]
[--kairosdb-user K_USER] [--kairosdb-password K_PASSWORD]
[--logfile-size LOGFILE_SIZE_B]
[--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
[--dry-run] [--mqtt-user MQTT_USER]
[--mqtt-password MQTT_PASSWORD]
{run,start,restart,stop}
positional arguments:
{run,start,restart,stop}
Run mode
optional arguments:
-h, --help show this help message and exit
-b MQTT_BROKER IP address of the MQTT broker
-p MQTT_PORT Port of the MQTT broker
-t MQTT_TOPIC MQTT topic
-s TS Sampling time (seconds)
-x PID_FILENAME pid filename
-l LOG_FILENAME log filename
-d {mqtt,kairosdb} select where to send data (default: mqtt)
-f {csv,json,bulk} MQTT payload format (default: csv)
--compress enable payload compression (default: False)
--kairosdb-server K_SERVERS
kairosdb servers
--kairosdb-port K_PORT
kairosdb port
--kairosdb-user K_USER
kairosdb username
--kairosdb-password K_PASSWORD
kairosdb password
--logfile-size LOGFILE_SIZE_B
log file size (max) in bytes
--loglevel {DEBUG,INFO,WARNING,ERROR,CRITICAL}
log level
--dry-run Data is not sent to the broker if True (default:
False)
--mqtt-user MQTT_USER
MQTT username
--mqtt-password MQTT_PASSWORD
MQTT password
Dry Run¶
Before the actual execution, a "dry run" can be useful to check the final payload. The MQTT packet (topic, payload) is printed on the log lines tagged "[MqttPub]".
In [21]:
Copied!
! python examon_pub.py run --dry-run --loglevel=DEBUG
! python examon_pub.py run --dry-run --loglevel=DEBUG
Config:
{
"MQTT_BROKER": "127.0.0.1",
"MQTT_PORT": "1883",
"MQTT_TOPIC": "",
"MQTT_USER": "",
"MQTT_PASSWORD": "",
"K_SERVERS": "",
"K_PORT": "",
"K_USER": "",
"K_PASSWORD": "",
"TS": "2",
"LOG_FILENAME": "examon_pub.log",
"PID_FILENAME": "examon_pub.pid",
"runmode": "run",
"OUT_PROTOCOL": "mqtt",
"MQTT_FORMAT": "csv",
"COMPRESS": false,
"LOGFILE_SIZE_B": 5242880,
"LOG_LEVEL": "DEBUG",
"DRY_RUN": true
}
Starting jobs...
INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connecting to MQTT server: 127.0.0.1:1883
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT logs: Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connect rc: 0
INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT started
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - MQTT logs: Received CONNACK (0, 0)
INFO - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000117 seconds
INFO - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - Connected with result code 0
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 78.60698026998385;1648830687.054
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.004744 sec, insert_rate: 210.779637 sens/sec
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:27 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000082 seconds
DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 7.620401543358602;1648830688.004
DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002280 sec, insert_rate: 438.551234 sens/sec
DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:28 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000079 seconds
DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 40.6563350814879;1648830690.006
DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002456 sec, insert_rate: 407.213981 sens/sec
DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:30 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000079 seconds
DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 1.0554041222391009;1648830692.005
DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002471 sec, insert_rate: 404.621262 sens/sec
DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:32 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000082 seconds
DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 76.4034034158195;1648830694.007
DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002533 sec, insert_rate: 394.795181 sens/sec
DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:34 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000081 seconds
DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 79.65416560731711;1648830696.006
DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002764 sec, insert_rate: 361.765051 sens/sec
DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:36 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start timeout timer
INFO - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Retrieved and processed 1 metrics in 0.000080 seconds
DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [mqtt.py] - examon.transport.mqtt - [MqttPub] Topic: org/examon/plugin/examon_pub/chnl/data/random_sensor - Payload: 56.91540569779116;1648830698.005
DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Worker [random_sensor] - Insert: 1 sensors, time: 0.002786 sec, insert_rate: 358.886284 sens/sec
DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Cancel timeout timer
DEBUG - 04/01/2022 06:31:38 PM - [Process-1] - [sensorreader.py] - examon.plugin.sensorreader - Start new loop
^C
Interrupted..
Process Process-1:
Run¶
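Actual execution: without the --dry-run option the data is sent to the configured destination (the MQTT broker by default).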
In [22]:
Copied!
! python examon_pub.py run --loglevel=WARNING
! python examon_pub.py run --loglevel=WARNING
Config:
{
"MQTT_BROKER": "127.0.0.1",
"MQTT_PORT": "1883",
"MQTT_TOPIC": "",
"MQTT_USER": "",
"MQTT_PASSWORD": "",
"K_SERVERS": "",
"K_PORT": "",
"K_USER": "",
"K_PASSWORD": "",
"TS": "2",
"LOG_FILENAME": "examon_pub.log",
"PID_FILENAME": "examon_pub.pid",
"runmode": "run",
"OUT_PROTOCOL": "mqtt",
"MQTT_FORMAT": "csv",
"COMPRESS": false,
"LOGFILE_SIZE_B": 5242880,
"LOG_LEVEL": "WARNING",
"DRY_RUN": false
}
Starting jobs...
^C
Interrupted..
Process Process-1:
Run as a service (daemon mode)¶
In this example, the publisher is executed in daemon mode
$ python examon_pub.py start --loglevel=WARNING
To stop it:
$ python examon_pub.py stop
Multiple sensors example¶
This example shows how to handle multiple sensors and dynamic tags.
In [23]:
Copied!
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy multi-channel sensor: every call yields a batch of random readings."""

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        # Name attached to every reading, plus the bounds passed to random.uniform().
        self.sensor_name, self.range_min, self.range_max = sensor_name, range_min, range_max

    def get_sensor_data(self, num_sensors=10):
        """Return a list of ``num_sensors`` readings.

        Each reading is a dict with the sensor name, a string ``id``
        ('0', '1', ...) and a freshly drawn random value.
        """
        return [
            {
                'sensor_name': self.sensor_name,
                'id': str(idx),
                'value': random.uniform(self.range_min, self.range_max),
            }
            for idx in range(num_sensors)
        ]

    def read_data(self):
        """Placeholder method: does nothing and returns None."""
        return None
def read_data(sr):
    """Sample every channel of ``sr.sensor`` and build one ExaMon metric each.

    Returns a ``(worker_id, examon_data)`` tuple: ``worker_id`` is the sensor
    name (a string useful for debug/log) and ``examon_data`` is a list with
    one metric dict (name, value, timestamp, tags) per reading. All metrics
    of a call share the same millisecond timestamp.
    """
    # get timestamp and data
    timestamp = int(time.time() * 1000)
    raw_packet = sr.sensor.get_sensor_data()

    # build the examon metrics
    examon_data = []
    for raw_data in raw_packet:
        # Work on a private copy of the default tags: the per-reading 'id'
        # must not leak into the other metrics or into the reader's own tag
        # set if get_tags() hands back a shared mapping.
        metric_tags = sr.get_tags().copy()
        # dynamically add new custom tags
        metric_tags['id'] = str(raw_data['id'])

        examon_data.append({
            'name': raw_data['sensor_name'],
            'value': raw_data['value'],
            'timestamp': timestamp,
            'tags': metric_tags,
        })

    # worker id (string) useful for debug/log
    worker_id = sr.sensor.sensor_name
    return (worker_id, examon_data,)
def worker(conf, tags):
    """Worker process code.

    Wires a ``Sensor`` into a ``SensorReader`` built from ``conf``, installs
    ``read_data`` as its callback, sets ``tags`` as the default tags and then
    runs the reader's worker loop.
    """
    reader = SensorReader(conf, Sensor())
    # The reader calls this function to obtain the metrics to publish.
    reader.read_data = read_data
    reader.add_tags(tags)
    reader.run()
if __name__ == '__main__':
    # Create the Examon application and let it parse its options.
    app = ExamonApp()
    app.parse_opt()

    # Echo the resulting configuration so it can be checked.
    print("Config:")
    print(json.dumps(app.conf, indent=4))

    # Default metric tags; they are inserted in this exact order.
    tags = app.examon_tags()
    for tag_key, tag_value in (('org', 'examon'),
                               ('plugin', 'examon_pub'),
                               ('chnl', 'data')):
        tags[tag_key] = tag_value

    # Register the single worker, then start the application.
    app.add_worker(worker, app.conf, tags)
    app.run()
%%file examon_pub.py
import json
import time
import random
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
class Sensor:
    """Dummy multi-channel sensor: every call yields a batch of random readings."""

    def __init__(self, sensor_name='random_sensor', range_min=0, range_max=100.0):
        # Name attached to every reading, plus the bounds passed to random.uniform().
        self.sensor_name, self.range_min, self.range_max = sensor_name, range_min, range_max

    def get_sensor_data(self, num_sensors=10):
        """Return a list of ``num_sensors`` readings.

        Each reading is a dict with the sensor name, a string ``id``
        ('0', '1', ...) and a freshly drawn random value.
        """
        return [
            {
                'sensor_name': self.sensor_name,
                'id': str(idx),
                'value': random.uniform(self.range_min, self.range_max),
            }
            for idx in range(num_sensors)
        ]

    def read_data(self):
        """Placeholder method: does nothing and returns None."""
        return None
def read_data(sr):
    """Sample every channel of ``sr.sensor`` and build one ExaMon metric each.

    Returns a ``(worker_id, examon_data)`` tuple: ``worker_id`` is the sensor
    name (a string useful for debug/log) and ``examon_data`` is a list with
    one metric dict (name, value, timestamp, tags) per reading. All metrics
    of a call share the same millisecond timestamp.
    """
    # get timestamp and data
    timestamp = int(time.time() * 1000)
    raw_packet = sr.sensor.get_sensor_data()

    # build the examon metrics
    examon_data = []
    for raw_data in raw_packet:
        # Work on a private copy of the default tags: the per-reading 'id'
        # must not leak into the other metrics or into the reader's own tag
        # set if get_tags() hands back a shared mapping.
        metric_tags = sr.get_tags().copy()
        # dynamically add new custom tags
        metric_tags['id'] = str(raw_data['id'])

        examon_data.append({
            'name': raw_data['sensor_name'],
            'value': raw_data['value'],
            'timestamp': timestamp,
            'tags': metric_tags,
        })

    # worker id (string) useful for debug/log
    worker_id = sr.sensor.sensor_name
    return (worker_id, examon_data,)
def worker(conf, tags):
    """Worker process code.

    Wires a ``Sensor`` into a ``SensorReader`` built from ``conf``, installs
    ``read_data`` as its callback, sets ``tags`` as the default tags and then
    runs the reader's worker loop.
    """
    reader = SensorReader(conf, Sensor())
    # The reader calls this function to obtain the metrics to publish.
    reader.read_data = read_data
    reader.add_tags(tags)
    reader.run()
if __name__ == '__main__':
    # Create the Examon application and let it parse its options.
    app = ExamonApp()
    app.parse_opt()

    # Echo the resulting configuration so it can be checked.
    print("Config:")
    print(json.dumps(app.conf, indent=4))

    # Default metric tags; they are inserted in this exact order.
    tags = app.examon_tags()
    for tag_key, tag_value in (('org', 'examon'),
                               ('plugin', 'examon_pub'),
                               ('chnl', 'data')):
        tags[tag_key] = tag_value

    # Register the single worker, then start the application.
    app.add_worker(worker, app.conf, tags)
    app.run()
Overwriting examon_pub.py