#!/usr/bin/env python
# coding: utf-8

# # HyperStream Tutorial 4: Real-time streams
# 
# In this tutorial, we show how to create a new plugin that collects real-time data using a publicly available API. In this case, we use the [Environment Agency flood-monitoring API][1].
# 
# ## Creating a plugin tool to use the API
# 
# ### 1. Create a folder in plugins
# 
# First of all, we need to create a new folder to contain the new tool. The new folder needs to be in the folder __plugins__, in this example __plugins/example/tools/environment_data_gov_uk/__. Also, we need to create an **\__init\__.py** file in every subfolder.
# 
#     plugins/
#     |- __init__.py
#     |- example/
#         |- __init__.py
#         |- tools/
#             |- __init__.py
#             |- environment_data_gov_uk/
#                 |- __init__.py
#                 |- 2017-06-21_v0.0.1.py
# 
# ### 2. Write the plugin in Python
# 
# As we have seen in a previous tutorial, we can create a new plugin in Python. In this case, the code of the plugin **./plugins/example/tools/environment_data_gov_uk/2017-06-21_v0.0.1.py** uses the API to query the readings of a single monitoring station for the specified interval of time:
# 
# ```Python
# from datetime import datetime
# from datetime import datetime, timedelta
# 
# from hyperstream import Tool, StreamInstance, StreamInstanceCollection
# from hyperstream.utils import check_input_stream_count
# from hyperstream.utils import UTC
# 
# from dateutil.parser import parse
# 
# import urllib
# import urllib2
# import json
# 
# # this uses Environment Agency flood and river level data from the real-time
# # data API (Beta)
# # For questions on the APIs please contact data.info@environment-agency.gov.uk,
# # a forum for announcements and discussion is under consideration.
# class EnvironmentDataGovUk(Tool):
#     def __init__(self, station):
#         self.station = station
#         super(EnvironmentDataGovUk, self).__init__()
# 
#     @check_input_stream_count(0)
#     def _execute(self, sources, alignment_stream, interval):
#         startdate = interval[0].strftime("%Y-%m-%d")
#         enddate = interval[1].strftime("%Y-%m-%d")
# 
#         url = "https://environment.data.gov.uk/flood-monitoring/id/stations/{}/readings".format(self.station)
#         values = {'startdate' : startdate,
#                   'enddate' : enddate}
#         url_parameters = urllib.urlencode(values)
# 
#         full_url = url + '?' + url_parameters
#         response = urllib2.urlopen(full_url)
#         data = json.load(response)
# 
#         for item in data['items']:
#             dt = parse(item.get('dateTime'))
#             if dt in interval:
#                 value = float(item.get('value'))
#                 yield StreamInstance(dt, value)
# 
# ```
# 
# ### 3. Add HyperStream configuration
# 
# Now, it is necessary to add information about this plugin to the **hyperstream_config.json** file. In particular, we need to add the following information in the `plugins` section:
# 
# - channel_id_prefix: This is used to create Channels (explained in another tutorial).
# - channel_names: A list of available Channels
# - path: Path to the new plugin
# - has_tools: Whether the new plugin has tools
# - has_assets: Whether the plugin contains folders or files that it needs
# 
# Next, we show an example of a configuration file that includes the new plugin:
# 
# ```json
# {
#     "mongo": {
#         "host": "localhost",
#         "port": 27017,
#         "tz_aware": true,
#         "db": "hyperstream"
#     },
#     "plugins": [{
#         "channel_id_prefix": "example",
#         "channel_names": [],
#         "path": "plugins/example",
#         "has_tools": true,
#         "has_assets": false
#     }],
#     "online_engine": {
#         "interval": {
#             "start": -60,
#             "end": -10
#         },
#         "sleep": 5,
#         "iterations": 100
#     }
# }
# 
# ```
# 
# 
# ### Acknowledgement
# 
# This tutorial uses Environment Agency flood and river level data from the real-time data API (Beta).
# 
# [1]: https://environment.data.gov.uk/flood-monitoring/doc/reference#introduction

# In[1]:


get_ipython().run_line_magic('load_ext', 'watermark')

import sys
# A single import provides both names (datetime was previously imported twice).
from datetime import datetime, timedelta

sys.path.append("../")  # Add parent dir in the Path

from hyperstream import HyperStream, StreamId
from hyperstream import TimeInterval
from hyperstream.utils import UTC

from utils import plot_high_chart

get_ipython().run_line_magic('watermark', '-v -m -p hyperstream -g')


# ### Select the water station
# 
# For our example, we will query a water station called Bristol Avon Little Avon Axe and North Somerset St. This station has the station number 531118. It is possible to select another station by changing `station_number`; a list of 50 stations can be found by following [this link][2].
# 
# [2]: https://environment.data.gov.uk/flood-monitoring/id/stations?_limit=50

# In[2]:


station_number = "531118"
station_name = "Bristol Avon Little Avon Axe and North Somerset St"


# ### Tool and Stream
# 
# First we will create a Stream to store the data and an instance of the new tool.

# In[3]:


hs = HyperStream(loglevel=20)
# print() with a single argument behaves identically on Python 2 and Python 3.
print(hs)
environment_stream = hs.channel_manager.memory.get_or_create_stream("environment")
environment_tool = hs.plugins.example.tools.environment_data_gov_uk(station=station_number)


# ### Execute the tool
# 
# Now we will specify an interval of time for which we want the water levels. In this particular case, we will ask for the last 7 days. Then we can execute the tool for the specified interval of time. The result will be stored in the specified Stream.

# In[4]:


now = datetime.utcnow().replace(tzinfo=UTC)
# `now` is already timezone-aware; subtracting a timedelta keeps its tzinfo.
before = now - timedelta(weeks=1)
ti = TimeInterval(before, now)
environment_tool.execute(sources=[], sink=environment_stream, interval=ti)


# ### Visualization
# 
# Now we can visualize all the data stored in the stream.

# In[5]:


readings = [(str(key), value) for key, value in environment_stream.window().items()]
# Unpacking zip(*[]) into two names raises ValueError, so guard against an
# empty stream (e.g. the API returned no readings for this interval).
if readings:
    my_time, my_data = zip(*readings)
    plot_high_chart(my_time, my_data, type="high_stock", title=station_name, yax='meters')
else:
    print("No readings found for station {} in {}".format(station_number, ti))


# In[ ]: