from IPython.display import Image
Use elasticsearch to:
Near real-time: there is a slight latency (normally one second) from the time you index a document until the time it becomes searchable.
Distributed: a cluster is a collection of one or more nodes (servers) that together hold your entire data.
Scalable: we'll get to that in a second
Index: an index is a collection of documents that have somewhat similar characteristics. For example, you can have an index for customer data, another index for a product catalog, and yet another index for order data.
Document: a document is a basic unit of information that can be indexed. For example, you can have a document for a single customer, another document for a single product, and yet another for a single order.
What happens when the size of data on a given index exceeds the hardware limits of a node?
Elasticsearch automagically subdivides the index into multiple pieces called shards.
Sharding is important for two primary reasons:
Replicas are just copies of shards. Replicas are important too:
$ ./bin/elasticsearch
import elasticsearch
import requests

# Client for the single node started locally; the name `es` is reused by
# every later call in this file.
es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])

# Query the REST root directly and show the raw response body, as a quick
# check that the node is answering on port 9200.
res = requests.get('http://localhost:9200')
print(res.content)
{ "name" : "JWYUFIB", "cluster_name" : "elasticsearch", "cluster_uuid" : "UbuWrUr-TFW1vHcNmstVvg", "version" : { "number" : "6.2.2", "build_hash" : "10b1edd", "build_date" : "2018-02-16T19:01:30.685723Z", "build_snapshot" : false, "lucene_version" : "7.2.1", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" }
# Build an IPython display Image from the file img/cat_v2.png; presumably this
# was the final expression of a notebook cell, so the picture rendered inline.
Image("img/cat_v2.png")
*nix
command-line tools
Image("img/cat_indices.png")
# Three more screenshots loaded from the img/ directory. Each call was
# presumably its own notebook cell (only a cell's last expression is rendered).
Image("img/cat_one_index.png")
Image("img/cat_10_shards.png")
Image("img/cat_master_node.png")
# Document to store: one novel with a few descriptive fields.
frankenstein = {
    'author': 'Mary Shelley',
    'title': 'Frankenstein; or, The Modern Prometheus',
    'topics': ['medicine', 'monster', 'frame narrative', 'alchemy', 'horror'],
    'awesomeness': 0.8,
}

# Index it into 'books' as type 'novel' under the explicit id 1.
es.index(index='books', doc_type='novel', id=1, body=frankenstein)
{u'_id': u'1', u'_index': u'books', u'_primary_term': 1, u'_seq_no': 2, u'_shards': {u'failed': 0, u'successful': 1, u'total': 2}, u'_type': u'novel', u'_version': 1, u'result': u'created'}
# Match query against the 'author' field of the 'books' index.
author_query = {"query": {"match": {'author': 'Mary Shelley'}}}
es.search(index="books", body=author_query)
{u'_shards': {u'failed': 0, u'skipped': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [], u'max_score': None, u'total': 0}, u'timed_out': False, u'took': 19}
# Remove the novel stored under id 1 from the 'books' index.
novel_ref = dict(index='books', doc_type='novel', id=1)
es.delete(**novel_ref)
{u'_id': u'1', u'_index': u'books', u'_primary_term': 1, u'_seq_no': 3, u'_shards': {u'failed': 0, u'successful': 1, u'total': 2}, u'_type': u'novel', u'_version': 2, u'result': u'deleted'}
# Two illustrative pictures loaded from the img/ directory; presumably each
# call was its own notebook cell so that both rendered.
Image("img/Data-Memory-Beta-660x400.jpg")
Image("img/694px-Star_Wars_Logo.svg.png")
Let's iterate over SWAPI people documents and index them
# es.indices.delete(index='sw')
# es.indices.delete(index='books')
def convert_to_float(content):
    """Normalise the ``height`` entry of a record to an integer, in place.

    Despite the historical name, the result is always an ``int``: the
    index mapping declared for ``height`` in this file is ``integer``, so
    the placeholder for an unknown height is ``0`` rather than ``0.0``.

    Args:
        content: dict with a ``'height'`` key whose value is either the
            string ``'unknown'`` or something ``int()`` accepts.

    Returns:
        The same dict object that was passed in, with ``'height'``
        replaced by an ``int`` (``0`` when the height was ``'unknown'``).

    Raises:
        KeyError: if ``content`` has no ``'height'`` key.
        ValueError: if the height is neither ``'unknown'`` nor parseable
            by ``int()``.
    """
    raw_height = content['height']
    # Keep one numeric type for every record so the field stays homogeneous.
    content['height'] = 0 if raw_height == 'unknown' else int(raw_height)
    return content
# Create index
# Guarded so the cell can be re-run: creating an index that already exists
# raises RequestError(400, 'resource_already_exists_exception').
if not es.indices.exists(index='sw'):
    es.indices.create(index='sw')
--------------------------------------------------------------------------- RequestError Traceback (most recent call last) <ipython-input-17-d5c15af9f2a8> in <module>() 1 # Create index ----> 2 es.indices.create(index='sw') /Users/sbraden/.venvs/elasticsearch/lib/python2.7/site-packages/elasticsearch/client/utils.pyc in _wrapped(*args, **kwargs) 74 if p in kwargs: 75 params[p] = kwargs.pop(p) ---> 76 return func(*args, params=params, **kwargs) 77 return _wrapped 78 return _wrapper /Users/sbraden/.venvs/elasticsearch/lib/python2.7/site-packages/elasticsearch/client/indices.pyc in create(self, index, body, params) 89 raise ValueError("Empty value passed for a required argument 'index'.") 90 return self.transport.perform_request('PUT', _make_path(index), ---> 91 params=params, body=body) 92 93 @query_params('allow_no_indices', 'expand_wildcards', 'flat_settings', /Users/sbraden/.venvs/elasticsearch/lib/python2.7/site-packages/elasticsearch/transport.pyc in perform_request(self, method, url, headers, params, body) 312 313 try: --> 314 status, headers_response, data = connection.perform_request(method, url, params, body, headers=headers, ignore=ignore, timeout=timeout) 315 316 except TransportError as e: /Users/sbraden/.venvs/elasticsearch/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.pyc in perform_request(self, method, url, params, body, timeout, ignore, headers) 161 if not (200 <= response.status < 300) and response.status not in ignore: 162 self.log_request_fail(method, full_url, url, body, duration, response.status, raw_data) --> 163 self._raise_error(response.status, raw_data) 164 165 self.log_request_success(method, full_url, url, body, response.status, /Users/sbraden/.venvs/elasticsearch/lib/python2.7/site-packages/elasticsearch/connection/base.pyc in _raise_error(self, status_code, raw_data) 123 logger.warning('Undecodable raw error response from server: %s', err) 124 --> 125 raise HTTP_EXCEPTIONS.get(status_code, 
TransportError)(status_code, error_message, additional_info) 126 127 RequestError: TransportError(400, u'resource_already_exists_exception', u'index [sw/TPGrb3lNS4WESOITKcY84g] already exists')
# Define a mapping (important for aggregation tasks later)
# Declares 'height' as an integer field for documents of type 'people'.
height_mapping = {"properties": {"height": {"type": "integer"}}}
es.indices.put_mapping(index="sw", doc_type="people", body=height_mapping)
import json


def index_people(start_id):
    """Index consecutive SWAPI people into the 'sw' index.

    Fetches ``http://swapi.co/api/people/<id>`` for ``start_id``,
    ``start_id + 1``, ... and indexes each record (after
    ``convert_to_float``) as a 'people' document under the same id.
    The status code is checked *before* the body is used, so an error
    response is never indexed or passed to ``convert_to_float``.

    Args:
        start_id: first person id to request.

    Returns:
        The first id whose request did not return HTTP 200, i.e. where
        the run stopped.
    """
    person_id = start_id
    while True:
        r = requests.get('http://swapi.co/api/people/' + str(person_id))
        if r.status_code != 200:
            return person_id
        es.index(
            index='sw',
            doc_type='people',
            id=person_id,
            body=convert_to_float(r.json()),
        )
        person_id += 1


# The ingestion is done in two passes, the second restarting at id 18 --
# presumably because id 17 is missing upstream and ends the first pass.
# The return value is kept in `i` so it is not echoed as cell output.
i = index_people(1)
i = index_people(18)
# Document count.
# NOTE(review): no index argument is given. The recorded output reports 10
# shards while searches on 'sw' report 5, so this appears to count across all
# indices -- pass index='sw' if only the Star Wars documents are meant.
es.count()
{u'_shards': {u'failed': 0, u'skipped': 0, u'successful': 10, u'total': 10}, u'count': 87}
# Fetch a single 'people' document from 'sw' directly by its id.
person_ref = dict(index='sw', doc_type='people', id=19)
es.get(**person_ref)
{u'_id': u'19', u'_index': u'sw', u'_source': {u'birth_year': u'unknown', u'created': u'2014-12-12T11:16:56.569000Z', u'edited': u'2014-12-20T21:17:50.343000Z', u'eye_color': u'blue', u'films': [u'https://swapi.co/api/films/1/'], u'gender': u'male', u'hair_color': u'brown', u'height': 180, u'homeworld': u'https://swapi.co/api/planets/26/', u'mass': u'110', u'name': u'Jek Tono Porkins', u'skin_color': u'fair', u'species': [u'https://swapi.co/api/species/1/'], u'starships': [u'https://swapi.co/api/starships/12/'], u'url': u'https://swapi.co/api/people/19/', u'vehicles': []}, u'_type': u'people', u'_version': 2, u'found': True}
# Match query on the 'name' field of the 'sw' index.
result = es.search(
    index="sw",
    body={
        "query": {"match": {
            'name': 'Darth Vader'
        }}
    }
)
# Show each hit's name followed by its relevance score. print() is called
# with a single argument so the output is identical on Python 2 and 3.
for hit in result['hits']['hits']:
    print(hit['_source']['name'])
    print(hit['_score'])
Darth Vader 5.316138 Darth Maul 2.3211865
This will give us both Darth Vader AND Darth Maul, id 4 and id 44 (notice that they are in the same index, even if different node clients issued the index commands). Both results have a score, although Darth Vader's is much higher than Darth Maul's (5.32 vs 2.32), since Vader is an exact match. Take that, Darth Maul!
Useful for autocomplete applications.
# Prefix query: matches documents whose 'name' has a term starting with "lu".
result = es.search(
    index="sw",
    body={
        "query": {"prefix": {
            "name": "lu"
        }}
    }
)
# Show each hit's name followed by its score. print() is called with a
# single argument so the output is identical on Python 2 and 3.
for hit in result['hits']['hits']:
    print(hit['_source']['name'])
    print(hit['_score'])
Luke Skywalker 1.0 Luminara Unduli 1.0
Introduces the concept of fuzzy matching.
# Fuzzy multi-field search: "jaba" is looked up in both 'name' and 'species'
# with a fuzziness of 1.
fuzzy_query = {
    "query": {
        "multi_match": {
            "query": "jaba",
            "fields": ["name", "species"],
            "fuzziness": 1,
        },
    },
}
es.search(index="sw", body=fuzzy_query)
{u'_shards': {u'failed': 0, u'skipped': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'16', u'_index': u'sw', u'_score': 1.6810147, u'_source': {u'birth_year': u'600BBY', u'created': u'2014-12-10T17:11:31.638000Z', u'edited': u'2014-12-20T21:17:50.338000Z', u'eye_color': u'orange', u'films': [u'https://swapi.co/api/films/4/', u'https://swapi.co/api/films/3/', u'https://swapi.co/api/films/1/'], u'gender': u'hermaphrodite', u'hair_color': u'n/a', u'height': 175, u'homeworld': u'https://swapi.co/api/planets/24/', u'mass': u'1,358', u'name': u'Jabba Desilijic Tiure', u'skin_color': u'green-tan, brown', u'species': [u'https://swapi.co/api/species/5/'], u'starships': [], u'url': u'https://swapi.co/api/people/16/', u'vehicles': []}, u'_type': u'people'}], u'max_score': 1.6810147, u'total': 1}, u'timed_out': False, u'took': 11}
Aggregations:
# Build an IPython display Image from img/aggregation.png; presumably rendered
# inline as the last expression of its notebook cell.
Image("img/aggregation.png")
# Check on your index mapping. Can you aggregate over a certain field?
result = es.indices.get_mapping(index="sw", doc_type="people")
# Walk down index -> mappings -> doc type -> properties to the 'height' field.
people_properties = result['sw']['mappings']['people']['properties']
people_properties['height']
{u'type': u'integer'}
# Metric aggregation named 'average_height': the mean of the 'height' field
# over the 'people' documents in 'sw'.
average_height_request = {
    "aggs": {
        "average_height": {
            "avg": {"field": "height"},
        },
    },
}
result = es.search(index="sw", doc_type="people", body=average_height_request)
result['aggregations']['average_height']
{u'value': 162.33333333333334}