#default_exp event
Helpers for getting GitHub API events
#export
from fastcore.utils import *
from fastcore.foundation import *
from fastcore.meta import *
from ghapi.core import *
from ghapi.page import *
import time
from itertools import islice
# Module-level `GhApi` instance, created with default arguments; the usage examples further down call methods on it
api = GhApi()
#export
def _list_events(g, username=None, org=None, owner=None, repo=None):
if (username or org or owner) and \
not (bool(username) ^ bool(org) ^ bool(owner)): raise Exception('Can not pass more than one of username, org, and owner')
if (owner and not repo): owner,repo = repo.split('/')
if owner: return g.list_public_events_for_repo_network,{'owner':owner,'repo':repo}
if org: return g.list_public_org_events,{'org':org}
if username: return g.list_public_events_for_user,{'username':username}
return g.list_public_events,{}
#export
def _id2int(x):
x.id = int(x.id)
return x
#export
@patch
@delegates(_list_events)
def list_events(self:GhApi, per_page=30, page=1, **kwargs):
    "Fetch one page of public events for a repo network, org, user, or everything, with each event's `id` converted to `int`"
    # `_list_events` chooses the operation from the selector kwargs and returns the arguments it needs
    endpoint,params = _list_events(self.activity, **kwargs)
    events = endpoint(per_page=per_page, page=page, **params)
    return events.map(_id2int)
#export
@patch
@delegates(_list_events)
def list_events_parallel(self:GhApi, per_page=30, n_pages=8, **kwargs):
    "Fetch up to `n_pages` pages of events through `pages` and return them as one combined list with `int` ids"
    # Same operation selection as `list_events`; only the paging strategy differs
    endpoint,params = _list_events(self.activity, **kwargs)
    fetched = pages(endpoint, n_pages, per_page=per_page, **params)
    combined = fetched.concat()
    return combined.map(_id2int)
`list_events` and `list_events_parallel` support the following:
Events from | Example |
---|---|
Organization | api.list_events_parallel(org='fastai') |
User | api.list_events_parallel(username='jph00') |
Repository network | api.list_events_parallel(owner='fastai', repo='fastcore') |
All public | api.list_events_parallel() |
#export
_bot_re = re.compile('b[o0]t')
def _want_evt(o, types, incl_bot):
if not incl_bot and _bot_re.search(nested_attr(o, 'actor.login') or ''): return False
if types and o.type not in types: return False
return True
#export
@patch
@delegates(_list_events)
def fetch_events(self:GhApi, n_pages=3, pause=0.4, per_page=30, types=None, incl_bot=False, **kwargs):
"Generate an infinite stream of events, optionally filtered to `types, with `pause` seconds between requests"
seen = set()
if types: types=setify(types)
while True:
evts = self.list_events_parallel(n_pages=n_pages, per_page=per_page, **kwargs)
new_evts = L(o for o in evts if o.id not in seen and _want_evt(o, types, incl_bot))
seen.update(new_evts.attrgot('id'))
yield from new_evts
if pause: time.sleep(pause)
# Example: take the first five events from the stream for one user and join their `type` values into one string
' '.join(o.type for o in islice(api.fetch_events(username='jph00'), 5))
'IssueCommentEvent PullRequestEvent PullRequestReviewCommentEvent PullRequestReviewCommentEvent PullRequestReviewEvent'
#hide
# evts = GhEvents(types=('IssuesEvent','ReleaseEvent','PullRequestEvent'), bots=True)
#hide
# Run nbdev's `notebook2script`; the output that follows lists the notebooks it converted
from nbdev.export import notebook2script
notebook2script()
Converted 00_core.ipynb. Converted 01_actions.ipynb. Converted 02_auth.ipynb. Converted 03_page.ipynb. Converted 04_event.ipynb. Converted 10_cli.ipynb. Converted 50_fullapi.ipynb. Converted 80_tutorial_actions.ipynb. Converted 90_build_lib.ipynb. Converted index.ipynb.