The analytical engine in GraphScope derives from GRAPE, a graph processing system presented at SIGMOD 2017. GRAPE differs from prior systems in its ability to parallelize sequential graph algorithms as a whole. In GRAPE, sequential algorithms can be plugged in with only minor changes and are then parallelized to handle large graphs efficiently.
In this tutorial, we will show how to define and run your own algorithm in the PIE and Pregel models.
Sounds like fun? Excellent, here we go!
# Install graphscope package if you are NOT in the Playground
!pip3 install graphscope
GraphScope lets you write algorithms in the PIE programming model in pure Python. First of all, import the graphscope package and the pie decorator.
# Import the graphscope module.
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie
graphscope.set_option(show_log=False)  # disable verbose logging
We use the single source shortest path (SSSP) algorithm as an example. To implement the PIE model, you just need to fill in this class:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass
The pie decorator takes two parameters, vd_type and md_type, which represent the vertex data type and the message type respectively. You may specify types for your own algorithms; the supported values are int, double, and string.
In our SSSP case, we compute the shortest distance from the source to every node, so we use double for both vd_type and md_type.
Init, PEval, and IncEval each take frag and context as parameters. You can use these two parameters to access the fragment data and intermediate results. For detailed usage, please refer to the Cython SDK API.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        pass

    @staticmethod
    def IncEval(frag, context):
        pass
The Init function is responsible for 1) setting the initial value for each node; 2) defining the strategy of message passing; and 3) specifying the aggregator for handling the messages received in each round.
Note that the algorithm you define will run on a property graph. So we first get the number of vertex labels with v_label_num = frag.vertex_label_num(). Then, for each label, we traverse all nodes with that label via nodes = frag.nodes(v_label_id) and set their initial value with context.init_value(nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate).
Since we are computing the shortest path between the source node and all other nodes, we use PIEAggregateType.kMinAggregate as the aggregator for message aggregation, which means it performs a min operation over all received messages. Other available aggregators are kMaxAggregate, kSumAggregate, kProductAggregate, and kOverwriteAggregate.
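To make the aggregator semantics concrete, here is a plain-Python sketch (not GraphScope code) of how each aggregate type might fold incoming messages into a vertex's current value. The names AGGREGATORS and apply_messages are hypothetical and exist only for this illustration; the engine's actual implementation may differ.

```python
import operator

# Hypothetical stand-ins for the PIE aggregate types, for illustration only.
AGGREGATORS = {
    "kMinAggregate": min,
    "kMaxAggregate": max,
    "kSumAggregate": operator.add,
    "kProductAggregate": operator.mul,
    "kOverwriteAggregate": lambda current, message: message,
}


def apply_messages(kind, current, messages):
    """Fold every received message into the current vertex value."""
    combine = AGGREGATORS[kind]
    for message in messages:
        current = combine(current, message)
    return current


# With kMinAggregate a vertex keeps the shortest distance seen so far.
print(apply_messages("kMinAggregate", 1000000000.0, [7.0, 3.0, 5.0]))  # prints 3.0
```

Under this reading, starting every node at 1000000000.0 works as "infinity": any real distance that arrives replaces it.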
At the end of the Init function, we register the sync buffer for each vertex label with MessageStrategy.kSyncOnOuterVertex, which tells the engine how to pass the messages.
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        pass
In PEval of SSSP, the queried source node is read with context.get_config(b"src").
PEval checks whether each fragment contains the source node by calling frag.get_inner_node(v_label_id, src, source). Note that the get_inner_node method needs a source parameter of type Vertex, which you can declare with graphscope.declare(graphscope.Vertex, source).
If a fragment contains the source node, it traverses the outgoing edges of the source with frag.get_outgoing_edges(source, e_label_id). For each neighbor, it computes the distance from the source and updates the stored value if the new distance is less than the current one.
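The same logic can be tried without a GraphScope session. The following is a minimal plain-Python sketch, assuming a fragment is represented as a dictionary of outgoing edges; peval, fragment_edges, and inner_nodes are hypothetical names for this illustration and are not part of the GraphScope API.

```python
INF = 1000000000.0


def peval(fragment_edges, inner_nodes, dist, src):
    """Relax the outgoing edges of src if this fragment owns it.

    Returns True when the fragment did any work.
    """
    if src not in inner_nodes:
        # Mirrors the early return for fragments without the source.
        return False
    dist[src] = 0.0
    for neighbor, weight in fragment_edges.get(src, []):
        if dist[neighbor] > weight:
            dist[neighbor] = weight
    return True


# Toy fragment: vertices 6, 7, 8 with edges 6->7 (weight 4) and 6->8 (weight 2).
edges = {6: [(7, 4.0), (8, 2.0)]}
dist = {6: INF, 7: INF, 8: INF}
peval(edges, {6, 7, 8}, dist, src=6)
print(dist)  # prints {6: 0.0, 7: 4.0, 8: 2.0}
```

A fragment that does not own vertex 6 returns immediately and leaves its values untouched, which is why only one fragment does real work in this phase.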
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)

    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)

    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)
The main difference between IncEval and PEval in the SSSP algorithm is that IncEval is invoked on every fragment and relaxes the outgoing edges of all inner nodes, rather than running only on the fragment that contains the source node. A fragment repeats IncEval until it receives no more messages. When all fragments have finished their computation, the algorithm terminates.
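To see how PEval, IncEval, and message synchronization fit together, here is a single-process sketch in plain Python. It is an illustration under stated assumptions, not the engine's implementation: the two-fragment partition FRAGMENTS, the helper names, and the termination rule (stop when a round changes no value, standing in for "no messages received") are all hypothetical.

```python
INF = 1000000000.0

# Hypothetical two-fragment partition of a small directed graph.
# Each fragment stores the outgoing edges of the vertices it owns.
FRAGMENTS = [
    {"inner": {0, 1}, "edges": {0: [(1, 1.0), (3, 10.0)], 1: [(2, 2.0)]}},
    {"inner": {2, 3}, "edges": {2: [(3, 1.0)], 3: []}},
]


def local_values(frag):
    """Initial distances for inner vertices and outer (mirror) vertices."""
    vertices = set(frag["inner"])
    for targets in frag["edges"].values():
        vertices.update(t for t, _ in targets)
    return {v: INF for v in vertices}


def relax(frag, dist, sources):
    """Relax outgoing edges of the given inner vertices; report any change."""
    changed = False
    for v in sources:
        for u, w in frag["edges"].get(v, []):
            if dist[u] > dist[v] + w:
                dist[u] = dist[v] + w
                changed = True
    return changed


def sync_outer(frags, dists):
    """Send mirror values to the owning fragment and min-aggregate them."""
    updates = 0
    for i, frag in enumerate(frags):
        for v, value in dists[i].items():
            if v in frag["inner"]:
                continue
            for j, owner in enumerate(frags):
                if v in owner["inner"] and value < dists[j][v]:
                    dists[j][v] = value
                    updates += 1
    return updates


def sssp(frags, src):
    dists = [local_values(f) for f in frags]
    # PEval: only the fragment that owns the source does any work.
    for frag, dist in zip(frags, dists):
        if src in frag["inner"]:
            dist[src] = 0.0
            relax(frag, dist, [src])
    # IncEval: every fragment runs, repeated until a round changes nothing.
    active = True
    while active:
        active = sync_outer(frags, dists) > 0
        for frag, dist in zip(frags, dists):
            if relax(frag, dist, frag["inner"]):
                active = True
    result = {}
    for frag, dist in zip(frags, dists):
        result.update({v: dist[v] for v in frag["inner"]})
    return result


print(sssp(FRAGMENTS, src=0))
```

For this toy graph the shortest distances from vertex 0 are 0, 1, 3, and 4 for vertices 0 through 3. Vertex 3 first receives the direct distance 10 and is later improved to 4 through the path 0, 1, 2, 3, which takes two further rounds of message exchange.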
# Load p2p network dataset
from graphscope.dataset import load_p2p_network
graph = load_p2p_network(directed=False, generate_eid=False)
Then initialize your algorithm and query the shortest path from vertex 6 over the graph.
sssp = SSSP_PIE()
ctx = sssp(graph, src=6)
After running this cell, your algorithm should have evaluated successfully. The results are stored in vineyard on the distributed machines. Let's fetch and check the results.
r1 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r1
You can dump and save the algorithm you defined for future use.
import os
# specify the path you want to dump
dump_path = os.path.expanduser("~/sssp_pie.gar")
# dump
SSSP_PIE.to_gar(dump_path)
Now you can find a package named sssp_pie.gar in your home directory (~/). Reload the algorithm with the following code.
from graphscope.framework.app import load_app
# specify the path of the package dumped above
dump_path = os.path.expanduser("~/sssp_pie.gar")
sssp2 = load_app(dump_path)
In addition to the subgraph-based PIE model, GraphScope supports the vertex-centric Pregel model. To define a Pregel algorithm, import the pregel decorator and fill in the functions defined on vertices.
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        pass

    @staticmethod
    def Compute(messages, v, context):
        pass
The pregel decorator has two parameters, vd_type and md_type, which represent the vertex data type and the message type respectively. You can specify the types for your algorithm; the options are int, double, and string. For SSSP, we set both to double.
Since the Pregel model is defined on vertices, the Init and Compute functions take a parameter v to access the vertex data. See the Cython SDK API for more details.
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        pass
The Init function sets the initial value for each node with v.set_value(1000000000.0).
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()
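The Compute function for SSSP computes the new distance for each node by the following steps:
1) Initialize the new distance to 1000000000.0.
2) If the vertex is the source node, set the new distance to 0.
3) Compute the min value of the messages received, and set the vertex value if it is less than the current value. In that case, also send the updated distance plus the edge weight along each outgoing edge.
These steps repeat until no more new messages (shorter distances) are generated.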
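The superstep loop behind Compute can be simulated on a single machine. The sketch below is plain Python under simplifying assumptions: pregel_sssp, inbox, and outbox are hypothetical names, and a vertex that has voted to halt is woken up only by an incoming message.

```python
INF = 1000000000.0


def pregel_sssp(edges, vertices, src):
    """Run vertex-centric SSSP superstep by superstep on one machine."""
    value = {v: INF for v in vertices}  # what Init does for every vertex
    inbox = {v: [] for v in vertices}
    active = set(vertices)              # every vertex runs in superstep 0
    supersteps = 0
    while active:
        outbox = {v: [] for v in vertices}
        for v in active:
            # Compute: start from "infinity", or from 0 at the source.
            new_dist = 0.0 if v == src else INF
            for message in inbox[v]:
                new_dist = min(new_dist, message)
            if new_dist < value[v]:
                value[v] = new_dist
                for u, w in edges.get(v, []):
                    outbox[u].append(new_dist + w)
            # vote_to_halt: the vertex sleeps until a message arrives.
        inbox = outbox
        active = {v for v in vertices if inbox[v]}
        supersteps += 1
    return value, supersteps


# Toy graph: 0->1 (1), 0->3 (10), 1->2 (2), 2->3 (1).
toy_edges = {0: [(1, 1.0), (3, 10.0)], 1: [(2, 2.0)], 2: [(3, 1.0)]}
values, steps = pregel_sssp(toy_edges, [0, 1, 2, 3], src=0)
print(values, steps)
```

On this toy graph the run takes four supersteps: vertex 3 first settles on 10 and is woken up again when the shorter distance 4 arrives through vertex 2.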
Optionally, we can define a combiner to reduce the message communication overhead.
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret
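To illustrate why a combiner reduces communication, the following plain-Python sketch groups pending messages by destination and collapses each group with the same min logic as Combine. The function combine_outbox and the (destination, distance) pair layout are assumptions made for this example, not how the engine stores messages.

```python
def combine(messages):
    """Same logic as the Combine method: keep only the smallest distance."""
    ret = 1000000000.0
    for m in messages:
        ret = min(ret, m)
    return ret


def combine_outbox(outgoing):
    """Collapse messages per destination before they leave the worker.

    outgoing is a list of (destination, distance) pairs.
    """
    grouped = {}
    for dst, dist in outgoing:
        grouped.setdefault(dst, []).append(dist)
    return {dst: combine(msgs) for dst, msgs in grouped.items()}


outgoing = [(3, 10.0), (3, 4.0), (3, 7.0), (2, 3.0)]
combined = combine_outbox(outgoing)
print(combined)  # prints {3: 4.0, 2: 3.0}
```

Here four messages shrink to two. This is safe for SSSP because Compute only ever uses the minimum of the messages a vertex receives.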
Next, let's run your Pregel algorithm on the graph, and check the results.
sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)
r2 = (
    ctx.to_dataframe({"node": "v:host.id", "r": "r:host"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r2
Pregel aggregators are a mechanism for global communication, monitoring, and counting. Each vertex can provide a value to an aggregator in superstep S, the system combines these values using a reducing operator, and the resulting value is made available to all vertices in superstep S+1. GraphScope provides a number of predefined aggregators for Pregel algorithms, such as min, max, or sum operations on various data types.
Here is an example of using the built-in aggregators. More details can be found in the Cython SDK API.
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
    @staticmethod
    def Init(v, context):
        # int
        context.register_aggregator(
            b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
        )
        context.register_aggregator(
            b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
        )
        context.register_aggregator(
            b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
        )
        # double
        context.register_aggregator(
            b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
        )
        context.register_aggregator(
            b"double_overwrite_aggregator",
            PregelAggregatorType.kDoubleOverwriteAggregator,
        )
        # bool
        context.register_aggregator(
            b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
        )
        context.register_aggregator(
            b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
        )
        context.register_aggregator(
            b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
        )
        # text
        context.register_aggregator(
            b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
        )

    @staticmethod
    def Compute(messages, v, context):
        if context.superstep() == 0:
            context.aggregate(b"int_sum_aggregator", 1)
            context.aggregate(b"int_max_aggregator", int(v.id()))
            context.aggregate(b"int_min_aggregator", int(v.id()))
            context.aggregate(b"double_product_aggregator", 1.0)
            context.aggregate(b"double_overwrite_aggregator", 1.0)
            context.aggregate(b"bool_and_aggregator", True)
            context.aggregate(b"bool_or_aggregator", False)
            context.aggregate(b"bool_overwrite_aggregator", True)
            context.aggregate(b"text_append_aggregator", v.id() + b",")
        else:
            if v.id() == b"1":
                assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
                assert context.get_aggregated_value(b"int_max_aggregator") == 62586
                assert context.get_aggregated_value(b"int_min_aggregator") == 1
                assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
                assert (
                    context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
                )
                assert context.get_aggregated_value(b"bool_and_aggregator") == True
                assert context.get_aggregated_value(b"bool_or_aggregator") == False
                assert (
                    context.get_aggregated_value(b"bool_overwrite_aggregator") == True
                )
                context.get_aggregated_value(b"text_append_aggregator")
            v.vote_to_halt()
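The expected values in the assertions follow from how the contributions of superstep 0 are reduced. As a rough plain-Python sketch of that reduction (superstep_aggregate and the five-vertex id list are hypothetical and chosen only for illustration):

```python
from functools import reduce


def superstep_aggregate(contributions, reducer):
    """Reduce the values contributed during superstep S into one global value."""
    return reduce(reducer, contributions)


# Hypothetical graph with five vertices whose ids run from 1 to 5.
vertex_ids = [1, 2, 3, 4, 5]

# Superstep S: every vertex contributes once to each aggregator.
int_sum = superstep_aggregate([1 for _ in vertex_ids], lambda a, b: a + b)
int_max = superstep_aggregate(vertex_ids, max)
int_min = superstep_aggregate(vertex_ids, min)

# Superstep S+1: every vertex can read the same reduced values.
print(int_sum, int_max, int_min)  # prints 5 5 1
```

Each vertex adds 1 to the sum aggregator, so the sum equals the number of vertices. The assertions in the example above (sum 62586, max 62586, min 1) are consistent with a graph whose vertex ids run from 1 to 62586.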