Replicating data from different databases
Problem
You have two or more databases, each holding its own data and collections, but you want all of that data to be collected in one place.
Note: For this solution you need at least ArangoDB 2.0, and you must run the script in every database you want to collect data from.
Solution
First of all, you have to start a server on an endpoint:
arangod --server.endpoint tcp://127.0.0.1:8529
Now you have to create two collections and name them data and replicationStatus:
db._create("data");
db._create("replicationStatus");
Save the following script in a file named js/common/modules/org/arangodb/mysync.js (the path must match the module name org/arangodb/mysync that is registered below):
// ArangoDB's "internal" module; this script uses it for database access
// (internal.db) and for the HTTP transfer to the central node (internal.download)
var internal = require("internal");
// maximum number of changes that we can handle in a single replication run
var maxChanges = 1000;
// URL of central node: the documents are sent to its import API; the query
// string names the target collection ("central") and the import options
var transferUrl = "http://127.0.0.1:8599/_api/import?collection=central&type=auto&createCollection=true&complete=true";
// request options handed to internal.download() together with transferUrl
var transferOptions = {
method: "POST",
timeout: 60 // presumably seconds -- TODO confirm against internal.download
};
// the collection that keeps the status of what got replicated to central node
// (a single document with key "status" holding the last replicated key)
var replicationCollection = internal.db.replicationStatus;
// the collection containing all data changes; documents are removed from it
// once they have been transferred successfully
var changesCollection = internal.db.data;
/**
 * Compares two document keys.
 *
 * A shorter key always sorts before a longer one; keys of equal length are
 * compared character by character. For decimal numeric strings without
 * leading zeros this yields numeric order (e.g. "9" < "10").
 *
 * @param {string} l - left-hand key
 * @param {string} r - right-hand key
 * @returns {number} -1 if l sorts before r, 1 if l sorts after r, 0 if equal
 */
function keyCompare (l, r) {
  "use strict";
  // declared locally: the counter must not leak into the global scope
  var i;

  if (l.length !== r.length) {
    return l.length < r.length ? -1 : 1;
  }

  // length is equal, the first differing character decides
  for (i = 0; i < l.length; ++i) {
    if (l[i] !== r[i]) {
      return l[i] < r[i] ? -1 : 1;
    }
  }

  return 0;
}
/**
 * Writes a single message to the console log.
 *
 * The message is passed as an argument to a fixed "%s" format string, so any
 * format specifiers contained in the message itself are not interpreted.
 *
 * @param {*} msg - the message to log
 */
function logger (msg) {
  "use strict";
  var serverConsole = require("console");
  serverConsole.log("%s", msg);
}
/**
 * Runs one replication pass: sends all not-yet-replicated documents from the
 * changes collection to the central node, records the highest transferred key
 * in the replication status collection and finally removes the transferred
 * documents from the changes collection.
 *
 * Returns without doing anything else if there is nothing to transfer.
 *
 * @throws {Error} "replication error" if the central node does not answer
 *   with an HTTP code between 200 and 202; in that case neither the
 *   replication state nor the changes collection is modified.
 */
function replicate () {
  "use strict";
  var key = "status"; // const: key of the single replication state document
  var status, newStatus;

  try {
    // fetch the previous replication state
    status = replicationCollection.document(key);
    newStatus = { _key: key, lastKey: status.lastKey };
  }
  catch (err) {
    // no previous replication state. start from the beginning
    // (any failure to read the state document is treated this way)
    newStatus = { _key: key, lastKey: "0" };
  }

  // fetch the latest changes (need to reverse them because `last` returns newest changes first)
  // NOTE(review): only the newest `maxChanges` documents are looked at per run;
  // if more unreplicated documents than that have piled up, lastKey moves past
  // the older ones -- confirm that this is acceptable for your workload
  var changes = changesCollection.last(maxChanges).reverse();
  var transfer = [ ];

  changes.forEach(function (doc) {
    if (keyCompare(doc._key, newStatus.lastKey) <= 0) {
      // already handled in a previous replication run
      return;
    }

    // documents we need to transfer
    // if necessary, we could rewrite the documents here, e.g. insert
    // extra values, create client-specific keys etc.
    transfer.push(doc);

    // keep track of highest key: the guard above already established
    // that this key is greater than the current lastKey
    newStatus.lastKey = doc._key;
  });

  if (transfer.length === 0) {
    // nothing to do
    logger("nothing to transfer");
    return;
  }

  logger("transferring " + transfer.length + " document(s)");

  // now transfer the documents to the remote server
  var result = internal.download(transferUrl, JSON.stringify(transfer), transferOptions);

  if (result.code >= 200 && result.code <= 202) {
    logger("central server accepted the documents: " + JSON.stringify(result));
  }
  else {
    // error: abort before touching the replication state or the local data
    logger("central server did not accept the documents: " + JSON.stringify(result));
    // throw a real Error object (not a bare string) so a stack trace is available
    throw new Error("replication error");
  }

  // update the replication state
  if (status) {
    // need to update the previous replication state
    replicationCollection.update(key, newStatus);
  }
  else {
    // need to insert the replication state (1st time)
    replicationCollection.save(newStatus);
  }

  logger("deleting old documents");

  // finally remove all elements that we transferred successfully from the changes collection
  // no need to keep them
  transfer.forEach(function (doc) {
    changesCollection.remove(doc);
  });
}
exports.execute = function (param) {
"use strict";
logger("replication wake up");
replicate();
logger("replication shutdown");
};
Afterwards, change the URL of the central node in the script to the one you chose before - e.g. tcp://127.0.0.1:8599
Now register the script as a recurring action:
require("internal").definePeriodic(1, 10, "org/arangodb/mysync", "execute", "");
Note: At this point you can change how often the script is executed (every 10 seconds in the call above).
Comment
The server started on the endpoint above will be the central node. It collects changes from the local node by replicating its data. The script will pick up everything that has been added to your data collection since the last replication run. Every 10 seconds - or whatever interval you chose - the script will be executed and send the changed data to the central node, where it will be imported into a collection named central. After that, the transferred data will be removed from the data collection.
If you want to test your script, simply add some data to your data collection - e.g.:
for (i = 0; i < 100; ++i) db.data.save({ value: i });
Author: Jan Steemann
Tags: #database #collection