This is a tutorial on Elasticsearch for the Pandas programmer.
Installing is easy. Get Java, then download Elasticsearch and unzip it. Then run `bin/elasticsearch.bat` (on Windows) or `bin/elasticsearch` (on other systems).
Let's take a dataset from the Internet and populate it into Elasticsearch. This Singapore property data is archived at http://files.gramener.com/data/. Let's load that via Pandas.
import pandas as pd
# Read the archived Singapore property CSV straight from its URL into a DataFrame.
DATA_URL = 'http://files.gramener.com/data/singapore-property-data.csv'
data = pd.read_csv(DATA_URL)
The data has information about various construction projects, their location, the number of units and the price range at which they were sold.
# Show the first row by position. `DataFrame.irow` was deprecated and later
# removed from pandas; `.iloc` is the positional indexer that replaces it.
data.iloc[0]
year 2007 month 6 Project Name 38 AMBER Street Name AMBER ROAD Developer Ho Brothers Investment Pte Ltd Property Type Non-Landed Locality RCR Total Number of Units in Project 30 Cumulative Units Launched to-date 0 Cumulative Units Sold to-date 0 Cumulative Units Launched but Unsold 0 Units Launched in the Month 0 Units Sold in the Month 0 Median Price($psm) # in the Month Number Sold By Price Range - Lowest Price ($psm) # in the Month - Highest Price ($psm) # in the Month - Name: 0, dtype: object
Let's populate this into Elasticsearch under an index called `singapore` and a type called `property`.
First, let's delete everything under it.
import json
import requests
# Base URL of the local Elasticsearch server.
root = 'http://localhost:9200'
# URL of the "property" type inside the "singapore" index.
database = root + '/singapore/property'

# Send a DELETE to that URL and show the server's JSON reply.
delete_response = requests.delete(database)
delete_response.json()
{u'acknowledged': True}
# Loop through the first 5 rows and add each one as a document, using one
# HTTP PUT per row. The DataFrame index label is used as the document ID.
for index, row in data.head().iterrows():
    response = requests.put(database + '/%d' % index, data=row.to_json())
    # Single-argument print(...) prints the same text on Python 2 and 3.
    print(response.json())
{u'_type': u'property', u'_id': u'0', u'created': True, u'_version': 1, u'_index': u'singapore'} {u'_type': u'property', u'_id': u'1', u'created': True, u'_version': 1, u'_index': u'singapore'} {u'_type': u'property', u'_id': u'2', u'created': True, u'_version': 1, u'_index': u'singapore'} {u'_type': u'property', u'_id': u'3', u'created': True, u'_version': 1, u'_index': u'singapore'} {u'_type': u'property', u'_id': u'4', u'created': True, u'_version': 1, u'_index': u'singapore'}
Indexing one document per request, as above, is slow for a large number of documents and can take a few minutes. The bulk API is faster. Without worrying about how it works, let's run this code:
# Build the request body for the bulk endpoint: for every row, one action
# line (index this document under the given ID) followed by one line holding
# the document itself. Each line is terminated with '\n'.
bulk_command = []

# JSON template for the action line; the "%d" placeholder is filled with the
# row's index label inside the loop.
action = json.dumps({
    "index": {
        "_index": "singapore",
        "_type": "property",
        "_id": "%d"
    }
})

for index, row in data.iterrows():
    bulk_command.append(action % index + '\n')
    bulk_command.append(row.to_json() + '\n')

# Send everything in a single POST and report the reply's 'errors' field.
result = requests.post(root + '/_bulk', data=''.join(bulk_command)).json()
# Formatting into one string keeps the output identical on Python 2 and 3.
print('Errors: %s' % result['errors'])
Errors: False
# Ask Elasticsearch how many documents are stored under the type.
count_response = requests.get(database + '/_count')
count_response.json()
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'count': 31683}
# Number of rows in the DataFrame, for comparison with the document count.
data.shape[0]
31683
# Same count request sent as a POST; keep only the 'count' field of the reply.
count_reply = requests.post(database + '/_count').json()
count_reply['count']
31683
# Pandas version: count the 'Project Name' entries in each year.
rows_by_year = data.groupby('year')
rows_by_year['Project Name'].count()
year 2007 2402 2008 5072 2009 5587 2010 5652 2011 5819 2012 6121 2013 1030 Name: Project Name, dtype: int64
# Elasticsearch version of the group-by count: a terms aggregation named
# "yearwise" on the "year" field returns one bucket per year with its
# doc_count.
#
# "size": 0 asks for no search hits, so only the aggregation comes back. That
# makes the ?search_type=count URL parameter redundant; it is dropped here
# because newer Elasticsearch releases no longer accept it.
yearwise_query = {
    "size": 0,
    "aggs": {
        "yearwise": {
            "terms": {
                "field": "year"
            }
        }
    }
}
yearwise_response = requests.post(database + '/_search',
                                  data=json.dumps(yearwise_query))
yearwise_response.json()['aggregations']['yearwise']['buckets']
[{u'doc_count': 6121, u'key': 2012}, {u'doc_count': 5819, u'key': 2011}, {u'doc_count': 5652, u'key': 2010}, {u'doc_count': 5587, u'key': 2009}, {u'doc_count': 5072, u'key': 2008}, {u'doc_count': 2402, u'key': 2007}, {u'doc_count': 1030, u'key': 2013}]
# Pandas version: total 'Total Number of Units in Project' for each year.
units_by_year = data.groupby('year')
units_by_year['Total Number of Units in Project'].sum()
year 2007 279810 2008 533265 2009 591630 2010 653184 2011 792051 2012 978774 2013 177945 Name: Total Number of Units in Project, dtype: int64
# Elasticsearch version of the group-by sum: the "yearwise" terms aggregation
# buckets documents by "year", and the nested "total_units" sum aggregation
# adds up "Total Number of Units in Project" inside each bucket.
#
# "size": 0 asks for no search hits, so only the aggregations come back. That
# makes the ?search_type=count URL parameter redundant; it is dropped here
# because newer Elasticsearch releases no longer accept it.
units_query = {
    "size": 0,
    "aggs": {
        "yearwise": {
            "terms": {
                "field": "year"
            },
            "aggs": {
                "total_units": {
                    "sum": {
                        "field": "Total Number of Units in Project"
                    }
                }
            }
        }
    }
}
units_response = requests.post(database + '/_search',
                               data=json.dumps(units_query))
units_response.json()['aggregations']['yearwise']['buckets']
[{u'doc_count': 6121, u'key': 2012, u'total_units': {u'value': 978774.0}}, {u'doc_count': 5819, u'key': 2011, u'total_units': {u'value': 792051.0}}, {u'doc_count': 5652, u'key': 2010, u'total_units': {u'value': 653184.0}}, {u'doc_count': 5587, u'key': 2009, u'total_units': {u'value': 591630.0}}, {u'doc_count': 5072, u'key': 2008, u'total_units': {u'value': 533265.0}}, {u'doc_count': 2402, u'key': 2007, u'total_units': {u'value': 279810.0}}, {u'doc_count': 1030, u'key': 2013, u'total_units': {u'value': 177945.0}}]
# Shut the server down by POSTing to its /_shutdown endpoint.
# NOTE(review): this endpoint is believed to exist only in old Elasticsearch
# releases - confirm against the installed version.
shutdown_url = root + '/_shutdown'
requests.post(shutdown_url).json()
{u'cluster_name': u'elasticsearch', u'nodes': {u'cZkboCdnSi2PqPUJD6pfiA': {u'name': u'Tenpin'}}}