HyperStream Tutorial 1: Introduction

Requirements

In order to run this and the following tutorials, it is necessary to have access to a MongoDB server running on localhost at port 27017. The host and port of the MongoDB server can be changed by editing the configuration file hyperstream_config.json, located in the same folder as this notebook.
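Before going further, it can help to confirm that something is listening on the configured host and port. The following is a minimal sketch for Python 3 that uses only the standard library. The helper name port_is_open is hypothetical and not part of HyperStream, and the check only confirms that a TCP connection succeeds, not that the listener is actually MongoDB.

```python
import socket


def port_is_open(host="localhost", port=27017, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds.

    Hypothetical helper: it only checks that something accepts connections,
    not that the listener is a MongoDB server.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution errors.
        return False


print("MongoDB port reachable:", port_is_open())
```

If the configuration file points to a different host or port, pass those values as arguments instead of relying on the defaults.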

We also require all the dependencies listed in the HyperStream requirements; the installation instructions can be found at https://github.com/IRC-SPHERE/HyperStream

It is possible to start a MongoDB server, a Jupyter Notebook and an MQTT server by running the script start_tutorial.sh

In [1]:
%load_ext watermark

import sys
sys.path.append("../")  # Add the parent directory to the module search path

from hyperstream import HyperStream
from hyperstream import StreamId
from hyperstream import TimeInterval

from pytz import UTC
from datetime import datetime, timedelta

from utils import plot_high_chart
from utils import unix_time_miliseconds

%watermark -v -m -p hyperstream -g
CPython 2.7.6
IPython 5.3.0

hyperstream 0.3.0-beta

compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741

Starting a HyperStream instance

First of all, we will create a HyperStream instance. This instance will connect to the MongoDB server that is specified in the configuration file hyperstream_config.json.

In [2]:
hs = HyperStream(loglevel=0)
print hs
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>

Selecting a tool

HyperStream comes with a set of predefined tools in hyperstream.tools. These tools can be used to define the nodes of a factor graph that will produce values or compute certain functions given the specified input nodes. For this tutorial, we will focus on the clock tool. This tool produces time ticks given an initial time and a stride in seconds for each following tick. In this case we will generate one tick every hour.

In [3]:
clock_tool = hs.tools.clock(stride=1.0*60*60)
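The tick timestamps printed later in this tutorial fall on whole hours inside a left-open, right-closed interval. The sketch below reproduces that behaviour with the Python 3 standard library only. It assumes that ticks are aligned to multiples of the stride counted from the Unix epoch, which is an assumption made for illustration and is not taken from HyperStream's source. The function clock_ticks and its first parameter are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def clock_ticks(start, end, stride_seconds, first=EPOCH):
    """Yield ticks first + n * stride that fall in the interval (start, end]."""
    stride = timedelta(seconds=stride_seconds)
    # Index of the first tick strictly after `start` (assumed epoch alignment).
    n = (start - first) // stride + 1
    tick = first + n * stride
    while tick <= end:
        yield tick
        tick += stride


start = datetime(2017, 7, 20, 16, 0, 37, 904174, tzinfo=timezone.utc)
end = start + timedelta(hours=10)
ticks = list(clock_ticks(start, end, stride_seconds=60 * 60))
print(len(ticks), ticks[0], ticks[-1])
```

With the start time used in the example output of this notebook, the sketch yields ten ticks, from 17:00 on 20 July to 02:00 on 21 July.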

Storing the result in a Stream

We need to specify where we want to store the data that will be generated. It is possible to store the Stream in memory or in a MongoDB database. In this tutorial we use the memory channel: we first get the memory channel from the channel manager and then create the stream in that channel.

In [4]:
M = hs.channel_manager.memory
# M = hs.channel_manager.mongo # To store the results in a MongoDB database

clock_stream = M.get_or_create_stream(stream_id=StreamId(name="clock_stream"))
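The name get_or_create_stream suggests the usual get-or-create pattern: return the stream registered under an identifier if it exists, otherwise create it. The toy classes below illustrate that pattern with a plain dictionary. ToyStream and ToyMemoryChannel are hypothetical stand-ins written for this sketch and do not reflect how HyperStream implements its channels.

```python
class ToyStream(object):
    """Hypothetical stand-in for a stream: an id plus timestamped values."""

    def __init__(self, stream_id):
        self.stream_id = stream_id
        self.items = {}  # timestamp -> value


class ToyMemoryChannel(object):
    """Hypothetical in-memory channel that keeps its streams in a dict."""

    def __init__(self):
        self.streams = {}

    def get_or_create_stream(self, stream_id):
        # Return the existing stream for this id, or create and register one.
        if stream_id not in self.streams:
            self.streams[stream_id] = ToyStream(stream_id)
        return self.streams[stream_id]


channel = ToyMemoryChannel()
first = channel.get_or_create_stream("clock_stream")
second = channel.get_or_create_stream("clock_stream")
print(first is second)  # True: the second call reuses the first stream
```

Under this pattern, re-running a notebook cell that requests the same identifier returns the same object instead of creating a duplicate.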

Definition of a time interval

Now we can specify the time interval over which we want to execute the tool. In this example we will ask for a 10-hour interval that starts 24 hours before the current time.

In [5]:
yesterday_start = (datetime.utcnow() - timedelta(days=1)).replace(tzinfo=UTC)
yesterday_end = (yesterday_start + timedelta(hours=10)).replace(tzinfo=UTC)

ti = TimeInterval(yesterday_start, yesterday_end)
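The interval printed after executing the tool is written as (start, end], that is, open on the left and closed on the right. The following sketch shows what that convention means for membership tests. ToyInterval is a hypothetical class written for illustration with the Python 3 standard library; it is not HyperStream's TimeInterval, and the start-before-end check is an illustrative choice.

```python
from datetime import datetime, timedelta, timezone


class ToyInterval(object):
    """Hypothetical left-open, right-closed time interval (start, end]."""

    def __init__(self, start, end):
        if start >= end:
            raise ValueError("start must be earlier than end")
        self.start = start
        self.end = end

    def __contains__(self, timestamp):
        # The start itself is excluded, the end itself is included.
        return self.start < timestamp <= self.end

    def __repr__(self):
        return "(%s, %s]" % (self.start, self.end)


start = datetime(2017, 7, 20, 16, 0, tzinfo=timezone.utc)
interval = ToyInterval(start, start + timedelta(hours=10))
print(interval)
print(start in interval)                        # False
print(start + timedelta(hours=10) in interval)  # True
```

A tick that falls exactly on the end of the interval therefore belongs to it, while a tick exactly on the start does not.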

Executing the tool

Now that we have defined the tool to use, where we want to store the results, and the time interval, we can execute the tool. After execution, all the computed results will be available in the Stream specified as the sink.

In [6]:
clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)

print clock_stream.calculated_intervals
(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00]

Printing the results

The results are stored in clock_stream. We can now get a list of tuples containing the timestamps and their corresponding clock values.

In [7]:
for timestamp, value in clock_stream.window().items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00
[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00
[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00
[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00
[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00

Query the Stream

We can query the stream with any particular time interval. For example, we can ask for the first 5 hours after the start time computed earlier.

In [8]:
ti = TimeInterval(yesterday_start, (yesterday_start + timedelta(hours=5)).replace(tzinfo=UTC))

for timestamp, value in clock_stream.window(ti).items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
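Selecting a window from a stream amounts to picking the items whose timestamps fall inside (start, end]. One way to do this on a sorted list is a binary search on both bounds, as sketched below with the Python 3 standard library. The function window here is a hypothetical helper for illustration, not the method of the same name on HyperStream streams.

```python
from bisect import bisect_right
from datetime import datetime, timedelta, timezone


def window(items, start, end):
    """Return the (timestamp, value) pairs with start < timestamp <= end.

    `items` must be a list of pairs sorted by timestamp.
    """
    timestamps = [timestamp for timestamp, _ in items]
    lo = bisect_right(timestamps, start)  # first index with timestamp > start
    hi = bisect_right(timestamps, end)    # first index with timestamp > end
    return items[lo:hi]


base = datetime(2017, 7, 20, 17, tzinfo=timezone.utc)
items = [(base + timedelta(hours=h), h) for h in range(10)]  # 17:00 ... 02:00
query_start = datetime(2017, 7, 20, 16, 0, 37, tzinfo=timezone.utc)
selected = window(items, query_start, query_start + timedelta(hours=5))
print([value for _, value in selected])  # [0, 1, 2, 3, 4]
```

Using bisect_right for both bounds is what makes the left end exclusive and the right end inclusive.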

Executing a new interval

It is possible to execute the tool again with a different interval. Because this interval has never been computed for the specified Stream, the tool will compute the new data. If the interval had already been computed, the tool would not do any computation. As an example, we will execute the tool for the past 5 hours.

In [9]:
today_end = datetime.utcnow().replace(tzinfo=UTC)
today_start = (today_end - timedelta(hours=5)).replace(tzinfo=UTC)

ti = TimeInterval(today_start, today_end)

clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)
print clock_stream.calculated_intervals
(2017-07-20 16:00:37.904174+00:00, 2017-07-21 02:00:37.904174+00:00] U (2017-07-21 11:00:37.945814+00:00, 2017-07-21 16:00:37.945814+00:00]
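Skipping work for intervals that were already computed requires some bookkeeping of what has been covered so far. The sketch below shows one way such bookkeeping could work, using a sorted list of disjoint (start, end) pairs. The functions missing_parts and add_interval are hypothetical and do not describe HyperStream's internal implementation. Plain integers stand in for timestamps to keep the example short.

```python
def missing_parts(computed, start, end):
    """Return the sub-intervals of (start, end] not covered by `computed`.

    `computed` is a list of disjoint (start, end) pairs sorted by start.
    """
    gaps = []
    cursor = start
    for c_start, c_end in computed:
        if c_end <= cursor:
            continue  # entirely before the part still to be checked
        if c_start >= end:
            break  # entirely after the request
        if c_start > cursor:
            gaps.append((cursor, c_start))
        cursor = max(cursor, c_end)
        if cursor >= end:
            break
    if cursor < end:
        gaps.append((cursor, end))
    return gaps


def add_interval(computed, start, end):
    """Return a new sorted list with (start, end] merged into `computed`."""
    merged = []
    for c_start, c_end in sorted(computed + [(start, end)]):
        if merged and c_start <= merged[-1][1]:
            # Overlapping or touching: extend the previous interval.
            merged[-1] = (merged[-1][0], max(merged[-1][1], c_end))
        else:
            merged.append((c_start, c_end))
    return merged


computed = add_interval([], 16, 26)      # first run: hours 16 to 26
print(missing_parts(computed, 16, 21))   # [] so nothing needs computing
print(missing_parts(computed, 35, 40))   # [(35, 40)] so compute everything
computed = add_interval(computed, 35, 40)
print(computed)                          # [(16, 26), (35, 40)]
print(missing_parts(computed, 20, 37))   # [(26, 35)]
```

The last call shows the mixed case: a request that overlaps both computed intervals only needs the uncovered part between them.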

Now we can see that the full Stream contains two separate time intervals:

In [10]:
ti = TimeInterval(yesterday_start, today_end)

for timestamp, value in clock_stream.window(ti).items():
    print '[%s]: %s' % (timestamp, value)
[2017-07-20 17:00:00+00:00]: 2017-07-20 17:00:00+00:00
[2017-07-20 18:00:00+00:00]: 2017-07-20 18:00:00+00:00
[2017-07-20 19:00:00+00:00]: 2017-07-20 19:00:00+00:00
[2017-07-20 20:00:00+00:00]: 2017-07-20 20:00:00+00:00
[2017-07-20 21:00:00+00:00]: 2017-07-20 21:00:00+00:00
[2017-07-20 22:00:00+00:00]: 2017-07-20 22:00:00+00:00
[2017-07-20 23:00:00+00:00]: 2017-07-20 23:00:00+00:00
[2017-07-21 00:00:00+00:00]: 2017-07-21 00:00:00+00:00
[2017-07-21 01:00:00+00:00]: 2017-07-21 01:00:00+00:00
[2017-07-21 02:00:00+00:00]: 2017-07-21 02:00:00+00:00
[2017-07-21 12:00:00+00:00]: 2017-07-21 12:00:00+00:00
[2017-07-21 13:00:00+00:00]: 2017-07-21 13:00:00+00:00
[2017-07-21 14:00:00+00:00]: 2017-07-21 14:00:00+00:00
[2017-07-21 15:00:00+00:00]: 2017-07-21 15:00:00+00:00
[2017-07-21 16:00:00+00:00]: 2017-07-21 16:00:00+00:00
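The jump between the two executed intervals can also be found programmatically by comparing consecutive timestamps with the stride. The following Python 3 sketch uses only the standard library. The function find_gaps is a hypothetical helper, and the timestamps are rebuilt by hand from the example output above.

```python
from datetime import datetime, timedelta, timezone


def find_gaps(timestamps, stride):
    """Return (previous, next) pairs of neighbours more than one stride apart."""
    return [
        (prev, nxt)
        for prev, nxt in zip(timestamps, timestamps[1:])
        if nxt - prev > stride
    ]


hour = timedelta(hours=1)
first_run = [datetime(2017, 7, 20, 17, tzinfo=timezone.utc) + i * hour for i in range(10)]
second_run = [datetime(2017, 7, 21, 12, tzinfo=timezone.utc) + i * hour for i in range(5)]
gaps = find_gaps(first_run + second_run, hour)
for prev, nxt in gaps:
    print("gap from %s to %s" % (prev, nxt))
```

For the example data this reports a single gap, between 02:00 and 12:00 on 21 July.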

Visualization

For this and the following tutorials we will use the JavaScript library Highcharts. We have created a Python function called plot_high_chart for a single time series (one line) and another called plot_multiple_stock for multiple time series.

Then, we can visualize the two intervals where the tool has been executed.

Be aware that there is a missing time interval in the graph: the chart jumps directly from the last point of the first interval to the first point of the second, so the time axis is not linear across the gap.

In [11]:
my_time, my_data = zip(*[(key.__str__(), unix_time_miliseconds(value)) for key, value in clock_stream.window(ti).items()])

plot_high_chart(my_time, my_data, yax='milliseconds', title='Time from epoch', type='high_stock')
[Chart output: "Time from epoch"]
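The helper unix_time_miliseconds is imported from the utils module that accompanies the tutorials, and its source is not shown in this notebook. Judging by its name and the chart title, it presumably converts a datetime into milliseconds since the Unix epoch. A minimal Python 3 sketch of such a conversion, under that assumption and with the hypothetical name to_unix_milliseconds, could look like this:

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_milliseconds(moment):
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    return (moment - EPOCH).total_seconds() * 1000.0


print(to_unix_milliseconds(datetime(2017, 7, 20, 17, tzinfo=timezone.utc)))
```

Highcharts expects time values in milliseconds, which is why the clock values are converted before plotting.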