Created
April 30, 2015 04:52
-
-
Save alexdorand/2c7ce9dd8531c51234e9 to your computer and use it in GitHub Desktop.
Here is the class with Java Rx problem. You need to create a bucket called "geo" for this to work
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.folion.activity.dao; | |
import com.couchbase.client.core.time.Delay; | |
import com.couchbase.client.java.Bucket; | |
import com.couchbase.client.java.Cluster; | |
import com.couchbase.client.java.CouchbaseCluster; | |
import com.couchbase.client.java.document.RawJsonDocument; | |
import com.couchbase.client.java.error.CASMismatchException; | |
import com.couchbase.client.java.error.DocumentDoesNotExistException; | |
import com.couchbase.client.java.util.retry.RetryBuilder; | |
import com.google.gson.Gson; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import java.util.Arrays; | |
import java.util.concurrent.TimeUnit; | |
public class CouchbaseActivityDao { | |
private static CouchbaseActivityDao instance; | |
private Gson gson; | |
private Cluster cluster; | |
private String bucketName; | |
private Bucket bucket; | |
public CouchbaseActivityDao(String[] clusters, String bucketName) { | |
this.bucketName = bucketName; | |
cluster = CouchbaseCluster.create(Arrays.asList(clusters)); | |
bucket = cluster.openBucket(bucketName, 1, TimeUnit.DAYS); | |
gson = new Gson(); | |
} | |
public static CouchbaseActivityDao getInstance() { | |
if (instance == null) { | |
String[] serverURIs = "http://localhost:8091".split(","); | |
instance = new CouchbaseActivityDao(serverURIs, "geo"); | |
} | |
return instance; | |
} | |
public void casUpdateJson(final String key, final Action action) { | |
Observable | |
.just(key) | |
.flatMap(new Func1<String, Observable<RawJsonDocument>>() { | |
@Override | |
public Observable<RawJsonDocument> call(String id) { | |
System.out.println("get " + id); | |
return bucket.async().get(id, RawJsonDocument.class); | |
} | |
}) | |
.map(new Func1<RawJsonDocument, RawJsonDocument>() { | |
@Override | |
public RawJsonDocument call(RawJsonDocument rawJsonDocument) { | |
System.out.println("update " + rawJsonDocument.content()); | |
// modify your doc here | |
return rawJsonDocument; | |
} | |
}) | |
.defaultIfEmpty(RawJsonDocument.create(key, gson.toJson(action))) | |
.flatMap(new Func1<RawJsonDocument, Observable<?>>() { | |
@Override | |
public Observable<?> call(final RawJsonDocument rawJsonDocument) { | |
System.out.println("here"); | |
return bucket.async().replace(rawJsonDocument) | |
// retry when CAS concurrent modifications quickly | |
.retryWhen(RetryBuilder.anyOf(CASMismatchException.class) | |
.max(10).delay(Delay.fixed(10, TimeUnit.MICROSECONDS)).build()) | |
.onErrorResumeNext(new Func1<Throwable, Observable<RawJsonDocument>>() { | |
@Override | |
public Observable<RawJsonDocument> call(Throwable throwable) { | |
System.out.println("insert"); | |
// if doc does not exist on replace, fall back to insert | |
if (throwable instanceof DocumentDoesNotExistException) { | |
return bucket.async().insert(rawJsonDocument); | |
} | |
// if other error, forward it | |
return Observable.error(throwable); | |
} | |
}); | |
} | |
}) | |
.subscribe(); | |
} | |
public void runTest() { | |
Action action = new Action(); | |
action.setName("action-1"); | |
action.setType("action-type"); | |
CouchbaseActivityDao.getInstance().casUpdateJson("test", action); | |
} | |
public class Action { | |
private String name; | |
private String type; | |
public void setName(String name) { | |
this.name = name; | |
} | |
public void setType(String type) { | |
this.type = type; | |
} | |
public String getName() { | |
return name; | |
} | |
public String getType() { | |
return type; | |
} | |
} | |
public static void main(String arg[]) { | |
CouchbaseActivityDao.getInstance().runTest(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment