HyperStream Tutorial 2: Your own tools

HyperStream has a set of predefined tools in hyperstream.tools. However, it is possible to define your own tools and factors. This tutorial shows how to create a simple plugin that reads a CSV file. The tool has already been created and configured for this tutorial; below we describe how it was built and how you can create a new one.

Creating a plugin tool to read CSV files

1. Create a folder in plugins

First of all, we need to create a new folder to contain our new tool. The new folder must live inside the plugins folder; in this example it is plugins/example/tools/csv_reader/. Every subfolder needs an __init__.py file so that all the folders can be treated as Python packages. The tree below shows the general layout of a plugins folder that holds several plugins and tools.

    |- __init__.py
    |- one_plugin/
    |   |- __init__.py
    |   |- tools/
    |       |- __init__.py
    |       |- tool_a/
    |           |- __init__.py
    |           |- 2017-06-20_v0.0.1.py
    |           |- 2017-06-22_v0.0.3.py
    |- another_plugin/
        |- __init__.py
        |- tools/
            |- tool_b/
            |   |- __init__.py
            |   |- 2017-06-20_v0.0.1.py
            |   |- 2017-06-22_v0.1.0.py
            |- tool_c/
                |- __init__.py
                |- 2017-06-20_v0.0.2.py
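
The layout above can be scaffolded with a few lines of standard-library Python. This is a minimal sketch, not part of HyperStream: the helper name scaffold_tool is hypothetical, and a temporary directory stands in for the real plugins folder.

```python
import os
import tempfile


def scaffold_tool(root, plugin, tool):
    """Create root/plugin/tools/tool and put an __init__.py at every level."""
    tool_dir = os.path.join(root, plugin, "tools", tool)
    if not os.path.isdir(tool_dir):
        os.makedirs(tool_dir)
    # Walk from the plugins root down to the tool folder.
    current = root
    for part in (None, plugin, "tools", tool):
        if part is not None:
            current = os.path.join(current, part)
        init_file = os.path.join(current, "__init__.py")
        if not os.path.exists(init_file):
            open(init_file, "w").close()
    return tool_dir


# A temporary directory is used here as a stand-in for the real plugins folder.
plugins_root = os.path.join(tempfile.mkdtemp(), "plugins")
created = scaffold_tool(plugins_root, "example", "csv_reader")
print(created)
```

The versioned tool file itself (described in the next step) still has to be added to the created folder by hand.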

2. Write the plugin in Python

Then, we need to create a new Python file following the naming convention <year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py. In this example, the file ./plugins/example/tools/csv_reader/2017-06-20_v0.0.1.py has the following content:
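
To make the naming convention concrete, here is a small sketch that checks file names against it and picks the highest version. The regular expression and the helper parse_tool_filename are illustrative assumptions; how HyperStream itself chooses between versions is not covered in this tutorial.

```python
import re

# Pattern for <year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py
TOOL_FILE = re.compile(r"^(\d{4})-(\d{2})-(\d{2})_v(\d+)\.(\d+)\.(\d+)\.py$")


def parse_tool_filename(name):
    """Return ((year, month, day), (major, minor, patch)), or None if no match."""
    match = TOOL_FILE.match(name)
    if match is None:
        return None
    numbers = [int(group) for group in match.groups()]
    return tuple(numbers[:3]), tuple(numbers[3:])


files = ["2017-06-20_v0.0.1.py", "2017-06-22_v0.1.0.py", "__init__.py"]
versioned = [f for f in files if parse_tool_filename(f) is not None]
# Compare by version tuple, so (0, 1, 0) ranks above (0, 0, 1).
latest = max(versioned, key=lambda f: parse_tool_filename(f)[1])
print(latest)  # 2017-06-22_v0.1.0.py
```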

from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count

from dateutil.parser import parse

class CsvReader(Tool):
    def __init__(self, filename):
        super(CsvReader, self).__init__(filename=filename)

    def _execute(self, sources, alignment_stream, interval):

        # Let's make the assumption that the first field is the timestamp

        first = True

        with open(self.filename, 'rU') as f:
            for line in f.readlines():
                if first:
                    # Skip the header row
                    first = False
                    continue
                elements = line.split(',')
                dt = parse(elements[0])
                if dt in interval:
                    yield StreamInstance(dt, map(float, elements[1:]))
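
The parsing idea can be tried without HyperStream. The following is a minimal standalone sketch: skip the header row, read the first field as the timestamp, and convert the remaining fields to floats. The function read_csv_rows, the keep predicate (a stand-in for the "dt in interval" test), the header names, and the date format are all assumptions made for illustration; the numeric values are taken from the output shown later in this tutorial. A list comprehension replaces map because map returns an iterator in Python 3.

```python
from datetime import datetime


def read_csv_rows(lines, keep, fmt="%Y-%m-%d"):
    """Yield (timestamp, values) for every non-header row accepted by keep."""
    first = True
    for line in lines:
        if first:
            # The first line is assumed to be a header, so it is skipped.
            first = False
            continue
        elements = line.strip().split(",")
        timestamp = datetime.strptime(elements[0], fmt)
        if keep(timestamp):
            yield timestamp, [float(x) for x in elements[1:]]


# Hypothetical file content: header names and date format are assumptions.
sample = [
    "date,arctic,antarctic",
    "1995-02-01,13.3,2.12",
    "1995-03-01,13.28,2.74",
    "1996-01-01,12.07,3.43",
]
rows = list(read_csv_rows(sample, keep=lambda t: t.year == 1995))
for timestamp, values in rows:
    print("[%s]: %s" % (timestamp, values))
```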

3. Add HyperStream configuration

Now we need to add information about this plugin to hyperstream_config.json. In particular, each entry in the plugins section needs the following fields:

  • channel_id_prefix: This is to create Channels (explained in another tutorial).
  • channel_names: A list of available Channels
  • path: path to the new plugin
  • has_tools: If the new plugin has tools
  • has_assets: If it contains folders or files that are needed by the plugin
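
A quick way to catch a forgotten field is to check a plugin entry before starting HyperStream. The sketch below is only illustrative: treating all five keys as required is an assumption based on the list above, missing_plugin_keys is a hypothetical helper, and the dictionary mirrors what json.load would return for this tutorial's plugin entry.

```python
REQUIRED_PLUGIN_KEYS = {
    "channel_id_prefix", "channel_names", "path", "has_tools", "has_assets",
}


def missing_plugin_keys(config):
    """Map each plugin path (or list index) to the required keys it lacks."""
    problems = {}
    for index, plugin in enumerate(config.get("plugins", [])):
        missing = REQUIRED_PLUGIN_KEYS - set(plugin)
        if missing:
            problems[plugin.get("path", index)] = sorted(missing)
    return problems


# The plugin entry used in this tutorial, as a Python dictionary.
config = {
    "plugins": [{
        "channel_id_prefix": "example",
        "channel_names": [],
        "path": "plugins/example",
        "has_tools": True,
        "has_assets": False,
    }]
}
problems = missing_plugin_keys(config)
print(problems)  # an empty dict means every entry has all five keys
```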

Here is an example of a configuration file that includes the new plugin:

    {
        "mongo": {
            "host": "localhost",
            "port": 27017,
            "tz_aware": true,
            "db": "hyperstream"
        },
        "plugins": [{
            "channel_id_prefix": "example",
            "channel_names": [],
            "path": "plugins/example",
            "has_tools": true,
            "has_assets": false
        }],
        "online_engine": {
            "interval": {
                "start": -60,
                "end": -10
            },
            "sleep": 5,
            "iterations": 100
        }
    }

Using the new tool

Now we can write some HyperStream code that uses the new plugin.

In [1]:
%load_ext watermark

import sys
sys.path.append("../") # Add parent dir in the Path

from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC

from datetime import datetime

from utils import plot_high_chart
from utils import plot_multiple_stock

%watermark -v -m -p hyperstream -g

hs = HyperStream(loglevel=20)
print hs
CPython 2.7.6
IPython 5.3.0

hyperstream 0.3.0-beta

compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>

Loading the new plugin

After instantiating HyperStream, if the plugin and its configuration are in the right place, we can load our new tool csv_reader, specifying where the input file is.

The data is the Polar Ice data that can be found in this link

In [2]:
reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')

Create a stream

Now we can create a stream to store all the results in memory.

In [3]:
sea_ice_stream = hs.channel_manager.memory.get_or_create_stream("sea_ice")

Execute the tool

We can now execute the tool over the interval of interest.

In [4]:
ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 11, 1).replace(tzinfo=UTC))

reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)

TimeIntervals([TimeInterval(start=datetime.datetime(1990, 1, 1, 0, 0, tzinfo=<UTC>), end=datetime.datetime(2011, 11, 1, 0, 0, tzinfo=<UTC>))])

Query the stream

Finally, we can query the stream for a specific period of time and print the values stored in it.

In [5]:
ti = TimeInterval(datetime(1995, 1, 1).replace(tzinfo=UTC), datetime(1996, 1, 1).replace(tzinfo=UTC))

for key, value in sea_ice_stream.window(ti).items():
    print '[%s]: %s' % (key, value)
[1995-02-01 00:00:00+00:00]: [13.3, 2.12]
[1995-03-01 00:00:00+00:00]: [13.28, 2.74]
[1995-04-01 00:00:00+00:00]: [12.32, 5.35]
[1995-05-01 00:00:00+00:00]: [10.76, 8.23]
[1995-06-01 00:00:00+00:00]: [8.86, 10.37]
[1995-07-01 00:00:00+00:00]: [6.05, 12.47]
[1995-08-01 00:00:00+00:00]: [4.61, 14.16]
[1995-09-01 00:00:00+00:00]: [4.38, 14.42]
[1995-10-01 00:00:00+00:00]: [5.91, 13.47]
[1995-11-01 00:00:00+00:00]: [8.95, 11.38]
[1995-12-01 00:00:00+00:00]: [11.02, 7.03]
[1996-01-01 00:00:00+00:00]: [12.07, 3.43]
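
The output above starts at 1995-02-01 and ends at 1996-01-01. Assuming the file also has a row for 1995-01-01, this is consistent with a window that excludes its start and includes its end. The class below is a hypothetical stand-in that illustrates that kind of membership test; it is not HyperStream's TimeInterval.

```python
from datetime import datetime


class HalfOpenInterval(object):
    """Hypothetical stand-in for an interval of the form (start, end]."""

    def __init__(self, start, end):
        self.start = start
        self.end = end

    def __contains__(self, moment):
        # The start is excluded and the end is included.
        return self.start < moment <= self.end


window = HalfOpenInterval(datetime(1995, 1, 1), datetime(1996, 1, 1))
# Monthly timestamps from 1995-01-01 to 1996-01-01 inclusive.
months = [datetime(1995, m, 1) for m in range(1, 13)] + [datetime(1996, 1, 1)]
selected = [m for m in months if m in window]
print("%s to %s, %d rows" % (selected[0].date(), selected[-1].date(), len(selected)))
# 1995-02-01 to 1996-01-01, 12 rows
```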

Visualize the whole interval

We can now visualize one of the values of the stream, in this case the sea level in Antarctica.

In [6]:
my_time, my_data = zip(*[(key.__str__(), value[1]) for key, value in sea_ice_stream.window().items()])

plot_high_chart(my_time, my_data, type="high_stock", title='Sea level in the Antarctica', yax='meters')
[Figure: chart titled "Sea level in the Antarctica", y-axis in meters]

We can also visualize both stream values, the Arctic and the Antarctic sea levels:

In [7]:
time = [key.__str__() for key, value in sea_ice_stream.window().items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window().items()])]
htype= 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='Sea level', ylabel='meters')
[Figure: chart titled "Sea level", series Arctic and Antarctica, y-axis in meters]
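
The zip(*...) expression in the cell above transposes a list of per-timestamp value pairs into one list per series. Here is a small sketch of that idiom on its own, using three of the rows printed earlier; the rows list is a plain stand-in for the items returned by the stream window.

```python
# (timestamp, [arctic, antarctic]) pairs, copied from the query output above.
rows = [
    ("1995-02-01", [13.3, 2.12]),
    ("1995-03-01", [13.28, 2.74]),
    ("1995-04-01", [12.32, 5.35]),
]

time = [key for key, value in rows]
# zip(*values) groups the first elements together, then the second elements.
data = [list(series) for series in zip(*[value for key, value in rows])]
print(data)  # [[13.3, 13.28, 12.32], [2.12, 2.74, 5.35]]
```

Each inner list then lines up with one entry in names, which is why the plotting call receives data and names in the same order.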

Visualize specific interval

We can visualize the reduced time interval that we specified above.

In [8]:
time = [key.__str__() for key, value in sea_ice_stream.window(ti).items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window(ti).items()])]
htype= 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='test multi-output', ylabel='meters')
[Figure: chart titled "test multi-output", series Arctic and Antarctica, y-axis in meters]