To run this and the following tutorials, you need access to a MongoDB server running on localhost, port 27017. The host and port of the MongoDB server can be changed by modifying the configuration file hyperstream_config.json, located in the folder of this notebook.
All the dependencies listed in the HyperStream requirements are also needed. The installation instructions can be found at https://github.com/IRC-SPHERE/HyperStream
A MongoDB server, a Jupyter Notebook and an MQTT server can be started by running the script start_tutorial.sh
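Before creating a HyperStream instance it can be handy to confirm that something is listening on the expected host and port. The following is a minimal sketch using only the Python standard library. The helper name is_port_open is hypothetical and not part of HyperStream, and a successful TCP connection only shows that some server accepts connections there, not that it is MongoDB.

```python
import socket


def is_port_open(host="localhost", port=27017, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; it does not speak the MongoDB
    protocol, it only checks that the port accepts connections.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        # Connection refused, host unreachable or timed out
        return False
    sock.close()
    return True
```

Calling is_port_open() with the defaults checks the address used throughout this tutorial. If the host or port in hyperstream_config.json has been changed, pass the same values here.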
%load_ext watermark
import sys
sys.path.append("../") # Add parent dir in the Path
from hyperstream import HyperStream
from hyperstream import StreamId
from hyperstream import TimeInterval
from pytz import UTC
from datetime import datetime, timedelta
from utils import plot_high_chart
from utils import unix_time_miliseconds
%watermark -v -m -p hyperstream -g
CPython 2.7.6
IPython 5.3.0
hyperstream 0.3.0-beta
compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741
First of all, we will create a HyperStream instance. This instance will connect to the MongoDB server that is specified in the configuration file hyperstream_config.json.
hs = HyperStream(loglevel=0)
print hs
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>
HyperStream comes with a set of predefined tools in hyperstream.tools. These tools can be used to define the nodes of a factor graph that will produce values or compute certain functions given the specified input nodes. For this tutorial, we will focus on the clock tool. This tool produces time ticks given an initial time and a stride in seconds for each following tick. In this case we will generate one tick every hour.
clock_tool = hs.tools.clock(stride=1.0*60*60)
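To make the tick behaviour concrete, here is a minimal standard-library sketch. It is not HyperStream's implementation. It assumes that ticks fall on multiples of the stride counted from the Unix epoch and that the requested interval is open on the left and closed on the right, which is consistent with the output printed later in this tutorial. The name clock_ticks is hypothetical, and the datetimes are naive values interpreted as UTC.

```python
import math
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetime, interpreted as UTC


def clock_ticks(start, end, stride_seconds):
    """Return every tick t with start < t <= end.

    Ticks are assumed to sit on multiples of stride_seconds since the epoch.
    """
    start_s = (start - EPOCH).total_seconds()
    end_s = (end - EPOCH).total_seconds()
    # Index of the first multiple of the stride strictly after start
    n = int(math.floor(start_s / stride_seconds)) + 1
    ticks = []
    while n * stride_seconds <= end_s:
        ticks.append(EPOCH + timedelta(seconds=n * stride_seconds))
        n += 1
    return ticks


# Ten-hour interval starting 37.9 seconds past the hour, one tick per hour
example_start = datetime(2017, 7, 20, 16, 0, 37, 904174)
example_end = example_start + timedelta(hours=10)
example_ticks = clock_ticks(example_start, example_end, 1.0 * 60 * 60)
```

Under these assumptions the example yields ten ticks, the first at 17:00:00 and the last at 02:00:00 on the following day.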
We need to specify where to store the data that will be generated. A Stream can be stored in memory or in a MongoDB database. In this tutorial we use the memory channel: we obtain it from the channel manager and then create the stream in this channel.
M = hs.channel_manager.memory
# M = hs.channel_manager.mongo # To store the results in a MongoDB database
clock_stream = M.get_or_create_stream(stream_id=StreamId(name="clock_stream"))
Now we can specify the time interval over which we want to execute the tool. In this example we will ask for an interval of 10 hours starting 24 hours ago.
yesterday_start = (datetime.utcnow() - timedelta(days=1)).replace(tzinfo=UTC)
yesterday_end = (yesterday_start + timedelta(hours=10)).replace(tzinfo=UTC)
ti = TimeInterval(yesterday_start, yesterday_end)
Now that we have defined the tool to use, where to store the results and the time interval, we can execute the tool. All the computed results will then be available in the Stream specified as the sink.
clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)
print clock_stream.calculated_intervals
(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00]
The resulting data is stored in clock_stream. We can now get a list of tuples containing the timestamps and their corresponding clock values.
for timestamp, value in clock_stream.window().items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00
[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00
[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00
[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00
[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00
We can query the stream over any particular time interval. For example, we can ask for the first 5 hours after the starting time computed above.
ti = TimeInterval(yesterday_start, (yesterday_start + timedelta(hours=5)).replace(tzinfo=UTC))
for timestamp, value in clock_stream.window(ti).items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
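Conceptually, querying a window amounts to filtering the stored (timestamp, value) pairs by a time interval. The sketch below illustrates that idea with plain Python lists. It assumes the window is open on the left and closed on the right, as the `(…, …]` notation printed by calculated_intervals suggests. The function window here is a hypothetical stand-in and not the HyperStream method, and the datetimes are naive values interpreted as UTC.

```python
from datetime import datetime, timedelta


def window(items, start, end):
    """Keep the (timestamp, value) pairs with start < timestamp <= end."""
    return [(t, v) for t, v in items if start < t <= end]


# Ten hourly clock values, mirroring the stream contents shown above
hourly = [datetime(2017, 7, 20, 17) + timedelta(hours=i) for i in range(10)]
items = [(t, t) for t in hourly]

# First five hours after the original start time
query_start = datetime(2017, 7, 20, 16, 0, 37, 904174)
first_five = window(items, query_start, query_start + timedelta(hours=5))
```

With these inputs first_five holds the five ticks from 17:00 to 21:00, because the tick at 21:00:00 still lies before the end of the interval at 21:00:37.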
It is possible to execute the tool again with a different interval. Because this interval has not yet been computed for the specified Stream, the tool will compute the new data. If the interval had already been computed, the tool would not do any computation. As an example, we will execute the tool for the past 5 hours.
today_end = datetime.utcnow().replace(tzinfo=UTC)
today_start = (today_end - timedelta(hours=5)).replace(tzinfo=UTC)
ti = TimeInterval(today_start, today_end)
clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)
print clock_stream.calculated_intervals
(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00] U (2017-07-21 11:00:37.945814+00:00, 2017-07-21 16:00:37.945814+00:00]
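The idea of skipping work that has already been done can be sketched as interval bookkeeping: given the intervals computed so far and a newly requested interval, only the uncovered parts need computing. The following is an illustrative sketch under that assumption, not HyperStream's internal logic. The name missing_parts is hypothetical, and intervals are represented as plain (start, end) tuples of any comparable values such as numbers or datetimes.

```python
def missing_parts(computed, requested):
    """Return the parts of `requested` not covered by `computed`.

    computed  : list of (start, end) tuples, assumed not to overlap
    requested : a single (start, end) tuple
    """
    req_start, req_end = requested
    gaps = []
    cursor = req_start  # everything before the cursor is already handled
    for start, end in sorted(computed):
        if end <= cursor:
            continue  # interval lies entirely before the cursor
        if start >= req_end:
            break  # interval lies entirely after the request
        if start > cursor:
            gaps.append((cursor, start))  # uncovered stretch before it
        cursor = max(cursor, end)
    if cursor < req_end:
        gaps.append((cursor, req_end))  # uncovered tail of the request
    return gaps


# Hours used as plain numbers for readability
already_done = [(0, 10), (20, 25)]
todo = missing_parts(already_done, (0, 30))
```

In this example todo contains the two uncovered stretches (10, 20) and (25, 30). A request that is fully covered, such as (2, 8), yields an empty list, so nothing would need to be computed.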
Now we can see that the full Stream contains two separate time intervals.
ti = TimeInterval(yesterday_start, today_end)
for timestamp, value in clock_stream.window(ti).items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00
[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00
[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00
[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00
[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00
[2017-07-21 12:00:00+00:00]: 2017-07-21 12:00:00+00:00
[2017-07-21 13:00:00+00:00]: 2017-07-21 13:00:00+00:00
[2017-07-21 14:00:00+00:00]: 2017-07-21 14:00:00+00:00
[2017-07-21 15:00:00+00:00]: 2017-07-21 15:00:00+00:00
[2017-07-21 16:00:00+00:00]: 2017-07-21 16:00:00+00:00
For this and the following tutorials we will use the JavaScript library Highcharts for visualization. We have created a Python function called plot_high_chart for a single time series (one line) and plot_multiple_stock for multiple time series.
We can now visualize the two intervals over which the tool has been executed.
Be aware that the graph contains a missing time interval: the chart jumps from the last point of the first interval straight to the first point of the second, so time is not drawn linearly across the gap.
my_time, my_data = zip(*[(str(key), unix_time_miliseconds(value)) for key, value in clock_stream.window(ti).items()])
plot_high_chart(my_time, my_data, yax='milliseconds', title='Time from epoch', type='high_stock')
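The utils module is not shown in this notebook. As a rough sketch of the two ideas involved, the code below shows how a datetime could be converted to milliseconds since the Unix epoch and how the gap between the two computed intervals could be located before plotting. Both to_unix_ms and find_gaps are hypothetical names and are not the helpers imported above. The datetimes are naive values interpreted as UTC.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetime, interpreted as UTC


def to_unix_ms(dt):
    """Milliseconds elapsed between the Unix epoch and dt."""
    return (dt - EPOCH).total_seconds() * 1000.0


def find_gaps(timestamps, stride_seconds):
    """Return pairs of consecutive timestamps that are further apart
    than one stride, i.e. the places where the chart would jump."""
    gaps = []
    for previous, current in zip(timestamps, timestamps[1:]):
        if (current - previous).total_seconds() > stride_seconds:
            gaps.append((previous, current))
    return gaps


# Hourly ticks with a hole between 02:00 and 12:00, as in the stream above
ticks = [datetime(2017, 7, 21, 0) + timedelta(hours=h) for h in (0, 1, 2, 12, 13)]
jumps = find_gaps(ticks, 60 * 60)
```

Here jumps holds a single pair, from 02:00 to 12:00, which corresponds to the stretch of time for which the tool was never executed.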