Updating documents
Describes updating documents by using the replace() or upsert() methods.
Replace
The replace method replaces the Document if it exists, but fails with a DocumentDoesNotExistException otherwise.
In addition, if a CAS value is set on the Document (that is, it is not equal to 0), it is passed to the server and respected. If it does not match the current CAS value on the server, the operation fails with a CASMismatchException.
JsonObject content = JsonObject.empty().put("name", "Michael");
JsonDocument doc = JsonDocument.create("docId", content);
Observable<JsonDocument> replaced = bucket.replace(doc);
If the Document also has the expiry time set, it will be respected and picked up by the server.
It doesn't matter what type of Document is replaced; its type is inferred from the method argument and the corresponding Transcoder is used to encode it.
The Document returned as a result is a different instance from the Document passed in. It carries over values such as the id and content, but also has the new CAS value set.
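The rules above (the document must exist, and a non-zero CAS must match) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `CasStoreSketch`, `DocumentMissing`, and `CasMismatch` are hypothetical names invented for this example, and the code does not reflect how the SDK or the server is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the replace() rules described above.
// This is NOT the SDK implementation; all names here are illustrative.
class CasStoreSketch {
    // Stand-ins for DocumentDoesNotExistException and CASMismatchException.
    static class DocumentMissing extends RuntimeException {}
    static class CasMismatch extends RuntimeException {}

    private final Map<String, String> contents = new HashMap<>();
    private final Map<String, Long> casValues = new HashMap<>();
    private long nextCas = 1;

    // Stores content unconditionally and returns the newly assigned CAS value.
    long store(String id, String content) {
        long cas = nextCas++;
        contents.put(id, content);
        casValues.put(id, cas);
        return cas;
    }

    // Models replace(): the id must exist, and a non-zero CAS must match.
    long replace(String id, String content, long cas) {
        Long current = casValues.get(id);
        if (current == null) {
            throw new DocumentMissing();
        }
        if (cas != 0 && cas != current) {
            throw new CasMismatch();
        }
        return store(id, content);
    }

    String content(String id) {
        return contents.get(id);
    }
}
```

In this model, a caller that passes 0 as the CAS skips the check entirely, while a caller that passes a stale CAS is rejected and has to fetch the document again.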
The following sample will automatically take the CAS value into account because it is populated from the get() call and respected on the replace() call.
bucket
    .get("id")
    .map(new Func1<JsonDocument, JsonDocument>() {
        @Override
        public JsonDocument call(JsonDocument document) {
            modifyDocumentSomehow(document);
            return document;
        }
    })
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(JsonDocument document) {
            return bucket.replace(document);
        }
    }).subscribe();
Since this operation can fail if there is a CASMismatchException, a common pattern is to retry the complete process until it succeeds:
Observable
    .defer(() -> bucket.get("id"))
    .map(document -> {
        document.content().put("modified", new Date().getTime());
        return document;
    })
    .flatMap(bucket::replace)
    .retryWhen(attempts ->
        attempts.flatMap(n -> {
            if (!(n.getThrowable() instanceof CASMismatchException)) {
                return Observable.error(n.getThrowable());
            }
            return Observable.timer(1, TimeUnit.SECONDS);
        })
    )
    .subscribe();
This code snippet uses defer() to always do a fresh get(). Remember that a Subject, which is used in the SDK internally, caches the value, so a resubscribe would just return the same value; defer() makes sure a new one is created on every resubscribe. Afterwards, the code artificially modifies the document and tries to store it through a replace() call. If that succeeds, nothing more happens. If it fails, the retryWhen() block is executed: the code checks whether the error is a CASMismatchException and, if so, waits on a one-second timer before retrying. Other errors could be handled there as well (even with different retry strategies), but in this example they are passed along.
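The shape of this retry pattern (fresh read, modify, conditional write, start over on a stale CAS) can also be shown as a plain blocking loop. The following is a minimal sketch and not SDK code: `CasRetrySketch` and `Versioned` are hypothetical stand-ins for a single stored document, the delay between attempts is left out for brevity, and a `maxAttempts` bound is added as an assumption so the loop cannot spin forever.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a single stored document; not SDK code.
class CasRetrySketch {
    // Content together with the CAS value it was read with.
    static final class Versioned {
        final String content;
        final long cas;

        Versioned(String content, long cas) {
            this.content = content;
            this.cas = cas;
        }
    }

    private final AtomicReference<Versioned> stored;

    CasRetrySketch(String initial) {
        stored = new AtomicReference<>(new Versioned(initial, 1));
    }

    // Models get(): returns the current content and CAS.
    Versioned get() {
        return stored.get();
    }

    // Models replace(): succeeds only if the caller's CAS is still current.
    boolean replace(String content, long cas) {
        Versioned current = stored.get();
        if (current.cas != cas) {
            return false;
        }
        return stored.compareAndSet(current, new Versioned(content, cas + 1));
    }

    // Fetch, modify, replace; do a fresh get() whenever the CAS was stale.
    // Returns the number of attempts that were needed.
    int updateWithRetry(UnaryOperator<String> change, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Versioned fresh = get();
            if (replace(change.apply(fresh.content), fresh.cas)) {
                return attempt;
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

The important detail, mirrored from the defer() call in the reactive version, is that the read happens inside the loop, so every attempt works on the latest content and CAS rather than on a cached copy.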
Upsert
The upsert method works similarly to replace, but it also stores the Document if it does not exist (so no DocumentDoesNotExistException is thrown).
It also does not use the CAS value to handle concurrent updates, even when one is set on the Document. If you need CAS checks, use replace instead.
JsonObject content = JsonObject.empty().put("name", "Michael");
JsonDocument doc = JsonDocument.create("docId", content);
Observable<JsonDocument> upserted = bucket.upsert(doc);
If the Document also has the expiry time set, it will be respected and picked up by the server.
It doesn't matter what type of Document is upserted; its type is inferred from the method argument and the corresponding Transcoder is used to encode it.
The Document returned as a result is a different instance from the Document passed in. It carries over values such as the id and content, but also has the CAS value set.
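To make the difference from replace concrete, here is a minimal sketch of the upsert rules as described in this section: the document is created when missing, and a CAS supplied by the caller has no effect. `UpsertSketch` is a hypothetical in-memory model written for this example and says nothing about how the server stores data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the upsert() rules; not SDK code.
class UpsertSketch {
    private final Map<String, String> contents = new HashMap<>();
    private final Map<String, Long> casValues = new HashMap<>();
    private long nextCas = 1;

    // Models upsert(): creates or overwrites, and never checks the caller's CAS.
    long upsert(String id, String content, long callerCas) {
        long cas = nextCas++;
        contents.put(id, content);
        casValues.put(id, cas);
        return cas; // the returned document would carry this new CAS value
    }

    String content(String id) {
        return contents.get(id);
    }

    long cas(String id) {
        return casValues.get(id);
    }
}
```

In this model, a writer holding an outdated CAS silently overwrites a newer value, which is why CAS-sensitive updates belong on the replace path.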
Durability Requirements
If no durability requirements are set on the replace or upsert methods, the operation will succeed when the server acknowledges the document in its managed cache layer. While this is a performant operation, there might be situations where you want to make sure that your document has been persisted and/or replicated so that it survives power outages and other node failures.
Both methods provide overloads to supply such requirements:
Observable<D> replace(D document, PersistTo persistTo);
Observable<D> replace(D document, ReplicateTo replicateTo);
Observable<D> replace(D document, PersistTo persistTo, ReplicateTo replicateTo);
Observable<D> upsert(D document, PersistTo persistTo);
Observable<D> upsert(D document, ReplicateTo replicateTo);
Observable<D> upsert(D document, PersistTo persistTo, ReplicateTo replicateTo);
You can configure either one or both of the requirements when replacing or upserting. From an application point of view, nothing needs to change when working with the response, although there is one thing to keep in mind:
The internal implementation first performs a regular replace or upsert operation and afterwards starts polling the affected cluster nodes for the state of the document. If something fails during this polling phase (which fails the Observable), the original operation might have succeeded nonetheless.
// Replace the document and make sure it is persisted on the master node
bucket.replace(document, PersistTo.MASTER);
// Replace the document and make sure it is replicated to one replica node
bucket.replace(document, ReplicateTo.ONE);
// Replace the document and make sure it is persisted on one node and replicated to two
bucket.replace(document, PersistTo.ONE, ReplicateTo.TWO);
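The caveat about the two-phase implementation is easy to overlook, so the following sketch models it in plain Java. It is a deliberately simplified illustration: `DurabilitySketch`, its two maps, and the `persistenceAvailable` flag are all hypothetical, and the real SDK polls cluster nodes rather than checking a boolean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "write first, then verify durability"; not SDK code.
class DurabilitySketch {
    final Map<String, String> cache = new HashMap<>(); // stands in for the managed cache layer
    final Map<String, String> disk = new HashMap<>();  // stands in for persisted state
    boolean persistenceAvailable = true;               // toggled to simulate a failure

    // Phase 1 applies the write; phase 2 checks the durability requirement.
    void writeWithPersist(String id, String content) {
        cache.put(id, content); // the mutation is already visible from here on
        if (!persistenceAvailable) {
            // The caller sees a failure, but phase 1 is not rolled back.
            throw new IllegalStateException("durability requirement not met for " + id);
        }
        disk.put(id, content);
    }
}
```

A caller that catches the failure in this model cannot assume the old value is still in place, so any retry or compensation logic should start with a fresh read.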
Batching
Because everything is asynchronous by default, batching replaces or upserts can be achieved with Observable functionality.
A combination of just() and flatMap() is used to store them without blocking:
JsonDocument doc1 = JsonDocument.create("id1", content);
JsonDocument doc2 = JsonDocument.create("id2", content);
JsonDocument doc3 = JsonDocument.create("id3", content);

Observable
    .just(doc1, doc2, doc3)
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(JsonDocument document) {
            return bucket.replace(document);
        }
    }).subscribe();
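The idea behind this kind of batching is to start every write before waiting on any of them. As an illustration that runs without the SDK, the sketch below expresses the same idea with the JDK's CompletableFuture instead of Observable. This is a swapped-in technique, not the SDK's API: `BatchSketch`, `replaceAsync`, and `replaceAll` are hypothetical names, and the map stands in for the bucket.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical model of batching asynchronous writes; not SDK code.
class BatchSketch {
    final Map<String, String> store = new ConcurrentHashMap<>();

    // Stand-in for an asynchronous write that completes with the document id.
    CompletableFuture<String> replaceAsync(String id, String content) {
        return CompletableFuture.supplyAsync(() -> {
            store.put(id, content);
            return id;
        });
    }

    // Starts all writes first, then waits for each of them to complete.
    List<String> replaceAll(Map<String, String> docs) {
        // Collecting to a list forces every write to be started before any join().
        List<CompletableFuture<String>> pending = docs.entrySet().stream()
            .map(e -> replaceAsync(e.getKey(), e.getValue()))
            .collect(Collectors.toList());
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}
```

Unlike the subscribe() call in the Observable version, join() blocks the calling thread, so this sketch only mirrors how the work is started concurrently, not the non-blocking consumption of results.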