#!/usr/bin/env python
# coding: utf-8

# # Environment Setup

# In[4]:

from os import path as p, chdir

if 'examples' in p.abspath('.'):
    chdir('..')


# # Examples

# ## Fetch a webpage

# In this example, we fetch the title of a webpage.

# In[5]:

from riko.modules.fetchpage import pipe

url = 'https://news.ycombinator.com/'
next(pipe(conf={'url': url, 'start': '<title>', 'end': '</title>'}))


# ## Fetch a webpage using an xpath

# Here, we fetch the first hackernews story link using an xpath.

# In[6]:

from riko.modules.xpathfetchpage import pipe

xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'
next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))


# ## Word Count

# Here, we use several pipes to count the number of words on a webpage.

# In[8]:

### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.

from riko.collections import SyncPipe

### Set the pipe configurations ###
#
# Notes:
# 1. the `detag` option will strip all html tags from the result
# 2. fetch the text contained inside the 'body' tag of the hackernews homepage
# 3. replace newlines with spaces and assign the result to 'content'
# 4. tokenize the resulting text using whitespace as the delimiter
# 5. count the number of times each token appears
# 6. obtain the raw stream
# 7. extract the first word and its count
url = 'https://news.ycombinator.com/'
fetch_conf = {'url': url, 'start': '<body>', 'end': '</body>', 'detag': True}  # 1
replace_conf = {
    'rule': [
        {'find': '\r\n', 'replace': ' '},
        {'find': '\n', 'replace': ' '}]}

flow = (
    SyncPipe('fetchpage', conf=fetch_conf)                # 2
        .strreplace(conf=replace_conf, assign='content')  # 3
        .tokenizer(conf={'delimiter': ' '}, emit=True)    # 4
        .count(conf={'count_key': 'content'}))            # 5

stream = flow.output  # 6
next(stream)          # 7


# ## Fetching feeds

# `riko` can fetch rss feeds from both local and remote filepaths via "source"
# `pipes`. Each "source" `pipe` returns a `stream`, i.e., an iterator of
# dictionaries, aka `items`.

# In[8]:

from riko.modules.fetch import pipe

### Fetch an RSS feed ###
stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})
item = next(stream)
item['title'], item['link'], item['comments']


# In[9]:

from riko.modules.fetchsitefeed import pipe

### Fetch the first RSS feed found ###
#
# Note: regardless of how you fetch an RSS feed, it will have the same
# structure
stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})
item = next(stream)
item.keys()


# In[10]:

item['title'], item['author'], item['id']


# Please see the [FAQ](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst) for a complete list of supported [file types](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-file-types-are-supported) and
# [protocols](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-protocols-are-supported). Please see [Fetching data and feeds](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#fetching-data-and-feeds) for more examples.
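# As a brief illustration of the local filepath support mentioned above, the
# same `fetch` pipe should also work with a feed file on disk. This is only a
# minimal sketch: the path below is hypothetical and assumes a valid RSS file
# exists at that location.

# In[ ]:

from riko.modules.fetch import pipe

### Fetch a local RSS feed (hypothetical path) ###
stream = pipe(conf={'url': 'file:///path/to/feed.rss'})
# item = next(stream)  # items have the same structure as remotely fetched ones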
# ## Synchronous processing

# `riko` can modify `streams` via the 40 built-in `pipes`.

# In[11]:

from riko.collections import SyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create a SyncPipe flow ###
#
# `SyncPipe` is a convenience class that creates chainable flows
# and allows for parallel processing.
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the link
# 3. sort the items ascending by title
# 4. fetch the first comment from each item
# 5. flatten the result into one raw stream
# 6. extract the first item's content
#
# Note: sorting is not lazy, so take caution when using this pipe

flow = (
    SyncPipe('fetch', conf=fetch_conf)               # 1
        .filter(conf={'rule': filter_rule})          # 2
        .sort(conf={'rule': {'sort_key': 'title'}})  # 3
        .xpathfetchpage(conf=xpath_conf))            # 4

stream = flow.output     # 5
next(stream)['content']  # 6


# Please see [alternate workflow creation](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#synchronous-processing) for an alternative (function based) method for
# creating a `stream`. Please see [pipes](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-pipes-are-available) for a complete list of available `pipes`.

# ## Parallel processing

# An example using `riko`'s parallel API to spawn a `ThreadPool`. You can instead
# enable a `ProcessPool` by additionally passing `threads=False` to `SyncPipe`,
# i.e., `SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False)`.

# In[12]:

from riko.collections import SyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create a parallel SyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. fetch the first comment from all items in parallel (using 4 workers)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: there is no point in sorting after the filter since parallel fetching
# doesn't guarantee order

flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)  # 1
        .filter(conf={'rule': filter_rule})                       # 2
        .xpathfetchpage(conf=xpath_conf))                         # 3

stream = flow.output     # 4
next(stream)['content']  # 5


# ## Asynchronous processing

# To enable asynchronous processing, you must install the `async` module.
#
# `pip install riko[async]`
#
# An example using `riko`'s asynchronous API.

# In[2]:

from riko.bado import coroutine, react
from riko.collections import AsyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create an AsyncPipe flow ###
#
# The following flow will:
# 1. fetch the hackernews RSS feed
# 2. filter for items with '.com' in the article link
# 3. asynchronously fetch the first comment from each item (using 4 connections)
# 4. flatten the result into one raw stream
# 5. extract the first item's content
#
# Note: there is no point in sorting after the filter since async fetching
# doesn't guarantee order

@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)  # 1
            .filter(conf={'rule': filter_rule})             # 2
            .xpathfetchpage(conf=xpath_conf)                # 3
            .output)                                        # 4

    print(next(stream)['content'])  # 5


try:
    react(run)
except SystemExit:
    pass


# # Design Principles

# The primary data structures in `riko` are the `item` and `stream`. An `item`
# is just a python dictionary, and a `stream` is an iterator of `items`. You can
# create a `stream` manually with something as simple as
# `[{'content': 'hello world'}]`. You manipulate `streams` in
# `riko` via `pipes`. A `pipe` is simply a function that accepts either a
# `stream` or an `item`, and returns a `stream`. `pipes` are composable: you
# can use the output of one `pipe` as the input to another `pipe`.
#
# `riko` `pipes` come in two flavors: `operators` and `processors`.
# `operators` operate on an entire `stream` at once and are unable to handle
# individual items. Example `operators` include `pipecount`, `pipefilter`,
# and `pipereverse`.

# In[14]:

from riko.modules.reverse import pipe

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))


# `processors` process individual `items` and can be parallelized across
# threads or processes. Example `processors` include `pipefetchsitefeed`,
# `pipehash`, `pipeitembuilder`, and `piperegex`.

# In[15]:

from riko.modules.hash import pipe

item = {'title': 'riko pt. 1'}
stream = pipe(item, field='title')
next(stream)


# Some `processors`, e.g., `pipetokenizer`, return multiple results.

# In[16]:

from riko.modules.tokenizer import pipe

item = {'title': 'riko pt. 1'}
tokenizer_conf = {'delimiter': ' '}
stream = pipe(item, conf=tokenizer_conf, field='title')
next(stream)


# In[17]:

# In this case, if we just want the result, we can `emit` it instead
stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)
next(stream)


# `operators` are split into sub-types of `aggregators`
# and `composers`. `aggregators`, e.g., `count`, combine
# all `items` of an input `stream` into a new `stream` with a single `item`;
# while `composers`, e.g., `filter`, create a new `stream` containing
# some or all `items` of an input `stream`.

# In[26]:

from riko.modules.count import pipe

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]
next(pipe(stream))


# `processors` are split into sub-types of `source` and `transformer`.
# `sources`, e.g., `itembuilder`, can create a `stream` while
# `transformers`, e.g., `hash`, can only transform items in a `stream`.

# In[27]:

from riko.modules.itembuilder import pipe

attrs = {'key': 'title', 'value': 'riko pt. 1'}
next(pipe(conf={'attrs': attrs}))


# The following table summarizes these observations:
#
# type | sub-type | input | output | parallelizable? | creates streams?
# -----|----------|-------|--------|-----------------|-----------------
# operator | aggregator | stream | stream * | |
# operator | composer | stream | stream | |
# processor | source | item | stream | √ | √
# processor | transformer | item | stream | √ |
#
# If you are unsure of the type of `pipe` you have, check its metadata.
#
# `*` the output `stream` of an `aggregator` is an iterator of only 1 `item`.
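# Before looking at pipe metadata below, here is a brief sketch of the
# composability noted earlier: since every `pipe` returns a `stream`, the
# output of one `pipe` can be passed directly as the input to another. This
# simply re-uses the `reverse` and `count` pipes shown above.

# In[ ]:

from riko.modules.count import pipe as count
from riko.modules.reverse import pipe as reverse

stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]

# feed the output of the `reverse` pipe directly into the `count` pipe;
# the result is a stream with a single item holding the count of the 2 inputs
next(count(reverse(stream)))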
# In[3]:

from riko.modules import fetchpage, count

fetchpage.async_pipe.__dict__


# In[29]:

count.pipe.__dict__


# The `SyncPipe` and `AsyncPipe` classes (among other things) perform this
# check for you to allow for convenient method chaining and transparent
# parallelization.

# In[30]:

from riko.collections import SyncPipe

attrs = [
    {'key': 'title', 'value': 'riko pt. 1'},
    {'key': 'content', 'value': "Let's talk about riko!"}]

flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()
flow.list[0]


# Please see the [cookbook](https://github.com/nerevu/riko/blob/master/docs/COOKBOOK.rst) for advanced examples including how to wire in
# values from other pipes or accept user input.

# # Command-line Interface

# `riko` provides a command, `runpipe`, to execute `workflows`. A
# `workflow` is simply a file containing a function named `pipe` that creates
# a `flow` and processes the resulting `stream`.

# ## CLI Setup

# `flow.py`

# In[31]:

from __future__ import print_function
from riko.collections import SyncPipe

conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}
conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}


def pipe(test=False):
    kwargs = {'conf': conf1, 'test': test}
    flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)
    stream = flow.output

    for i in stream:
        print(i)


# ## CLI Usage

# Now to execute `flow.py`, type the command `runpipe flow`. You should
# then see the following output in your terminal:
#
# `https://google.co.uk`
#
# `runpipe` will also search the `examples` directory for `workflows`. Type
# `runpipe demo` and you should see the following output:
#
# `Deadline to clear up health law eligibility near 682`