Bulk operations
Increase performance by performing multiple operations together.
Bulk operations are operations that are scheduled in bulk to the network, saving on bandwidth and latency. Rather than performing and waiting for one operation at a time, you can schedule multiple operations and then wait for them all to complete.
There are two ways to perform bulk operations. One is to schedule operations one at a time and the other is to schedule them all at once. Both approaches have the same performance gains and differ only in respect to behavior in error situations: In the latter none of the operations are scheduled if any operation in the list could not be scheduled.
The following example schedules operations one at a time and waits for them:
lcb_store_cmd_t scmd = { 0 };
const lcb_store_cmd_t *cmdlist = &scmd;
unsigned ii;
lcb_error_t err;
scmd.v.v0.operation = LCB_SET;
for (ii = 0; ii < 100; ++ii) {
char kbuf[4096];
char vbuf[4096];
sprintf(kbuf, "Key_%u", ii);
sprintf(vbuf, "Value_%u", ii);
scmd.v.v0.key = kbuf;
scmd.v.v0.nkey = strlen(kbuf);
scmd.v.v0.bytes = vbuf;
scmd.v.v0.nbytes = strlen(vbuf);
err = lcb_store(instance, NULL, 1, &cmdlist);
if (err != LCB_SUCCESS) {
fprintf(stderr, "Couldn't schedule storage for %s! %s\n", kbuf, lcb_sterror(NULL, err));
}
}
lcb_wait(instance);
lcb_store_cmd_t *cmds;
const lcb_store_cmd_t **cmdlist;
unsigned ii;
lcb_error_t err;
const int ncommands = 100;
cmds = calloc(ncommands, sizeof(*cmds));
cmdlist = calloc(ncommands, sizeof(*cmdlist));
for (ii = 0; ii < ncommands; ++ii) {
char kbuf[4096], vbuf[4096];
lcb_store_cmd_t *cmd = cmds + ii;
sprintf(kbuf, "Key_%u", ii);
sprintf(vbuf, "Value_%u", ii);
cmd.v.v0.operation = LCB_SET;
cmd.v.v0.key = strdup(kbuf); // need to free this later.
cmd.v.v0.nkey = strlen(kbuf);
cmd.v.v0.bytes = strdup(vbuf);
cmd.v.v0.nbytes = strlen(vbuf);
cmdlist[ii] = cmd;
}
err = lcb_store(instance, NULL, ncommands, cmdlist);
if (err == LCB_SUCCESS) {
lcb_wait(instance);
}
for (ii = 0; ii < ncommands; ++ii) {
free(cmds[ii].v.v0.key);
free(cmds[ii].v.v0.bytes);
}
In general, the recommended method is to use the first approach (that is, schedule individual commands and then wait for them) due to its simplicity and ease of memory management.
Callbacks for bulk operations are invoked once per each item scheduled; thus if you schedule a hundred items in a bulk operation you will receive 100 invocations of the installed callback.
struct CouchbaseValue {
string data;
uint64_t cas;
lcb_error_t err;
bool success() const { return err == LCB_SUCCESS; }
CouchbaseValue(const lcb_get_resp_t *resp, lcb_error_t rc) {
err = rc;
if (!success()) {
return;
}
data.assign((const char*)resp->v.v0.bytes, resp->v.v0.nbytes);
cas = resp->v.v0.cas;
}
};
class MultiGet {
public:
map<string,CouchbaseValue> items;
void handleResponse(const lcb_get_resp_t *resp, lcb_error_t err) {
// Key is always provided;
string key((const char *)resp->v.v0.key, resp->v.v0.nkey);
items[key] = CouchbaseValue(resp, err);
}
};
extern "C" {
static void gotItem(lcb_t, const void *cookie, lcb_error_t err, const lcb_get_resp_t *resp)
{
MultiGet *mg = const_cast<MultiGet*>(reinterpret_cast<const MultiGet*>(cookie));
mg->handleResponse(resp, err);
}
} // extern "C"
int main(void)
{
vector<int> orderNums;
// Fill it with garbage data
for (unsigned ii = 0; ii < 100; ii++) {
orderNums.push_back(ii);
}
lcb_t instance;
lcb_create(&instance, NULL);
lcb_connect(instance);
lcb_wait(instance);
MultiGet mg;
for (unsigned ii = 0; ii < orderNums.size(); ii++) {
char kbuf[32];
sprintf(kbuf, "O:%u", orderNums[ii]);
lcb_get_cmd_t cmd;
memset(&cmd, 0, sizeof cmd);
const lcb_get_cmd_t *cmdlist = &cmd;
cmd.v.v0.key = kbuf;
cmd.v.v0.nkey = strlen(kbuf);
lcb_get(instance, &mg, 1, &cmdlist);
}
lcb_wait(instance);
// Assuming all goes well (error code left out for brevity),
// mg.items should contain all the order information (or indicate they don't exist)
lcb_destroy(instance);
return 0;
}