HyperStream provides a set of predefined tools in hyperstream.tools, but you can also define your own tools and factors. In this tutorial, we show how to create a simple plugin that reads a CSV file. The tool has already been created and configured, so we describe how it was built and how you can create a new one.
First, we create a new folder to contain the tool. It must be inside the plugins folder; in this example, it is plugins/example/tools/csv_reader/. Every subfolder needs an __init__.py file so that Python treats it as a package. A general plugin layout looks like this:
plugins/
|- __init__.py
|- one_plugin/
|   |- __init__.py
|   |- tools/
|       |- __init__.py
|       |- tool_a/
|           |- __init__.py
|           |- 2017-06-20_v0.0.1.py
|           |- 2017-06-22_v0.0.3.py
|- another_plugin/
    |- __init__.py
    |- tools/
        |- __init__.py
        |- tool_b/
        |   |- __init__.py
        |   |- 2017-06-20_v0.0.1.py
        |   |- 2017-06-22_v0.1.0.py
        |- tool_c/
            |- __init__.py
            |- 2017-06-20_v0.0.2.py
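Creating these folders and __init__.py files by hand is easy to get wrong. Below is a minimal sketch, written for Python 3 (the tutorial itself runs on Python 2.7), that builds the folders for a single tool and adds an empty __init__.py at every level. The helper name create_tool_skeleton is hypothetical; it is not part of HyperStream.

```python
import os
import tempfile


def create_tool_skeleton(root, plugin, tool):
    """Create <root>/<plugin>/tools/<tool>/ with an __init__.py in every folder.

    Existing folders and __init__.py files are left untouched, so calling
    this twice is harmless. Returns the path of the tool folder.
    """
    plugin_dir = os.path.join(root, plugin)
    tools_dir = os.path.join(plugin_dir, "tools")
    tool_dir = os.path.join(tools_dir, tool)
    os.makedirs(tool_dir, exist_ok=True)  # exist_ok needs Python 3.2+

    for folder in (root, plugin_dir, tools_dir, tool_dir):
        init_file = os.path.join(folder, "__init__.py")
        if not os.path.exists(init_file):
            # An empty file is enough to mark the folder as a package
            open(init_file, "w").close()
    return tool_dir


if __name__ == "__main__":
    # Build the example layout in a throwaway directory
    base = tempfile.mkdtemp()
    print(create_tool_skeleton(os.path.join(base, "plugins"), "example", "csv_reader"))
```

The versioned tool file itself still has to be added to the tool folder afterwards.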
Next, we create a Python file whose name follows the convention <year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py. In this example, the file ./plugins/example/tools/csv_reader/2017-06-20_v0.0.1.py contains the following:
from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count
from dateutil.parser import parse


class CsvReader(Tool):
    def __init__(self, filename):
        super(CsvReader, self).__init__(filename=filename)

    @check_input_stream_count(0)  # this tool reads from a file, not from input streams
    def _execute(self, sources, alignment_stream, interval):
        # Let's make the assumption that the first field is the timestamp
        first = True
        with open(self.filename, 'rU') as f:
            for line in f.readlines():
                if first:
                    # Skip the first (header) line
                    first = False
                    continue
                elements = line.split(',')
                dt = parse(elements[0])
                # Only emit rows whose timestamp falls in the requested interval
                if dt in interval:
                    yield StreamInstance(dt, map(float, elements[1:]))
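A tool folder can hold several versioned files (tool_b in the layout above has v0.0.1 and v0.1.0), so it can be useful to check file names against the naming convention and to compare versions numerically. The following Python 3 sketch does that with a regular expression. parse_tool_filename and latest_tool_file are hypothetical helpers; this is not necessarily how HyperStream itself chooses which version to load.

```python
import re

# <year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py
TOOL_FILE_RE = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2})_v(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)\.py$"
)


def parse_tool_filename(name):
    """Return (date, (major, minor, patch)), or None if the name does not match."""
    match = TOOL_FILE_RE.match(name)
    if match is None:
        return None
    version = tuple(int(match.group(part)) for part in ("major", "minor", "patch"))
    return match.group("date"), version


def latest_tool_file(names):
    """Return the matching file name with the highest version, or None."""
    candidates = []
    for name in names:
        parsed = parse_tool_filename(name)
        if parsed is not None:
            # Compare versions as integer tuples, so 0.10.0 sorts after 0.9.0
            candidates.append((parsed[1], name))
    return max(candidates)[1] if candidates else None


print(latest_tool_file(["__init__.py", "2017-06-20_v0.0.1.py", "2017-06-22_v0.1.0.py"]))
# prints 2017-06-22_v0.1.0.py
```

Comparing integer tuples rather than raw strings avoids the trap where "v0.10.0" would sort before "v0.9.0" alphabetically.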
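Next, we add information about this plugin to hyperstream_config.json. The plugins section needs an entry with the channel ID prefix (channel_id_prefix), the list of channel names (channel_names), the path to the plugin (path), and flags stating whether the plugin provides tools and assets (has_tools, has_assets). Here is an example configuration file that includes the new plugin: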
{
    "mongo": {
        "host": "localhost",
        "port": 27017,
        "tz_aware": true,
        "db": "hyperstream"
    },
    "plugins": [{
        "channel_id_prefix": "example",
        "channel_names": [],
        "path": "plugins/example",
        "has_tools": true,
        "has_assets": false
    }],
    "online_engine": {
        "interval": {
            "start": -60,
            "end": -10
        },
        "sleep": 5,
        "iterations": 100
    }
}
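A typo in hyperstream_config.json is easy to miss, so a quick sanity check can help. This Python 3 sketch loads the JSON and reports plugin entries that lack any of the keys used in the example above, or whose has_tools/has_assets values are not booleans. The helper check_plugins is hypothetical, and treating all five keys as required is an assumption, not a documented HyperStream rule.

```python
import json

# The keys used by the plugin entry in the example configuration (assumed required)
REQUIRED_PLUGIN_KEYS = {"channel_id_prefix", "channel_names", "path", "has_tools", "has_assets"}


def check_plugins(config_text):
    """Return a list of problems in the "plugins" section (empty if none were found)."""
    config = json.loads(config_text)
    problems = []
    for index, plugin in enumerate(config.get("plugins", [])):
        missing = REQUIRED_PLUGIN_KEYS - set(plugin)
        if missing:
            problems.append("plugin %d is missing: %s" % (index, ", ".join(sorted(missing))))
        for flag in ("has_tools", "has_assets"):
            # JSON true/false become Python True/False; strings like "yes" do not
            if flag in plugin and not isinstance(plugin[flag], bool):
                problems.append("plugin %d: %s should be true or false" % (index, flag))
    return problems


print(check_plugins('{"plugins": [{"path": "plugins/example", "has_tools": "yes"}]}'))
```

Running it on the full example configuration above should report no problems.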
Now we can write some HyperStream code that uses the new plugin.
%load_ext watermark
import sys
sys.path.append("../")  # Add the parent directory to the path
from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
%watermark -v -m -p hyperstream -g
hs = HyperStream(loglevel=20)
print hs
CPython 2.7.6
IPython 5.3.0

hyperstream 0.3.0-beta

compiler   : GCC 4.8.4
system     : Linux
release    : 3.19.0-80-generic
machine    : x86_64
processor  : x86_64
CPU cores  : 4
interpreter: 64bit
Git hash   : f0e911526041b91fe7999a8968c80618d410e741
HyperStream version 0.3.0-beta, connected to mongodb://localhost:27017/hyperstream, session id <no session>
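We can now instantiate the CSV reader tool from the example plugin, passing it the path to the CSV file:
reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')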
Next, we create a stream in the memory channel to store the results.
sea_ice_stream = hs.channel_manager.memory.get_or_create_stream("sea_ice")
We can now execute the tool over the time interval of interest, writing its output to the stream, and then check which intervals have been calculated:
ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 11, 1).replace(tzinfo=UTC))
reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)
sea_ice_stream.calculated_intervals
TimeIntervals([TimeInterval(start=datetime.datetime(1990, 1, 1, 0, 0, tzinfo=<UTC>), end=datetime.datetime(2011, 11, 1, 0, 0, tzinfo=<UTC>))])
Finally, we can query the stream for a specific period of time and print the stored values:
ti = TimeInterval(datetime(1995, 1, 1).replace(tzinfo=UTC), datetime(1996, 1, 1).replace(tzinfo=UTC))
for key, value in sea_ice_stream.window(ti).items():
print '[%s]: %s' % (key, value)
[1995-02-01 00:00:00+00:00]: [13.3, 2.12]
[1995-03-01 00:00:00+00:00]: [13.28, 2.74]
[1995-04-01 00:00:00+00:00]: [12.32, 5.35]
[1995-05-01 00:00:00+00:00]: [10.76, 8.23]
[1995-06-01 00:00:00+00:00]: [8.86, 10.37]
[1995-07-01 00:00:00+00:00]: [6.05, 12.47]
[1995-08-01 00:00:00+00:00]: [4.61, 14.16]
[1995-09-01 00:00:00+00:00]: [4.38, 14.42]
[1995-10-01 00:00:00+00:00]: [5.91, 13.47]
[1995-11-01 00:00:00+00:00]: [8.95, 11.38]
[1995-12-01 00:00:00+00:00]: [11.02, 7.03]
[1996-01-01 00:00:00+00:00]: [12.07, 3.43]
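The printed values start at 1995-02-01 and end at 1996-01-01: the query starts at 1995-01-01, yet that month is not printed, while the end date is. This suggests that the time interval excludes its start and includes its end. The following Python 3 sketch reproduces the reader's logic without HyperStream, using the explicit check start < dt <= end in place of dt in interval. The helper read_csv_rows, the CSV header, and the YYYY-MM-DD date format are assumptions; the real tool uses dateutil's parse, which accepts many more formats.

```python
import csv
import io
from datetime import datetime


def read_csv_rows(f, start, end):
    """Yield (timestamp, values) for data rows with start < timestamp <= end.

    Assumes a header row and a first column formatted as YYYY-MM-DD.
    """
    reader = csv.reader(f)
    next(reader, None)  # skip the header row
    for row in reader:
        if not row:
            continue  # ignore blank lines
        dt = datetime.strptime(row[0], "%Y-%m-%d")
        if start < dt <= end:  # left-open, right-closed interval
            yield dt, [float(value) for value in row[1:]]


# A few rows taken from the output above; the header names are hypothetical
sample = io.StringIO(
    "date,arctic,antarctic\n"
    "1995-02-01,13.3,2.12\n"
    "1995-03-01,13.28,2.74\n"
    "1996-01-01,12.07,3.43\n"
)
for dt, values in read_csv_rows(sample, datetime(1995, 2, 1), datetime(1996, 1, 1)):
    print(dt.date(), values)
```

Here the row dated 1995-02-01 is dropped because it coincides with the start of the interval, while the row at the end date is kept.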
We can now visualize one of the values in the stream, in this case the Antarctic sea ice (the second value of each instance).
my_time, my_data = zip(*[(str(key), value[1]) for key, value in sea_ice_stream.window().items()])
plot_high_chart(my_time, my_data, type="high_stock", title='Sea ice in the Antarctic', yax='million km^2')
We can also visualize both values of the stream, the Arctic and the Antarctic sea ice:
time = [str(key) for key, value in sea_ice_stream.window().items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window().items()])]
htype = 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='Sea ice', ylabel='million km^2')
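The data line above uses zip(*...) to transpose the values: the stream holds one [Arctic, Antarctic] pair per timestamp, while plot_multiple_stock receives one list per series. The Python 3 sketch below does the same transposition with a hypothetical helper, split_series, in a single pass over the items, so the labels and the values are guaranteed to come from the same list in the same order. The sample entries are taken from the printed output above.

```python
from datetime import datetime


def split_series(items):
    """Turn [(timestamp, [v1, v2, ...]), ...] into (labels, [[v1, ...], [v2, ...]])."""
    items = list(items)  # materialise once so both passes see the same order
    labels = [str(key) for key, _ in items]
    # zip(*rows) groups all first values together, then all second values, ...
    series = [list(column) for column in zip(*[values for _, values in items])]
    return labels, series


labels, series = split_series([
    (datetime(1995, 2, 1), [13.3, 2.12]),
    (datetime(1995, 3, 1), [13.28, 2.74]),
])
print(labels)  # ['1995-02-01 00:00:00', '1995-03-01 00:00:00']
print(series)  # [[13.3, 13.28], [2.12, 2.74]]
```

With the stream, one could call split_series(sea_ice_stream.window(ti).items()), assuming items() yields (key, value) pairs as in the loops above.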
We can also plot only the shorter time interval defined above.
time = [str(key) for key, value in sea_ice_stream.window(ti).items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window(ti).items()])]
htype = 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='Sea ice, 1995', ylabel='million km^2')