23 #ifndef GRAPHLAB_GRAPH_JOIN_HPP
24 #define GRAPHLAB_GRAPH_JOIN_HPP
26 #include <boost/unordered_map.hpp>
27 #include <graphlab/util/hopscotch_map.hpp>
28 #include <graphlab/graph/distributed_graph.hpp>
29 #include <graphlab/rpc/dc_dist_object.hpp>
170 template <
typename LeftGraph,
typename RightGraph>
// Per-graph bookkeeping built on every machine for the injective join.
198 struct injective_join_index {
// Key emitted for each local vertex, indexed by local vertex id.
// (size_t)(-1) marks a vertex that takes no part in the join.
199 std::vector<size_t> vtx_to_key;
// For each local vertex, the processor paired with it on the other graph
// (filled by compute_injective_join from the exchanged match notices);
// (procid_t)(-1) while no partner has been recorded.
202 std::vector<procid_t> opposing_join_proc;
// One index for the left graph and one for the right graph.
205 injective_join_index left_inj_index, right_inj_index;
211 rmi(dc, this), left_graph(left), right_graph(right) { }
254 template <
typename LeftEmitKey,
typename RightEmitKey>
256 RightEmitKey right_emit_key) {
257 typedef std::pair<size_t, vertex_id_type> key_vertex_pair;
271 reset_and_fill_injective_index(left_inj_index,
273 left_emit_key,
"left graph");
275 reset_and_fill_injective_index(right_inj_index,
277 right_emit_key,
"right graph");
282 compute_injective_join();
303 template <
typename JoinOp>
305 injective_join(left_inj_index, left_graph,
306 right_inj_index, right_graph,
329 template <
typename JoinOp>
331 injective_join(right_inj_index, right_graph,
332 left_inj_index, left_graph,
/*
 * Rebuilds `idx` from every local vertex of `graph`. Masters and mirrors are
 * both visited: the loop runs over all num_local_vertices() with no owned()
 * filter.
 *
 * For each local vertex v, emit_key(vertex) is recorded in idx.vtx_to_key[v].
 * A key of (size_t)(-1) marks a vertex that does not take part in the join.
 * Every other key is also inserted into idx.key_to_vtx so that it can be
 * mapped back to its local vertex id. idx.opposing_join_proc is sized to one
 * entry per local vertex; newly added entries start as (procid_t)(-1).
 *
 * `message` names the graph being indexed (e.g. "left graph") in the error
 * log. If two local vertices on this machine emit the same key, the error is
 * logged and a string literal ("Duplicate Key in Join") is thrown, so the
 * exception type is const char*.
 *
 * NOTE(review): throwing a string literal bypasses std::exception handlers;
 * consider a std::runtime_error-derived type (check existing catch sites).
 */
337 template <
typename Graph,
typename EmitKey>
338 void reset_and_fill_injective_index(injective_join_index& idx,
341 const char* message) {
343 idx.vtx_to_key.resize(graph.num_local_vertices());
344 idx.key_to_vtx.clear();
// NOTE(review): vector::resize(n, value) only initialises elements that are
// newly appended. If this index was filled before with the same vertex count,
// stale partner processors survive here instead of being reset to
// (procid_t)(-1). Consider assign(graph.num_local_vertices(), (procid_t)(-1)).
345 idx.opposing_join_proc.resize(graph.num_local_vertices(), (
procid_t)(-1));
// Record each local vertex's key and build the reverse key -> vertex map.
347 for(
lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
348 typename Graph::local_vertex_type lv = graph.l_vertex(v);
350 typename Graph::vertex_type vtx(lv);
351 size_t key = emit_key(vtx);
352 idx.vtx_to_key[v] = key;
// Only participating vertices go into the reverse map.
353 if (key != (
size_t)(-1)) {
// Keys must be unique among this machine's local vertices. Cross-machine
// duplicates are caught later by the asserts in compute_injective_join.
354 if (idx.key_to_vtx.count(key) > 0) {
355 logstream(
LOG_ERROR) <<
"Duplicate key in " << message << std::endl;
356 logstream(
LOG_ERROR) <<
"Duplicate keys not permitted" << std::endl;
357 throw "Duplicate Key in Join";
359 idx.key_to_vtx.insert(std::make_pair(key, v));
/*
 * Matches left-graph keys against right-graph keys across all machines. For
 * every matched local vertex, it stores the paired processor in
 * left_inj_index.opposing_join_proc / right_inj_index.opposing_join_proc.
 *
 * Every rmi call below is an all_to_all exchange, so this presumably must be
 * called by all processes together (collective) -- confirm with callers.
 *
 * Outline:
 *   1. get_procs_with_keys buckets each owned vertex's key by destination and
 *      exchanges the buckets. As used here, left_keys[p] / right_keys[p] then
 *      hold keys that originated on processor p.
 *   2. Build key -> originating processor for the left keys, asserting that
 *      no left key is seen twice.
 *   3. For each right key with a left counterpart, queue a (key, partner
 *      processor) notice for both the left owner and the right owner.
 *   4. Exchange the notices and record the partner processor per local
 *      vertex.
 */
365 void compute_injective_join() {
366 std::vector<std::vector<size_t> > left_keys =
367 get_procs_with_keys(left_inj_index.vtx_to_key, left_graph);
368 std::vector<std::vector<size_t> > right_keys =
369 get_procs_with_keys(right_inj_index.vtx_to_key, right_graph);
// key -> processor that sent this left-graph key.
373 hopscotch_map<size_t, procid_t> left_key_to_procs;
377 for (
size_t p = 0; p < left_keys.size(); ++p) {
378 for (
size_t i = 0; i < left_keys[p].size(); ++i) {
379 ASSERT_MSG(left_key_to_procs.count(left_keys[p][i]) == 0,
380 "Duplicate keys not permitted for left graph keys in injective join");
381 left_key_to_procs.insert(std::make_pair(left_keys[p][i], p));
// Swap with an empty vector to actually release this bucket's memory
// (clear() would keep the capacity).
383 std::vector<size_t>().swap(left_keys[p]);
// left_match[q] / right_match[q]: (key, partner processor) notices to be
// delivered to processor q.
389 std::pair<size_t, procid_t> > > left_match(rmi.
numprocs());
392 std::pair<size_t, procid_t> > > right_match(rmi.
numprocs());
// For each right key that also exists on the left, notify both owners.
395 for (
size_t p = 0; p < right_keys.size(); ++p) {
396 for (
size_t i = 0; i < right_keys[p].size(); ++i) {
397 size_t key = right_keys[p][i];
398 hopscotch_map<size_t, procid_t>::iterator iter =
399 left_key_to_procs.find(key);
400 if (iter != left_key_to_procs.end()) {
// A (procid_t)(-1) entry presumably marks a left key that an earlier right
// key already matched, so reaching it means the right key is duplicated.
// Confirm where the entry is overwritten.
401 ASSERT_MSG(iter->second != (
procid_t)(-1),
402 "Duplicate keys not permitted for right graph keys in injective join");
// Presumably left_proc == iter->second and right_proc == p -- confirm.
408 left_match[left_proc].push_back(std::make_pair(key, right_proc));
409 right_match[right_proc].push_back(std::make_pair(key, left_proc));
// Release this bucket's memory as soon as it has been scanned.
415 std::vector<size_t>().swap(right_keys[p]);
// Deliver the notices to the processors that own the matched vertices.
419 rmi.all_to_all(left_match);
420 rmi.all_to_all(right_match);
// Every received key must exist in the local left index (asserted). Record
// its partner processor. The parallel writes presumably hit distinct vertices
// because each key is matched at most once (enforced by the asserts above).
425 #pragma omp parallel for
427 for (
size_t p = 0;p < left_match.size(); ++p) {
428 for (
size_t i = 0;i < left_match[p].size(); ++i) {
430 hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
431 left_inj_index.key_to_vtx.find(left_match[p][i].first);
432 ASSERT_TRUE(iter != left_inj_index.key_to_vtx.end());
434 left_inj_index.opposing_join_proc[iter->second] = left_match[p][i].second;
// Same for the right index.
440 #pragma omp parallel for
442 for (
size_t p = 0;p < right_match.size(); ++p) {
443 for (
size_t i = 0;i < right_match[p].size(); ++i) {
445 hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
446 right_inj_index.key_to_vtx.find(right_match[p][i].first);
447 ASSERT_TRUE(iter != right_inj_index.key_to_vtx.end());
449 right_inj_index.opposing_join_proc[iter->second] = right_match[p][i].second;
/*
 * Collective helper. Buckets the key of every *owned* local vertex of `g`
 * (skipping (size_t)(-1) "no key" entries) by destination processor
 * target_procid, then exchanges the buckets with rmi.all_to_all.
 *
 * local_key_list is indexed by local vertex id (e.g. an
 * injective_join_index::vtx_to_key vector). Returns one key list per
 * processor; after the exchange, entry p presumably holds the keys that
 * processor p sent here. Only owned vertices are sent, presumably so that
 * each replicated vertex contributes its key exactly once.
 */
458 template <
typename Graph>
459 std::vector<std::vector<size_t> >
460 get_procs_with_keys(
const std::vector<size_t>& local_key_list, Graph& g) {
463 std::vector<std::vector<size_t> > procs_with_keys(rmi.
numprocs());
464 for (
size_t i = 0; i < local_key_list.size(); ++i) {
465 if (g.l_vertex(i).owned() && local_key_list[i] != (size_t)(-1)) {
// target_procid is presumably a deterministic function of the key (e.g.
// key % numprocs), so equal keys from both graphs meet on one machine -- confirm.
467 procs_with_keys[target_procid].push_back(local_key_list[i]);
470 rmi.all_to_all(procs_with_keys);
471 return procs_with_keys;
474 template <
typename TargetGraph,
typename SourceGraph,
typename JoinOp>
475 void injective_join(injective_join_index& target,
476 TargetGraph& target_graph,
477 injective_join_index& source,
478 SourceGraph& source_graph,
484 std::pair<size_t, typename SourceGraph::vertex_data_type> > >
487 for (
size_t i = 0; i < source.opposing_join_proc.size(); ++i) {
488 if (source_graph.l_vertex(i).owned()) {
489 procid_t target_proc = source.opposing_join_proc[i];
490 if (target_proc >= 0 && target_proc < rmi.
numprocs()) {
491 source_data[target_proc].push_back(
492 std::make_pair(source.vtx_to_key[i],
493 source_graph.l_vertex(i).data()));
498 rmi.all_to_all(source_data);
501 #pragma omp parallel for
503 for (
size_t p = 0;p < source_data.size(); ++p) {
504 for (
size_t i = 0;i < source_data[p].size(); ++i) {
506 hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
507 target.key_to_vtx.find(source_data[p][i].first);
508 ASSERT_TRUE(iter != target.key_to_vtx.end());
510 typename TargetGraph::local_vertex_type
511 lvtx = target_graph.l_vertex(iter->second);
512 typename TargetGraph::vertex_type vtx(lvtx);
513 joinop(vtx, source_data[p][i].second);
516 target_graph.synchronize();