Last active
August 21, 2017 19:52
-
-
Save shakil807g/86f5cab64f0a48c9b943cc5411558d82 to your computer and use it in GitHub Desktop.
RxJava 2 combineLatest example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* RxJava combineLatest example */
disposable.add(RxFirebase.observeChildEvent(mDatabase.child(AppConstant.USERS) | |
.child(Utils.fireUserId(getUser().getUserId())).child(AppConstant.GROUPS)) | |
.filter(event -> event.getEventType() == RxFirebaseChildEvent.EventType.ADDED) | |
.flatMap(event -> { | |
Log.d(TAG, "setFirebaseListener: "+event.getValue()); | |
String groupID = event.getValue().getKey(); | |
String otherUserID = Utils.findOtherUserIdFromGroupID(groupID,getUser().getUserId()); | |
return Flowable.combineLatest( | |
RxFirebase.observeChildEvent(mDatabase.child(AppConstant.MESSAGES) // 1 | |
.child(groupID).orderByKey().limitToLast(1)) | |
.filter(dataSnap -> dataSnap.getEventType() == RxFirebaseChildEvent.EventType.ADDED) | |
.map(dataChildEvent -> Message.parseData(dataChildEvent.getValue())) | |
.onErrorReturn(throwable -> new Message()) | |
.filter(message -> !Utils.isMessageEmpty(message)) | |
.doOnNext(message -> Log.d(TAG, "message change : ")), | |
RxFirebase.observeValueEvent(mDatabase.child(AppConstant.USERS) | |
.child(Utils.fireUserId(otherUserID))) | |
.map(Member::dataSnapshotParse) | |
.distinctUntilChanged((member, member2) -> | |
member.getUser_id().equals(member2.getUser_id()) && | |
member.getUsername().equals(member2.getUsername()) && | |
member.getFull_name().equals(member2.getFull_name()) && | |
member.getProfile_picture().equals(member2.getProfile_picture()) && | |
member.is_verified() == member2.is_verified() && | |
member.getUser_type().equals(member2.getUser_type()) && | |
member.getProfession_text().equals(member2.getProfession_text())) | |
.onErrorReturn(throwable -> new Member()) | |
.filter(member -> !Utils.isMemberisEmpty(member)) | |
.doOnNext(message -> Log.d(TAG, "user change : ")), | |
(message, otheruser) -> { | |
Log.d(TAG, "setFirebaseListener: "+message); | |
Log.d(TAG, "setFirebaseListener: "+otheruser); | |
InboxItem inboxItem = new InboxItem(message.getGroup_id(), | |
message.getTimeLong(),message.getMessage(),message.getType(), | |
message.getUser_id(),otheruser.getFull_name(),otheruser.getUsername(), | |
otheruser.getProfession_text(),otheruser.is_verified(), | |
otheruser.getProfile_picture(),otheruser.getUser_type(),otheruser.getUser_id()); | |
return inboxItem; | |
}); | |
}) | |
.subscribeOn(AndroidSchedulers.mainThread()) | |
.subscribe(inboxItem -> { | |
realm.executeTransaction(realm1 -> { | |
realm1.insertOrUpdate(inboxItem); | |
}); | |
},Throwable::printStackTrace)); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment