The goal of this notebook is to learn how to connect to an Elasticsearch view and run queries against it. It is not a tutorial on the Elasticsearch DSL, for which many well-written learning resources are available.
This notebook assumes you've already created a project. If not, follow the Blue Brain Nexus Quick Start tutorial.
You'll work through the following steps:
This tutorial makes use of the elasticsearch and elasticsearch-dsl Python clients, which allow connecting to an Elasticsearch search endpoint and performing various types of queries against it.
Every project in Blue Brain Nexus comes with a default ElasticSearchView that lets you explore and search data using the Elasticsearch DSL. The address of this ElasticSearchView is
https://sandbox.bluebrainnexus.io/v1/views/tutorialnexus/$PROJECTLABEL/nxv%3AdefaultElasticSearchIndex/_search
for a project with label $PROJECTLABEL, where `nxv%3AdefaultElasticSearchIndex` is the URL-encoded id (`nxv:defaultElasticSearchIndex`) of the project's default view. The organization 'tutorialnexus' is the one used throughout the tutorial, but it can be replaced by any other organization.
# Install https://github.com/elastic/elasticsearch-py
!pip install elasticsearch
# Install https://github.com/elastic/elasticsearch-dsl-py
!pip install elasticsearch-dsl
import getpass

# Nexus access token, read without echoing it into the notebook output.
token = getpass.getpass()
# Base URL of the Nexus deployment. It matches the sandbox address given for the
# Elasticsearch view in the introduction of this notebook.
deployment = "https://sandbox.bluebrainnexus.io/v1"
# Organization and project whose default Elasticsearch view is queried below.
org_label = "tutorialnexus"
project_label = "myProject"
# NOTE(review): not referenced by the cells in this section; authentication is
# passed to the Elasticsearch client through `extra_headers` instead.
headers = {}
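The default Urllib3HttpConnection used by the elasticsearch client needs to be extended to:
- send extra HTTP headers (here, the Nexus `Authorization` bearer token) with every request;
- always send requests to the view's full `_search` address, instead of the path the client would normally build.
Solution partially taken from https://github.com/elastic/elasticsearch-py/issues/407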
import gzip
import ssl
import time
import warnings

try:
    from base64 import decodestring
except ImportError:  # base64.decodestring was removed in Python 3.9
    from base64 import decodebytes as decodestring

import urllib3
from elasticsearch import Elasticsearch, Urllib3HttpConnection
from elasticsearch.exceptions import ConnectionError as ESConnectionError
from elasticsearch.exceptions import ConnectionTimeout, SSLError
from elasticsearch.serializer import JSONSerializer, DEFAULT_SERIALIZERS
from elasticsearch_dsl import Search
from urllib3.exceptions import ReadTimeoutError, SSLError as UrllibSSLError
from urllib3.util.retry import Retry
class MyConnection(Urllib3HttpConnection):
    """Urllib3-based connection that always targets a fixed Nexus view endpoint.

    Compared to the stock ``Urllib3HttpConnection`` it:

    * accepts an ``extra_headers`` keyword whose entries are merged into the
      default headers sent with every request (used here to pass the Nexus
      ``Authorization`` bearer token);
    * sends every request to ``self.url_prefix`` verbatim, ignoring the ``url``
      and ``params`` computed by the client, because the configured address
      already ends with the view's ``/_search`` path.

    Adapted from https://github.com/elastic/elasticsearch-py/issues/407.
    """

    def __init__(self, *args, **kwargs):
        """Create the connection.

        Takes the same arguments as ``Urllib3HttpConnection`` plus an optional
        ``extra_headers`` mapping (``None`` is treated as no extra headers).
        """
        # Remove the custom keyword so only standard arguments reach the parent.
        extra_headers = kwargs.pop("extra_headers", None) or {}
        super().__init__(*args, **kwargs)
        self.headers.update(extra_headers)

    def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        """Send one HTTP request to the configured view endpoint.

        :param method: HTTP method.
        :param url: path computed by the client; ignored (see class docstring).
        :param params: query parameters computed by the client; ignored.
        :param body: encoded request body; gzip-compressed when ``http_compress``
            is enabled.
        :param timeout: per-request timeout, forwarded to urllib3 when truthy.
        :param ignore: HTTP status codes that must not raise.
        :param headers: per-request headers merged over the connection defaults.
        :returns: ``(status, response headers, decoded body)``.
        :raises SSLError: when urllib3 reports an SSL error.
        :raises ConnectionTimeout: when urllib3 reports a read timeout.
        :raises ESConnectionError: for any other failure while sending/reading.
        Non-2xx statuses not listed in ``ignore`` are passed to the base class
        ``_raise_error``.
        """
        # The client would normally append its own path (e.g. "/_search") and
        # query string to url_prefix; the Nexus view address already is the
        # complete search endpoint, so it is used as-is.
        url = self.url_prefix
        full_url = self.host + url
        start = time.time()
        try:
            kw = {"timeout": timeout} if timeout else {}
            request_headers = self.headers
            if headers:
                # Copy so per-request headers never leak into the shared defaults.
                request_headers = request_headers.copy()
                request_headers.update(headers)
            if self.http_compress and body:
                body = gzip.compress(body)
            response = self.pool.urlopen(
                method, url, body, retries=Retry(False), headers=request_headers, **kw
            )
            duration = time.time() - start
            raw_data = response.data.decode("utf-8")
        except Exception as e:
            self.log_request_fail(
                method, full_url, url, body, time.time() - start, exception=e
            )
            # Translate urllib3 failures into the elasticsearch exception types
            # the client's transport layer expects.
            if isinstance(e, UrllibSSLError):
                raise SSLError("N/A", str(e), e) from e
            if isinstance(e, ReadTimeoutError):
                raise ConnectionTimeout("TIMEOUT", str(e), e) from e
            raise ESConnectionError("N/A", str(e), e) from e

        # Raise errors based on HTTP status codes; the client handles them if needed.
        if not (200 <= response.status < 300) and response.status not in ignore:
            self.log_request_fail(
                method, full_url, url, body, duration, response.status, raw_data
            )
            self._raise_error(response.status, raw_data)

        self.log_request_success(
            method, full_url, url, body, response.status, raw_data, duration
        )
        return response.status, response.getheaders(), raw_data
# Search address of the project's default Elasticsearch view; the view id
# "nxv:defaultElasticSearchIndex" appears URL-encoded in the path.
es_endpoint = f"{deployment}/views/{org_label}/{project_label}/nxv%3AdefaultElasticSearchIndex/_search"
print(f"Elasticsearch View address: {es_endpoint}")

# Handle the "application/ld+json" mimetype with the plain JSON serializer
# (presumably the content type Nexus answers with). Registered before the
# client is built so the client picks it up.
DEFAULT_SERIALIZERS["application/ld+json"] = JSONSerializer()

# Client wired to the custom connection: every request goes to `es_endpoint`
# and carries the bearer token; GET requests with a body are sent as POST.
es_wrapper = Elasticsearch(
    es_endpoint,
    connection_class=MyConnection,
    send_get_body_as="POST",
    extra_headers={"Authorization": f"Bearer {token}"},
)
# Search every non-deprecated resource and bucket the matches by @type.
s = Search(using=es_wrapper).filter("term", _deprecated="false")
s.aggs.bucket("per_type", "terms", field="@type")
response = s.execute()

total = response.hits.total
print("total hits", total.relation, total.value)

# Don't forget that resources are paginated with 10 hits per page by default:
# use `from` and `size` (on the Nexus API) to retrieve all the hits.
print("Displaying 10 first search results (score id type)")
for search_hit in response:
    hit_types = search_hit["@type"] if "@type" in search_hit else ""
    print(search_hit.meta.score, search_hit["@id"], hit_types)

print("Displaying aggregation")
for bucket in response.aggregations.per_type.buckets:
    print(str(bucket.doc_count) + " resources of type " + bucket.key)
The goal here is to illustrate how to use the Nexus SDK to create an Elasticsearch view. The full documentation can be found at: https://bluebrainnexus.io/docs/api/current/kg/kg-views-api.html#create-an-elasticsearchview-using-post.
{
"@id": "{someid}",
"@type": [ "View", "ElasticSearchView"],
"resourceSchemas": [ "{resourceSchema}", ...],
"resourceTypes": [ "{resourceType}", ...],
"resourceTag": "{tag}",
"sourceAsText": {sourceAsText},
"includeMetadata": {includeMetadata},
"includeDeprecated": {includeDeprecated},
"mapping": _elasticsearch mapping_
}
An ElasticSearchView is a way to tell Nexus which resources to index, for example:
resources that conform to a given schema: set resourceSchemas to the targeted schemas
resources that are of a given type: set resourceTypes to the targeted types
resources that are tagged: set resourceTag to the targeted tag value.
# Install the Blue Brain Nexus Python SDK.
!pip install nexus-sdk
import nexussdk as nexus
# Configure the SDK with the same deployment URL and token used for the
# Elasticsearch queries above.
nexus.config.set_environment(deployment)
nexus.config.set_token(token)
# Only resources of this type will be indexed by the new view.
type_to_index = "https://bluebrain.github.io/nexus/vocabulary/File"

# ElasticSearchView payload (see the template above for all available fields).
view_data = {
    "@id": "http://myView6.com",
    "@type": [
        "ElasticSearchView"
    ],
    "includeMetadata": True,
    "includeDeprecated": False,
    # The view API documents resourceTypes as an array of type IRIs.
    "resourceTypes": [type_to_index],
    # Elasticsearch mapping of the view's index: only @id and @type are mapped
    # explicitly ("dynamic": "false" leaves other fields unmapped).
    "mapping": {
        "dynamic": "false",
        "properties": {
            "@id": {
                "type": "keyword"
            },
            "@type": {
                "type": "keyword"
            }
        }
    },
    "sourceAsText": False
}
# Create the view; on an HTTP error, print the error payload returned by Nexus.
try:
    response = nexus.views.create_es(org_label=org_label, project_label=project_label, view_data=view_data)
    print(dict(response))
except nexus.HTTPError as ne:
    # Error bodies are presumably JSON, but fall back to the raw text (e.g. an
    # HTML error page from a proxy) so the real HTTP error is not hidden by a
    # JSON decoding error.
    try:
        print(ne.response.json())
    except ValueError:
        print(ne.response.text)

# List the views of the project.
response = nexus.views.list(org_label=org_label, project_label=project_label)
print(dict(response))
Please refer to https://bluebrainnexus.io/docs/api/current/kg/kg-views-api.html#fetch-view-statistics for more details about how to access the view indexing progress.