%matplotlib inline
%load_ext cypher
import asyncio
import networkx as nx
import pandas as pd
import projx as px
import matplotlib.pyplot as plt
from datetime import datetime
from gizmo import AsyncGremlinClient
plt.rcParams['figure.figsize'] = 12, 7
Fire up the Neo4j server first:
./neo4j-community-2.1.6/bin/neo4j console
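To confirm it's actually listening before kicking off the ETL, a quick hit against the REST endpoint the loader below points at (this assumes the stock localhost:7474 setup):

import requests
# A 200 here means the REST API the loader will use is reachable.
print(requests.get("http://localhost:7474/db/data/").status_code)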
# The arXiv cond-mat data set: http://konect.uni-koblenz.de/networks/opsahl-collaboration
arXiv_condmat_etl = {
"extractor": {
"edgelist": {
"filename": "data/opsahl-collaboration/out.opsahl-collaboration",
"delim": " ",
"pattern": [
{"node": {"alias": "n"}},
{"edge": {}},
{"node": {"alias": "m"}}
]
}
},
"transformers": [
{
"edge": {
"pattern": [
{"node": {"alias": "n", "label": "Author"}},
{"edge": {"label": "IN"}},
{"node": {"alias": "m", "label": "Paper"}}
]
}
}
],
"loader": {
"edgelist2neo4j": {
"uri": "http://localhost:7474/db/data",
"stmt_per_req": 500,
"req_per_tx": 25,
"indicies": [
{"label": "Author", "attr": "UniqueId"},
{"label": "Paper", "attr": "UniqueId"}
]
}
}
}
px.execute_etl(arXiv_condmat_etl)
Statements per request: 500
Requests per transactions: 25
Created index: CREATE INDEX ON :Author(UniqueId);
Created index: CREATE INDEX ON :Paper(UniqueId);
Load complete: merged 58500 edges in 0:00:28.459678
Not bad! ~2000 merged edges/sec
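For reference, each data line in the KONECT file is just a space-delimited author-id/paper-id pair, which is exactly what the node-edge-node extractor pattern above walks. A quick peek at the raw file (hedging on the header: KONECT files usually open with one or two "%" comment lines):

# Print the first few raw lines of the edgelist as configured in the extractor.
with open("data/opsahl-collaboration/out.opsahl-collaboration") as f:
    for _ in range(5):
        print(f.readline().rstrip())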
# This uses ipython-cypher by @versae.
num_rels = %cypher match (auth:Author)-[rels:IN]->(pap:Paper) return count(rels)
num_auths = %cypher match (auth:Author) return count(auth)
num_papers = %cypher match (pap:Paper) return count(pap)
1 rows affected.
1 rows affected.
1 rows affected.
print(num_rels, num_auths, num_papers)
+-------------+
| count(rels) |
+-------------+
|    58595    |
+-------------+
+-------------+
| count(auth) |
+-------------+
|    16726    |
+-------------+
+------------+
| count(pap) |
+------------+
|   22015    |
+------------+
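If you'd rather work with the counts programmatically, ipython-cypher result sets also convert to pandas DataFrames; if I remember the API right, it's get_dataframe() (pandas required):

# Hypothetical usage: pull the relationship count out as a plain value.
df = num_rels.get_dataframe()
print(df["count(rels)"][0])  # 58595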
neo4j2nx_etl = {
"extractor": {
"neo4j": {
"query": "match (n:Author)-[:IN]->(m:Paper) return n, m",
"uri": "http://localhost:7474/db/data"
}
},
"transformers": [
{
"node": {
"pattern": [{"node": {"alias": "n", "unique": "UniqueId"}}],
"set": [
{"key": "name", "value_lookup": "n.UniqueId"},
{"key": "type", "value": "Author"}
]
},
},
{
"node": {
"pattern": [{"node": {"alias": "m", "unique": "UniqueId"}}],
"set": [
{"key": "name", "value_lookup": "m.UniqueId"},
{"key": "type", "value": "Paper"}
]
},
},
{
"edge": {
"pattern": [
{"node": {"alias": "n", "unique": "UniqueId"}},
{"edge": {}},
{"node": {"alias": "m", "unique": "UniqueId"}}
]
}
}
],
"loader": {
"neo4j2nx": {}
}
}
graph = px.execute_etl(neo4j2nx_etl)
num_papers = len([n for n, k in graph.nodes(data=True) if k["type"] == "Paper"])
num_authors = len([n for n, k in graph.nodes(data=True) if k["type"] == "Author"])
# Everything seems to be in order.
print(len(graph.edges()), num_authors, num_papers)
58595 16726 22015
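One more sanity check before projecting, a minimal sketch using the "type" attribute the transformers set: in a faithful extraction, every edge should join an Author to a Paper.

# Assert the graph is strictly bipartite: no Author-Author or Paper-Paper edges.
types = nx.get_node_attributes(graph, "type")
assert all(types[s] != types[t] for s, t in graph.edges())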
p = px.Projection(graph)
subgraph = p.execute("""
match (a1:Author)-(p:Paper)-(a2:Author)
project (a1)-(a2) method newman Paper
delete p""")
print(len(subgraph.nodes()), len(subgraph.edges()))  # ~500 authors whose only paper was solo-authored drop out of the projection.
16264 47594
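For context: Newman's weighting scheme gives each paper with n authors a contribution of 1/(n - 1) to every coauthor pair, so large collaborations are discounted. Assuming that's what projx's "method newman" implements (the weight stats below are consistent with it, e.g. the minimum of 1/17), a rough sketch of what the projection computes (the hypothetical newman_project below is just an illustration, not projx's actual code):

from itertools import combinations

def newman_project(bipartite, paper_type="Paper"):
    # Illustrative only: each paper with n authors adds 1/(n - 1)
    # to the weight of every coauthor pair (Newman's scheme).
    proj = nx.Graph()
    for node, attrs in bipartite.nodes(data=True):
        if attrs["type"] != paper_type:
            continue
        authors = list(bipartite.neighbors(node))
        for a1, a2 in combinations(authors, 2):
            w = 1.0 / (len(authors) - 1)
            if proj.has_edge(a1, a2):
                proj[a1][a2]["weight"] += w
            else:
                proj.add_edge(a1, a2, weight=w)
    return proj

Solo papers produce no coauthor pairs, which is why those authors vanish from the projection.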
eweight = pd.Series([a["weight"] for (s, t, a) in subgraph.edges(data=True)])
eweight.describe()
count    47594.000000
mean         0.571679
std          0.809551
min          0.058824
25%          0.174242
50%          0.333333
75%          0.500000
max         22.333333
dtype: float64
def prob_dist(itrbl):
    """Frequency count of each distinct value (an unnormalized distribution)."""
    count = {}
    for i in itrbl:
        count.setdefault(i, 0)
        count[i] += 1
    return pd.Series(count)
# Degree distribution of the collaboration graph on log-log axes.
deg_dist = prob_dist(nx.degree(subgraph).values())
plt.scatter(deg_dist.index, deg_dist)
plt.xscale("log")
plt.yscale("log")
# Write the projected edgelist to a tab-delimited text file for the Titan load below.
with open('output.txt', 'w') as f:
for s, t in subgraph.edges():
f.write(str(s) + '\t' + str(t) + '\n')
Note: this is just a quick demo to try out the driver; I've made no attempt at any sort of batch loading or server tuning.
You can get Titan 0.9.0 here. It comes packaged with the TP3 Gremlin Server; unpack it and...
Fire up the Gremlin Server:
./bin/gremlin-server.sh
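Before defining the schema, a trivial smoke test that the server is answering (this assumes gizmo's default endpoint, ws://localhost:8182):

gc = AsyncGremlinClient()
# Submit a no-op script; the consumer just prints the response message.
task = gc.task(gc.submit, "1 + 1", consumer=lambda x: print(x))
gc.run_until_complete(task)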
def build_schema(gc):
    """Create the uniqueId property key, a unique composite index on it, and the collabs edge label."""
    script = """
    mgmt = g.openManagement();
    uniqueId = mgmt.makePropertyKey('uniqueId').dataType(Integer.class).make();
    mgmt.buildIndex('byId', Vertex.class).addKey(uniqueId).unique().buildCompositeIndex();
    collabs = mgmt.makeEdgeLabel('collabs').make();
    mgmt.commit();"""
    task = gc.task(gc.submit, script,
                   consumer=lambda x: print("Committed tx with response code: {}".format(x.status_code)))
    gc.run_until_complete(task)
def load_edges(gc):
    """Load the tab-delimited edgelist into Titan using a getOrCreate pattern, then commit."""
    start = datetime.now()
script = """
getOrCreate = { id ->
def n = g.V().has('uniqueId', id)
if (n.hasNext()) {n.next()} else {g.addVertex("uniqueId", id)}
}
new File('output.txt').eachLine {
(fromVertex, toVertex) = it.split('\t').collect(getOrCreate)
fromVertex.addEdge('collabs', toVertex)
}
g.tx().commit()"""
task = gc.task(gc.submit, script,
                   consumer=lambda x: print("Committed tx with response code: {}".format(x.status_code)))
gc.run_until_complete(task)
print("Loaded in {}".format(datetime.now() - start))
@asyncio.coroutine
def count_nodes(gc):
yield from gc.submit("g.V().count()", collect=False, consumer=lambda x: print(x))
@asyncio.coroutine
def count_edges(gc):
yield from gc.submit("g.E().count()", collect=False, consumer=lambda x: print(x))
gc = AsyncGremlinClient()
build_schema(gc)
Committed tx with response code: 200
load_edges(gc)
Committed tx with response code: 200
Loaded in 0:00:16.713325
gc.run_until_complete(count_nodes(gc))
[16264]
gc.run_until_complete(count_edges(gc))
[47594]