GraphLab: Distributed Graph-Parallel API  2.1
6: Runtime Scheduling

During engine execution, the engine maintains, in a distributed fashion, a schedule of vertex programs to run. The engine.signal_all() function call essentially injects every vertex in the graph into the engine's scheduler. In this section, we will see how the schedule can be modified during engine execution, and how that can be used (to great effect in some cases) to accelerate convergence of your program.

Self Scheduling Example

The simplest form of "dynamic" scheduling is to repeat each vertex's execution for a certain fixed number of iterations (say 10). To do that, we add a "counter" to the data on each vertex by modifying the web_page struct:

struct web_page {
  std::string pagename;
  double pagerank;
  int counter;
  web_page(): pagerank(0.0), counter(0) { }
  explicit web_page(std::string name): pagename(name),
                                       pagerank(0.0), counter(0) { }
  void save(graphlab::oarchive& oarc) const {
    oarc << pagename << pagerank << counter;
  }
  void load(graphlab::iarchive& iarc) {
    iarc >> pagename >> pagerank >> counter;
  }
};

Observe that the constructors were modified to initialize the counter to 0, and that the save/load functions must now also serialize the counter variable.

To achieve self-scheduling, we simply modify the apply() function in the pagerank_program to increment the counter, and signal the current vertex if the counter has not reached 10.

// Use the total rank of adjacent pages to update this page
void apply(icontext_type& context, vertex_type& vertex,
           const gather_type& total) {
  double newval = total * 0.85 + 0.15;
  vertex.data().pagerank = newval;
  ++vertex.data().counter;
  if (vertex.data().counter < 10) context.signal(vertex);
  // of course, instead of simply "10" here, this could be comparing
  // against a global variable set by a command line option.
}

The context.signal(vertex) call inserts the current vertex into the scheduler. The guarantee provided by signal() is that the signalled vertex will eventually be executed some time after the signal() call completes.

If used together with the synchronous engine, this program will perform exactly the equivalent of the traditional "matrix-multiplication-like" PageRank iteration.

Dynamic Scheduling Example

Alternatively, we could take a more "contextual" approach to scheduling. Since PageRank is a numeric procedure performed on a large graph, it is reasonable to expect that some parts of the graph will converge before others. We could therefore save computation by recomputing only vertices whose values may still change by large amounts.

To implement this, we consider the pagerank_program implemented earlier. We will not make modifications to the gather phase, but we will change the apply phase and introduce a scatter phase. The goal is to achieve the following:

  • If the current vertex's PageRank does not change much, no additional action is performed.
  • However, if the current vertex's PageRank changed by greater than some threshold (1E-3), we will signal() all out-pages to recompute their PageRank value.
class dynamic_pagerank_program :
    public graphlab::ivertex_program<graph_type, double> {
 private:
  // a variable local to this program
  bool perform_scatter;
 public:
  // no changes to gather_edges and gather
  edge_dir_type gather_edges(icontext_type& context,
                             const vertex_type& vertex) const {
    return graphlab::IN_EDGES;
  }
  double gather(icontext_type& context, const vertex_type& vertex,
                edge_type& edge) const {
    return edge.source().data().pagerank / edge.source().num_out_edges();
  }
  // Use the total rank of adjacent pages to update this page
  void apply(icontext_type& context, vertex_type& vertex,
             const gather_type& total) {
    double newval = total * 0.85 + 0.15;
    double prevval = vertex.data().pagerank;
    vertex.data().pagerank = newval;
    perform_scatter = (std::fabs(prevval - newval) > 1E-3);
  }
  // The scatter edges depend on whether the pagerank has converged
  edge_dir_type scatter_edges(icontext_type& context,
                              const vertex_type& vertex) const {
    if (perform_scatter) return graphlab::OUT_EDGES;
    else return graphlab::NO_EDGES;
  }
  void scatter(icontext_type& context, const vertex_type& vertex,
               edge_type& edge) const {
    context.signal(edge.target());
  }
};

First, observe that we introduced a private member variable perform_scatter to the program. This variable is short-lived and local to this particular execution of the vertex program. In the apply() function, we compute the change to the current vertex's pagerank, and if it is above a certain threshold, we set perform_scatter to true.

This in turn influences the behavior of the scatter_edges() function. If perform_scatter is false (i.e., insufficient change was made to the current PageRank), we do not perform a scatter. However, if sufficient change was made, scatter_edges() returns graphlab::OUT_EDGES, which causes the scatter() function to be executed on all out-going edges of the current vertex. The scatter() function then simply signals the destination vertex, requesting that it be executed in the future to pick up the large change made to the current vertex.

If run using the synchronous engine, you will observe that the time spent within each synchronous iteration decreases, since the number of "signalled" vertices in each iteration shrinks over time. This also works well in the asynchronous setting, where powerful dynamic schedulers can be used to control the order of execution.

Note:
If you dig further into the documentation, you will see that the signalling operation can itself be used as a messaging primitive, carrying a message to a destination vertex. This allows GraphLab v2.1, in some sense, to also include "Pregel" as part of its implementation. Furthermore, the message can define a "message priority", which can be used in conjunction with the priority-queue-based dynamic schedulers to obtain greater control over the order of execution in the asynchronous engine.

In the next section, we will see how to save the output of the system.