#default_exp event
Helpers for getting GitHub API events
#export
from fastcore.utils import *
from fastcore.foundation import *
from ghapi.core import *
from ghapi.page import *
import time
from itertools import islice
api = GhApi()
#export
def _list_events(username=None, org=None, owner=None, repo=None):
if (username or org or owner) and \
not (bool(username) ^ bool(org) ^ bool(owner)): raise Exception('Can not pass more than one of username, org, and owner')
if (owner and not repo) or (repo and not owner): 'Must pass both repo and owner, if passing either'
if owner: return api.activity.list_public_events_for_repo_network,{'owner':owner,'repo':repo}
if org: return api.activity.list_public_org_events,{'org':org}
if username: return api.activity.list_public_events_for_user,{'username':username}
return api.activity.list_public_events,{}
#export
@patch
def list_events(self:GhApi, username=None, org=None, owner=None, repo=None, per_page=30, page=1):
"Fetch public events for repo network, org, user, or all"
oper,kwargs = _list_events(username=username, org=org, owner=owner, repo=repo)
return oper(per_page=per_page, page=page, **kwargs)
#export
@patch
def list_events_parallel(self:GhApi, username=None, org=None, owner=None, repo=None, per_page=30, n_pages=8):
"Fetch as many events from `list_events` in parallel as available"
oper,kwargs = _list_events(username=username, org=org, owner=owner, repo=repo)
return pages(oper, n_pages, per_page=per_page, **kwargs).concat()
list_events
and list_events_parallel
support the following:
Events from | Example |
---|---|
Organization | api.list_events_parallel(org='fastai') |
User | api.list_events_parallel(username='jph00') |
Repository network | api.list_events_parallel(owner='fastai', repo='fastcore') |
All public | api.list_events_parallel() |
#export
def fetch_events(types=None):
"Generate an infinite stream of events optionally filtered to `types`"
if types: types=setify(types)
seen = set()
while True:
evts = [o for o in api.list_events_parallel(n_pages=2) if o.id not in seen]
print('\n***', len(evts))
for o in evts:
seen.add(o.id)
if not types or o.type in types: yield o
time.sleep(0.6)
for o in islice(fetch_events('PullRequestEvent'), 100): print(o.id, end=' ')
next(fetch_events('PullRequestEvent'))
evts = GhEvents(types=('IssuesEvent','ReleaseEvent','PullRequestEvent'), bots=True)
for ev in evts: ...
for b in islice(evts,10): ...
#hide
from nbdev.export import notebook2script
notebook2script()
Converted 00_core.ipynb. Converted 01_actions.ipynb. Converted 02_auth.ipynb. Converted 03_page.ipynb. Converted 04_event.ipynb. Converted 10_cli.ipynb. Converted 50_fullapi.ipynb. Converted 80_tutorial_actions.ipynb. Converted 90_build_lib.ipynb. Converted index.ipynb.