-
-
Save jongwook/8236438 to your computer and use it in GitHub Desktop.
import static com.google.common.collect.Lists.transform; | |
import static java.nio.charset.StandardCharsets.UTF_8; | |
import static rx.Observable.from; | |
import static rx.Observable.merge; | |
public Observable<String> watch(String path) { | |
return Observable.create(observer -> { | |
AtomicBoolean unsubscribed = new AtomicBoolean(false); | |
BackgroundCallback callback = (client, event) -> { | |
switch (Code.get(event.getResultCode())) { | |
case OK: | |
byte[] bytes = event.getData(); | |
if (bytes != null) { | |
observer.onNext(new String(bytes, UTF_8)); | |
} | |
break; | |
case NONODE: | |
observer.onError(new NoNodeException(path)); | |
} | |
}; | |
Consumer<Watcher> watch = watcher -> { | |
try { | |
curator.getData().usingWatcher(watcher).inBackground(callback).forPath(path); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
}; | |
watch.accept(new Watcher() { | |
@Override | |
public void process(WatchedEvent event) { | |
if (unsubscribed.get()) return; | |
switch (event.getType()) { | |
case NodeCreated: | |
case NodeDataChanged: | |
watch.accept(this); | |
case NodeDeleted: | |
observer.onCompleted(); | |
} | |
} | |
}); | |
return Subscriptions.create(() -> unsubscribed.set(true)); | |
}); | |
} | |
public Observable<List<String>> watchChildren(String path) { | |
return Observable.create(observer -> { | |
AtomicBoolean unsubscribed = new AtomicBoolean(false); | |
BackgroundCallback callback = (client, event) -> { | |
switch (Code.get(event.getResultCode()) { | |
case OK: | |
List<String> children = event.getChildren(); | |
observer.onNext(children != null ? children : Collections.emptyList()); | |
break; | |
case NONODE: | |
observer.onError(new NoNodeException(path)); | |
} | |
}; | |
Consumer<Watcher> watch = watcher -> { | |
try { | |
curator.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
}; | |
watch.accept(new Watcher() { | |
@Override | |
public void process(WatchedEvent event) { | |
if (unsubscribed.get()) return; | |
switch (event.getType()) { | |
case NodeCreated: | |
case NodeChildrenChanged: | |
watch.accept(this); | |
break; | |
case NodeDeleted: | |
observer.onCompleted(); | |
} | |
} | |
}); | |
return Subscriptions.create(() -> unsubscribed.set(true)); | |
}); | |
} | |
public Observable<List<String>> getChildren(String path) { | |
return Observable.create(observer -> { | |
try { | |
curator.getChildren().inBackground((client, event) -> { | |
Code code = Code.get(event.getResultCode()); | |
switch (code) { | |
case OK: | |
observer.onNext(event.getChildren()); | |
observer.onCompleted(); | |
break; | |
default: | |
observer.onError(KeeperException.create(code)); | |
} | |
}).forPath(path); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
return Subscriptions.empty(); | |
}); | |
} | |
public Observable<Void> delete(String path) { | |
return Observable.create(observer -> { | |
try { | |
curator.delete().inBackground((client, event) -> { | |
Code code = Code.get(event.getResultCode()); | |
switch (code) { | |
case OK: | |
case NONODE: | |
observer.onCompleted(); | |
break; | |
default: | |
observer.onError(KeeperException.create(code)); | |
} | |
}).forPath(path); | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
return Subscriptions.empty(); | |
}); | |
} | |
public Observable<Void> deleteAll(String path) { | |
return getChildren(path).flatMap(children -> { | |
List<Observable<Void>> requests = transform(children, child -> | |
deleteAll(path + "/" + child) | |
); | |
return merge(from(requests)).lastOrDefault(null); | |
}).flatMap(done -> delete(path)); | |
} |
A better implementation of deleteAll:
// Alternative proposed in the discussion: recursively delete every child via flatMap,
// wait for the last result (or a null placeholder when nothing was emitted), then delete
// the node at path itself.
// NOTE(review): the lambda concatenates each emitted item onto the path as one child name,
// so this presumably assumes getChildren emits names individually rather than as a
// List<String> - confirm against the getChildren signature in use.
public Observable<Void> deleteAll(String path) {
return getChildren(path).flatMap(child -> deleteAll(path + "/" + child))
.lastOrDefault(null).flatMap(dummy -> delete(path));
}
I'm not sure whether I understand Curator correctly. In ZooKeeper, there is only one EventThread that handles the events of a single ZooKeeper instance. I only glanced at the Curator code, but it looks like there is also just one background thread that handles the background operations. That's perfect for RxJava. If that is not the case, there may be some concurrency issues.
Thanks for the suggestion!
I updated deleteAll() to use flatMap as you suggested; the Subscription was not cancellable anyway, since Curator does not provide such a method.
Also, I wanted to keep the return type of getChildren() as Observable<List<String>> so that it conforms with watchChildren(), which needs to produce the whole list of children as one chunk each time a new watch notification arrives. That makes the implementation a bit more involved, using merge(from(...)).
I didn't know that I shouldn't call onCompleted in unsubscribe. Thanks for the information!
L50, L96:
Subscription.unsubscribe should not call onCompleted. When Subscription.unsubscribe is called, it means the caller is aware that the Observable will be canceled and expects that the Observer will receive nothing afterwards. However, because RxJava has SafeObserver to guard against onCompleted being called inside Subscription.unsubscribe, onCompleted will not actually be sent to your Observer.