Writing Your Own Graph Algorithms

The analytical engine in GraphScope derives from GRAPE, a graph processing system presented at SIGMOD 2017. GRAPE differs from prior systems in its ability to parallelize sequential graph algorithms as a whole. In GRAPE, sequential algorithms can be plugged in with only minor changes and are then parallelized to handle large graphs efficiently.

In this tutorial, we will show how to define and run your own algorithm in the PIE and Pregel models.

Sounds like fun? Excellent, here we go!

Writing an Algorithm in the PIE Model

GraphScope lets users write algorithms in the PIE programming model in pure Python. First, import the graphscope package and the pie decorator.

In [ ]:
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pie

We use the single-source shortest path (SSSP) algorithm as an example. To implement an algorithm in the PIE model, you just need to fill in the three methods of this class:

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        pass
    @staticmethod
    def PEval(frag, context):
        pass
    @staticmethod
    def IncEval(frag, context):
        pass

The pie decorator takes two parameters, vd_type and md_type, which represent the vertex data type and the message type respectively.

You may specify types for your own algorithms; the supported values are int, double, and string. In our SSSP case, we compute the shortest distance from the source to every node, so we use double for both vd_type and md_type.

Init, PEval, and IncEval each take frag and context as parameters. You can use these two parameters to access the fragment data and the intermediate results. For detailed usage, please refer to the Cython SDK API.

Implement the Init Function

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
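        # a property graph may have several vertex labels; handle each one
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            # start every node at a large "infinite" distance and
            # merge incoming messages by taking the minimum
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            # tell the engine how messages are passed for this label
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)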
    @staticmethod
    def PEval(frag, context):
        pass
    @staticmethod
    def IncEval(frag, context):
        pass

The Init function is responsible for 1) setting the initial value for each node; 2) defining the message-passing strategy; and 3) specifying the aggregator for handling received messages in each round.

Note that the algorithm you define will run on a property graph. So we first get the number of vertex labels with v_label_num = frag.vertex_label_num(); then, for each label, we fetch all nodes with that label and set their initial value with nodes = frag.nodes(v_label_id) and context.init_value(nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate).

Since we are computing the shortest path between the source node and all other nodes, we use PIEAggregateType.kMinAggregate as the aggregator for message aggregation, which means it performs a min operation over all received messages. Other available aggregators are kMaxAggregate, kSumAggregate, kProductAggregate, and kOverwriteAggregate.
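
As a rough mental model of what an aggregator does, the sketch below folds a list of received messages into a vertex's current value in plain Python. The AGGREGATORS table and the aggregate helper are hypothetical names used only for illustration; they are not part of the GraphScope API, and whether the engine includes the current value in the fold is an assumption made here.

```python
from functools import reduce

# Hypothetical stand-ins for the PIE aggregator kinds named above.
AGGREGATORS = {
    "min": min,
    "max": max,
    "sum": lambda a, b: a + b,
    "product": lambda a, b: a * b,
    "overwrite": lambda a, b: b,  # the latest message wins
}

def aggregate(kind, current, messages):
    """Fold the received messages into the current value."""
    return reduce(AGGREGATORS[kind], messages, current)

current = 1000000000.0        # the initial "infinite" distance
messages = [7.0, 3.0, 5.0]    # distances proposed by neighbors
print(aggregate("min", current, messages))  # prints 3.0
```

With the min aggregator, a vertex that starts at the large initial value ends up holding the smallest distance any neighbor has proposed, which is exactly what SSSP needs.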

At the end of the Init function, we register the sync buffer for each vertex label with MessageStrategy.kSyncOnOuterVertex, which tells the engine how to pass messages.
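
To make the message strategy more concrete: as we understand it, with kSyncOnOuterVertex a fragment sends the values it holds for its outer vertices (local copies of vertices owned by another fragment) to the owning fragment, where they are merged by the aggregator. The sketch below imitates this for two fragments in plain Python. The dictionaries and the sync_outer_vertices helper are hypothetical and are not GraphScope API.

```python
INF = 1000000000.0

# Each hypothetical fragment owns some vertices ("inner") and keeps
# local copies ("outer") of remote vertices that its edges point to.
frag0 = {"inner": {"a": 0.0}, "outer": {"b": 4.0}}   # edge a -> b crosses fragments
frag1 = {"inner": {"b": INF}, "outer": {}}

def sync_outer_vertices(sender, receiver, combine=min):
    """Push the sender's outer-vertex values to their owner and aggregate."""
    changed = []
    for vertex, value in sender["outer"].items():
        if vertex in receiver["inner"]:
            merged = combine(receiver["inner"][vertex], value)
            if merged != receiver["inner"][vertex]:
                receiver["inner"][vertex] = merged
                changed.append(vertex)
    return changed

print(sync_outer_vertices(frag0, frag1))  # prints ['b']
```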

Implement the PEval Function

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
    @staticmethod
    def PEval(frag, context):
        # the source vertex id is passed in as a query parameter
        src = int(context.get_config(b"src"))
        # declare a Vertex variable for get_inner_node to fill in
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            # this fragment does not contain the source, nothing to do
            return
        # relax the outgoing edges of the source
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                # the edge property at index 2 is used as the edge weight
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)
    @staticmethod
    def IncEval(frag, context):
        pass

In PEval of SSSP, we first get the queried source node with context.get_config(b"src").

PEval then checks whether the fragment contains the source node with frag.get_inner_node(v_label_id, src, source). Note that the get_inner_node method needs a source parameter of type Vertex, which you can declare with graphscope.declare(graphscope.Vertex, source).

If a fragment contains the source node, it traverses the outgoing edges of the source with frag.get_outgoing_edges(source, e_label_id). For each neighbor, it computes the distance from the source and updates the neighbor's value if the new distance is less than its current value.
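
The relaxation step described above can be sketched in plain Python, outside of GraphScope. The out_edges adjacency list, the dist dictionary, and the peval function are made up for illustration and only mimic the logic of PEval on a single fragment.

```python
INF = 1000000000.0

# Hypothetical adjacency list for one fragment: vertex -> [(neighbor, weight)].
out_edges = {6: [(7, 2.0), (8, 5.0)], 7: [(8, 1.0)], 8: []}
dist = {v: INF for v in out_edges}

def peval(source):
    """Set the source distance to 0 and relax its direct out-edges once."""
    if source not in out_edges:      # this fragment does not own the source
        return
    dist[source] = 0.0
    for neighbor, weight in out_edges[source]:
        if dist[neighbor] > weight:
            dist[neighbor] = weight

peval(6)
print(dist)  # prints {6: 0.0, 7: 2.0, 8: 5.0}
```

Note that vertex 8 is still at 5.0 here. The shorter route through vertex 7 is not discovered by PEval; it is left to the later IncEval rounds.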

Implement the IncEval Function

In [ ]:
@pie(vd_type="double", md_type="double")
class SSSP_PIE(AppAssets):
    @staticmethod
    def Init(frag, context):
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            nodes = frag.nodes(v_label_id)
            context.init_value(
                nodes, v_label_id, 1000000000.0, PIEAggregateType.kMinAggregate
            )
            context.register_sync_buffer(v_label_id, MessageStrategy.kSyncOnOuterVertex)
    @staticmethod
    def PEval(frag, context):
        src = int(context.get_config(b"src"))
        graphscope.declare(graphscope.Vertex, source)
        native_source = False
        v_label_num = frag.vertex_label_num()
        for v_label_id in range(v_label_num):
            if frag.get_inner_node(v_label_id, src, source):
                native_source = True
                break
        if native_source:
            context.set_node_value(source, 0)
        else:
            return
        e_label_num = frag.edge_label_num()
        for e_label_id in range(e_label_num):
            edges = frag.get_outgoing_edges(source, e_label_id)
            for e in edges:
                dst = e.neighbor()
                distv = e.get_int(2)
                if context.get_node_value(dst) > distv:
                    context.set_node_value(dst, distv)
    @staticmethod
    def IncEval(frag, context):
        v_label_num = frag.vertex_label_num()
        e_label_num = frag.edge_label_num()
        for v_label_id in range(v_label_num):
            # visit every vertex owned by this fragment
            iv = frag.inner_nodes(v_label_id)
            for v in iv:
                v_dist = context.get_node_value(v)
                for e_label_id in range(e_label_num):
                    es = frag.get_outgoing_edges(v, e_label_id)
                    for e in es:
                        u = e.neighbor()
                        # candidate distance to u through v
                        u_dist = v_dist + e.get_int(2)
                        if context.get_node_value(u) > u_dist:
                            context.set_node_value(u, u_dist)

The only difference between IncEval and PEval of the SSSP algorithm is that IncEval is invoked on every fragment, rather than only on the fragment that contains the source node. A fragment repeats IncEval until it receives no more messages. When all fragments have finished their computation, the algorithm terminates.
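
To see how PEval, message passing, and IncEval fit together, here is a toy single-process simulation in plain Python. It is a sketch under simplifying assumptions, not how the engine is implemented: the owner and edges tables, the run_pie_sssp function, and its helpers are all hypothetical, each fragment relaxes its edges until nothing changes locally (the IncEval above makes a single pass), and every fragment runs again as long as any message was delivered.

```python
INF = 1000000000.0

# Hypothetical edge-cut partition of a 4-vertex graph into 2 fragments.
owner = {1: 0, 2: 0, 3: 1, 4: 1}                       # vertex -> fragment id
edges = {1: [(3, 1.0)], 2: [(4, 1.0)], 3: [(2, 1.0)], 4: []}

def run_pie_sssp(source, num_frags=2):
    inner = [[v for v in owner if owner[v] == f] for f in range(num_frags)]
    # Each fragment stores values for its inner vertices and for the
    # outer vertices that its edges point to.
    local = [{} for _ in range(num_frags)]
    for f in range(num_frags):
        for v in inner[f]:
            local[f][v] = INF
            for u, _ in edges[v]:
                local[f].setdefault(u, INF)

    def relax(f):
        """Sequential step: relax edges of inner vertices to a local fixpoint."""
        changed = True
        while changed:
            changed = False
            for v in inner[f]:
                for u, w in edges[v]:
                    if local[f][v] + w < local[f][u]:
                        local[f][u] = local[f][v] + w
                        changed = True

    def sync():
        """Send outer-vertex values to their owners and min-aggregate them."""
        delivered = 0
        for f in range(num_frags):
            for v, d in local[f].items():
                if owner[v] != f and d < local[owner[v]][v]:
                    local[owner[v]][v] = d
                    delivered += 1
        return delivered

    # PEval: only the fragment that owns the source has work to do.
    local[owner[source]][source] = 0.0
    relax(owner[source])

    # IncEval: every fragment runs again while messages keep arriving.
    rounds = 0
    while sync() > 0:
        rounds += 1
        for f in range(num_frags):
            relax(f)
    return {v: local[owner[v]][v] for v in owner}, rounds

dist, rounds = run_pie_sssp(source=1)
print(dist, rounds)  # prints {1: 0.0, 2: 2.0, 3: 1.0, 4: 3.0} 3
```

The shortest path 1 -> 3 -> 2 -> 4 crosses the fragment boundary three times, so this toy run needs three IncEval rounds before no more messages are produced.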

Run Your Algorithm on a Graph

First, let's establish a session and load a graph for testing.

In [ ]:
from graphscope.framework.loader import Loader

# the location of the property graph for testing
property_dir = '/home/jovyan/datasets/property'

graphscope.set_option(show_log=True)

k8s_volumes = {
    "data": {
        "type": "hostPath",
        "field": {
          "path": "/testingdata",
          "type": "Directory"
        },
        "mounts": {
          "mountPath": "/home/jovyan/datasets",
          "readOnly": True
        }
    }
}

sess = graphscope.session(k8s_volumes=k8s_volumes)

graph = sess.g(directed=False)
graph = graph.add_vertices(f"{property_dir}/p2p-31_property_v_0", label="person")
graph = graph.add_edges(f"{property_dir}/p2p-31_property_e_0", label="knows")

Then initialize your algorithm and query the shortest paths from vertex 6 over the graph.

In [ ]:
sssp = SSSP_PIE()
ctx = sssp(graph, src=6)

After running this cell, your algorithm should have been evaluated successfully. The results are stored in vineyard across the distributed machines. Let's fetch and check the results.

In [ ]:
r1 = (
    ctx.to_dataframe({"node": "v:person.id", "r": "r:person"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r1
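
One way to sanity-check a custom SSSP implementation is to compare it with a sequential reference on a small graph. The sketch below is a standard Dijkstra built on Python's heapq module. The dijkstra function and the tiny toy_edges graph are made up for illustration and are unrelated to the p2p-31 dataset loaded above.

```python
import heapq

def dijkstra(adj, source):
    """Return shortest distances from source over non-negative edge weights."""
    dist = {source: 0.0}
    heap = [(0.0, source)]
    while heap:
        d, v = heapq.heappop(heap)
        if d > dist.get(v, float("inf")):
            continue                      # stale queue entry, skip it
        for u, w in adj.get(v, []):
            nd = d + w
            if nd < dist.get(u, float("inf")):
                dist[u] = nd
                heapq.heappush(heap, (nd, u))
    return dist

# Hypothetical toy graph: vertex -> [(neighbor, weight)].
toy_edges = {1: [(3, 1.0)], 2: [(4, 1.0)], 3: [(2, 1.0)], 4: []}
print(dijkstra(toy_edges, 1))  # prints {1: 0.0, 3: 1.0, 2: 2.0, 4: 3.0}
```

Vertices that are unreachable from the source simply do not appear in the returned dictionary, whereas the PIE version above leaves them at the initial value 1000000000.0.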

Dump and Reload Your Algorithm

You can dump and save the algorithm you defined for future use.

In [ ]:
import os

# specify the path you want to dump
dump_path = os.path.expanduser("~/Workspace/sssp_pie.gar")

# dump
SSSP_PIE.to_gar(dump_path)

Now you can find a package named sssp_pie.gar in your ~/Workspace directory. Reload this algorithm with the following code.

In [ ]:
from graphscope.framework.app import load_app

# specify the path of the dumped package
dump_path = os.path.expanduser("~/Workspace/sssp_pie.gar")

sssp2 = load_app("SSSP_PIE", dump_path)

Writing an Algorithm in the Pregel Model

In addition to the subgraph-based PIE model, GraphScope supports the vertex-centric Pregel model. To define a Pregel algorithm, import the pregel decorator and implement the functions defined on a vertex.

In [ ]:
import graphscope
from graphscope.framework.app import AppAssets
from graphscope.analytical.udf.decorators import pregel

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        pass
    @staticmethod
    def Compute(messages, v, context):
        pass

The pregel decorator takes two parameters, vd_type and md_type, which represent the vertex data type and the message type respectively.

You can specify the types for your algorithm; the options are int, double, and string. For SSSP, we set both to double.

Since the Pregel model is defined on vertices, the Init and Compute functions take a parameter v to access the vertex data. See the Cython SDK API for more details.

Implement the Init Function

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)
    @staticmethod
    def Compute(messages, v, context):
        pass

The Init function sets the initial value for each node with v.set_value(1000000000.0).

Implement the Compute Function

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
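        src_id = context.get_config(b"src")
        cur_dist = v.value()
        # start from "infinity"; the source itself is at distance 0
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        # keep the smallest distance proposed by any neighbor
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            # the distance improved, so propagate it along outgoing edges
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        # become inactive until a new message arrives
        v.vote_to_halt()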

The Compute function for SSSP computes the new distance for each node in the following steps:

1) Initialize the new distance to 1000000000.
2) If the vertex is the source node, set the new distance to 0.
3) Compute the minimum of the received messages and the new distance; if the result is less than the current value, store it and send the updated distances to the neighbors.

These steps repeat until no more new messages (shorter distances) are generated.
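
The steps above can be traced with a toy superstep loop in plain Python. This is only a sketch of the vertex-centric idea: run_pregel_sssp and the adjacency list toy_adj are hypothetical, and the loop assumes that a halted vertex is woken up only by an incoming message.

```python
INF = 1000000000.0

def run_pregel_sssp(adj, source):
    """Toy superstep loop that mimics Init and Compute for SSSP."""
    value = {v: INF for v in adj}                 # Init
    inbox = {v: [] for v in adj}
    active = set(adj)                             # all vertices start active
    superstep = 0
    while active:
        outbox = {v: [] for v in adj}
        for v in active:
            new_dist = 0.0 if v == source else INF
            for m in inbox[v]:
                new_dist = min(new_dist, m)
            if new_dist < value[v]:
                value[v] = new_dist
                for u, w in adj[v]:
                    outbox[u].append(new_dist + w)   # plays the role of v.send
            # every vertex votes to halt here; only a message reactivates it
        inbox = outbox
        active = {v for v in adj if inbox[v]}
        superstep += 1
    return value, superstep

# Hypothetical toy graph: vertex -> [(neighbor, weight)].
toy_adj = {1: [(3, 1.0)], 2: [(4, 1.0)], 3: [(2, 1.0)], 4: []}
print(run_pregel_sssp(toy_adj, 1))  # prints ({1: 0.0, 2: 2.0, 3: 1.0, 4: 3.0}, 4)
```

Each superstep pushes the frontier one hop further, so the path 1 -> 3 -> 2 -> 4 takes four supersteps before no vertex has messages left.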

Optional Combiner

Optionally, we can define a combiner to reduce the message communication overhead.

In [ ]:
@pregel(vd_type="double", md_type="double")
class SSSP_Pregel(AppAssets):
    @staticmethod
    def Init(v, context):
        v.set_value(1000000000.0)

    @staticmethod
    def Compute(messages, v, context):
        src_id = context.get_config(b"src")
        cur_dist = v.value()
        new_dist = 1000000000.0
        if v.id() == src_id:
            new_dist = 0
        for message in messages:
            new_dist = min(message, new_dist)
        if new_dist < cur_dist:
            v.set_value(new_dist)
            for e_label_id in range(context.edge_label_num()):
                edges = v.outgoing_edges(e_label_id)
                for e in edges:
                    v.send(e.vertex(), new_dist + e.get_int(2))
        v.vote_to_halt()

    @staticmethod
    def Combine(messages):
        ret = 1000000000.0
        for m in messages:
            ret = min(ret, m)
        return ret
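
To illustrate why a combiner reduces communication, the sketch below groups a handful of outgoing messages by destination and collapses each group with the same min logic as the Combine method above. The outgoing list and the grouping code are hypothetical; how and where the engine actually applies the combiner is not shown here.

```python
from collections import defaultdict

def combine(messages):
    """Same logic as the Combine method: keep only the smallest distance."""
    ret = 1000000000.0
    for m in messages:
        ret = min(ret, m)
    return ret

# Hypothetical outgoing messages as (destination vertex, distance) pairs.
outgoing = [(7, 5.0), (7, 3.0), (8, 2.0), (7, 9.0), (8, 6.0)]

# Group messages by destination, then collapse each group into one message.
grouped = defaultdict(list)
for dst, dist in outgoing:
    grouped[dst].append(dist)

combined = {dst: combine(msgs) for dst, msgs in grouped.items()}
print(len(outgoing), "->", len(combined), combined)  # prints 5 -> 2 {7: 3.0, 8: 2.0}
```

Because Compute only cares about the minimum of its messages, sending one combined message per destination gives the same result as sending all five.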

Run Your Pregel Algorithm on a Graph

Next, let's run your Pregel algorithm on the graph, and check the results.

In [ ]:
sssp_pregel = SSSP_Pregel()
ctx = sssp_pregel(graph, src=6)

In [ ]:
r2 = (
    ctx.to_dataframe({"node": "v:person.id", "r": "r:person"})
    .sort_values(by=["node"])
    .to_numpy(dtype=float)
)
r2

It is important to release resources when they are no longer used.

In [ ]:
sess.close()

Aggregator in Pregel

Pregel aggregators are a mechanism for global communication, monitoring, and counting. Each vertex can provide a value to an aggregator in superstep S; the system combines these values using a reduction operator, and the resulting value is made available to all vertices in superstep S+1. GraphScope provides a number of predefined aggregators for Pregel algorithms, such as min, max, or sum operations on various data types.
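
The one-superstep delay can be shown with a small plain-Python loop. The run_supersteps function below is a hypothetical illustration, not GraphScope API: every vertex contributes to a sum and a max aggregator, and the combined values only become readable in the following superstep.

```python
def run_supersteps(vertex_ids, num_supersteps=2):
    """Toy loop: values aggregated in superstep S are readable in S+1."""
    visible = {}                       # what vertices can read this superstep
    history = []
    for superstep in range(num_supersteps):
        pending = {"sum": 0, "max": None}
        for vid in vertex_ids:
            # each vertex contributes one value to each aggregator
            pending["sum"] += 1
            if pending["max"] is None or vid > pending["max"]:
                pending["max"] = vid
        history.append(dict(visible))  # snapshot of what was readable
        visible = pending              # published for the next superstep
    return history

print(run_supersteps([1, 2, 3]))  # prints [{}, {'sum': 3, 'max': 3}]
```

In superstep 0 nothing has been aggregated yet, so there is nothing to read; in superstep 1 every vertex sees the vertex count and the largest id collected during superstep 0.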

Here is an example that uses the built-in aggregators. More details can be found in the Cython SDK API.

In [ ]:
@pregel(vd_type="double", md_type="double")
class Aggregators_Pregel_Test(AppAssets):
    @staticmethod
    def Init(v, context):
        # int
        context.register_aggregator(
            b"int_sum_aggregator", PregelAggregatorType.kInt64SumAggregator
        )
        context.register_aggregator(
            b"int_max_aggregator", PregelAggregatorType.kInt64MaxAggregator
        )
        context.register_aggregator(
            b"int_min_aggregator", PregelAggregatorType.kInt64MinAggregator
        )
        # double
        context.register_aggregator(
            b"double_product_aggregator", PregelAggregatorType.kDoubleProductAggregator
        )
        context.register_aggregator(
            b"double_overwrite_aggregator",
            PregelAggregatorType.kDoubleOverwriteAggregator,
        )
        # bool
        context.register_aggregator(
            b"bool_and_aggregator", PregelAggregatorType.kBoolAndAggregator
        )
        context.register_aggregator(
            b"bool_or_aggregator", PregelAggregatorType.kBoolOrAggregator
        )
        context.register_aggregator(
            b"bool_overwrite_aggregator", PregelAggregatorType.kBoolOverwriteAggregator
        )
        # text
        context.register_aggregator(
            b"text_append_aggregator", PregelAggregatorType.kTextAppendAggregator
        )
    @staticmethod
    def Compute(messages, v, context):
        if context.superstep() == 0:
            context.aggregate(b"int_sum_aggregator", 1)
            context.aggregate(b"int_max_aggregator", int(v.id()))
            context.aggregate(b"int_min_aggregator", int(v.id()))
            context.aggregate(b"double_product_aggregator", 1.0)
            context.aggregate(b"double_overwrite_aggregator", 1.0)
            context.aggregate(b"bool_and_aggregator", True)
            context.aggregate(b"bool_or_aggregator", False)
            context.aggregate(b"bool_overwrite_aggregator", True)
            context.aggregate(b"text_append_aggregator", v.id() + b",")
        else:
            if v.id() == b"1":
                assert context.get_aggregated_value(b"int_sum_aggregator") == 62586
                assert context.get_aggregated_value(b"int_max_aggregator") == 62586
                assert context.get_aggregated_value(b"int_min_aggregator") == 1
                assert context.get_aggregated_value(b"double_product_aggregator") == 1.0
                assert (
                    context.get_aggregated_value(b"double_overwrite_aggregator") == 1.0
                )
                assert context.get_aggregated_value(b"bool_and_aggregator") == True
                assert context.get_aggregated_value(b"bool_or_aggregator") == False
                assert (
                    context.get_aggregated_value(b"bool_overwrite_aggregator") == True
                )
                context.get_aggregated_value(b"text_append_aggregator")
            v.vote_to_halt()