Last active
August 29, 2015 14:18
-
-
Save arkadijs/18de74c3b8d46ea9865b to your computer and use it in GitHub Desktop.
Instagram Tags media feed based on Observable / ReactiveX in Scala, Groovy, and Java. https://bitbucket.org/arkadi/instarx
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grapes([ | |
@Grab('io.reactivex:rxgroovy:1.0.0'), | |
@Grab('org.codehaus.gpars:gpars:1.2.1'), | |
@Grab('com.github.groovy-wslite:groovy-wslite:1.1.0')]) | |
import groovy.transform.EqualsAndHashCode | |
import groovy.transform.ToString | |
import rx.Observable | |
import rx.Subscriber | |
import rx.schedulers.Schedulers | |
//import java.util.concurrent.ForkJoinPool | |
import java.util.concurrent.Future | |
import java.util.concurrent.TimeUnit | |
import groovy.json.JsonBuilder | |
import groovy.json.JsonSlurper | |
import jsr166y.ForkJoinPool | |
import static groovyx.gpars.GParsPool.executeAsync | |
import static groovyx.gpars.GParsPool.withExistingPool | |
import static groovyx.gpars.GParsPool.withPool | |
import wslite.rest.RESTClient | |
// ---- Configuration ---------------------------------------------------------
String participant = "change-me"   // attached to every image sent to the UI
String clientId = ""               // obtain your own at https://instagram.com/developer
String api = "https://api.instagram.com/v1"
String ui = "http://localhost:5050/ui"
int interval = 5                   // seconds
int count = 10                     // count of tagged media to return
def tags = "love instagood me tbt cute follow followme photooftheday happy tagforlikes beautiful girl like selfie picoftheday summer fun smile friends like4like instadaily fashion igers instalike food".split(" ")
int limit = 3                      // number of tags per observable stream

// ---- Shared infrastructure -------------------------------------------------
def instaRest = new RESTClient(api)      // client for the Instagram API
def uiRest = new RESTClient(ui)          // client for the UI endpoint
def fjp = new ForkJoinPool(limit)        // pool used by the multi-threaded fetch (instagram2)
def scheduler = Schedulers.newThread()   // scheduler the consumers observe on
/** Location attached to a post; unjson() copies the fields straight from the JSON. */
@EqualsAndHashCode
@ToString(includeNames = true)
class Location {
    Double latitude
    Double longitude
    Integer id
    String name
}
/**
 * One tagged image. Equality and hashing look at `url` only, so two Media
 * objects with the same URL compare equal regardless of tag or location.
 */
@EqualsAndHashCode(includes = "url")
@ToString
class Media {
    String tag
    String url
    Location location
    String participant
}
/**
 * Converts a parsed media response into Media objects.
 *
 * @param json parsed response; its `data` property is iterated as the list of posts
 * @return one Media per post carrying the thumbnail URL and the location
 *         (null when the post has none); tag and participant are left unset
 */
def unjson(json) {
    json.data.collect { post ->
        def l = post.location
        def location = l ? new Location(latitude: l.latitude, longitude: l.longitude, id: l.id, name: l.name) : null
        new Media(url: post.images.thumbnail.url, location: location)
    }
}
// Fetches the most recent media for one tag (blocking HTTP GET) and stamps
// every returned Media with that tag. Any exception from the REST call
// propagates to the caller.
def media = { String tag ->
    def json = instaRest.get(path: "/tags/$tag/media/recent", query: [client_id: clientId, count: count]).json
    def found = unjson(json)   // renamed: no longer reuses the closure's own name
    found.each { it.tag = tag }
    found
}
println 'Starting...'

// single-threaded HTTP fetch
// Every tick of the interval timer is mapped to an Observable that fetches the
// first `limit` tags one after another; merge() then un-nests
// Observable<Observable<List<Media>>> => Observable<List<Media>>.
// A failed fetch is printed and contributes nothing to the stream.
def instagram /*Observable<List<Media>>*/ = Observable.merge(
    Observable.interval(interval, TimeUnit.SECONDS).map { _seq ->
        Observable.from(tags.take(limit)).flatMap { tag -> // Observable<List<Media>>
            try {
                Observable.just(media(tag)) // .from()
            } catch (Exception e) {
                e.printStackTrace()
                Observable.empty()
            }
        }
    })
// multi-threaded HTTP fetch
// On every tick one fetch closure per tag is submitted to the shared pool via
// executeAsync(); each returned Future is wrapped into an Observable and the
// resulting list is merged:
// List<Future<List<Media>>> => List<Observable<List<Media>>> => Observable<List<Media>>
def instagram2 /*Observable<List<Media>>*/ = Observable.interval(interval, TimeUnit.SECONDS).flatMap { _seq ->
    Observable.merge(withExistingPool(fjp) { _pool ->
        // List<Closure<List<Media>>>: one zero-argument closure per tag
        def fetchers = tags.take(limit).collect { tag ->
            return { ->
                try {
                    media(tag)
                } catch (Exception e) {
                    // a failed fetch is reported and yields an empty list
                    e.printStackTrace()
                    []
                }
            }
        }
        executeAsync(fetchers).collect { future -> Observable.from(future) }
    })
}
// flatten: Observable<List<Media>> => Observable<Media>
// flatMapIterable emits every element of each incoming list. Unlike a
// hand-written Observable.create bridge, the operator keeps the upstream
// subscription tied to the downstream one, so when takeWhile() below
// unsubscribes, the interval timer and the HTTP polling stop as well.
def instagram3 /*Observable<Media>*/ = instagram2.flatMapIterable { images -> images }
//instagram3.subscribe { images -> println(images) }

// de-duplicate
// The stream is cut off once 20 seconds have passed since `start` and is
// shared between all consumers through replay()/connect().
long start = System.currentTimeMillis()
def limited = instagram3.takeWhile { _ -> (System.currentTimeMillis() - start) < 20000 }.replay()
def threaded = limited.observeOn(scheduler)

threaded.subscribe(
    { image -> println(image) },
    { ex -> println("===== error: " + ex) },
    // give the other subscribers a second to print their totals, then quit
    { -> println("===== completed"); Thread.sleep(1000); System.exit(0) }
)
threaded.count().subscribe { sz -> println("===== unfiltered stream size is $sz") }
// distinct() relies on Media equality, which compares the URL only
threaded.distinct().count().subscribe { sz -> println("===== deduplicated stream size is $sz") }
limited.connect()

// send to UI
threaded.subscribe { image ->
    image.participant = participant
    try {
        uiRest.post() {
            type "application/json"
            charset "UTF-8"
            text new JsonBuilder(image).toString()
        }
    } catch (Exception e) {
        // A failed POST must not escape onNext: this subscriber has no error
        // handler, so one failed request would otherwise abort the subscription.
        e.printStackTrace()
    }
}

// keep the script alive while the streams run; System.exit() above ends it earlier
Thread.sleep(100000)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package instarx; | |
import rx.Observable; | |
import rx.Scheduler; | |
import rx.Subscriber; | |
import rx.observables.ConnectableObservable; | |
import rx.schedulers.Schedulers; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import com.mashape.unirest.http.HttpResponse; | |
import com.mashape.unirest.http.JsonNode; | |
import com.mashape.unirest.http.Unirest; | |
import com.mashape.unirest.http.exceptions.UnirestException; | |
import org.json.JSONArray; | |
import org.json.JSONObject; | |
public class JavaMain { | |
String participant = "change-me"; | |
String clientId = ""; // obtain your own at https://instagram.com/developer | |
String api = "https://api.instagram.com/v1"; | |
String ui = "http://localhost:5050/ui"; | |
int interval = 5; // seconds | |
int count = 10; // count of tagged media to return | |
String[] tags = "invent your own".split(" "); | |
Scheduler scheduler = Schedulers.newThread(); | |
public static void main(String[] args) { | |
new JavaMain().run(); | |
} | |
// http://unirest.io/java.html | |
List<Media> media(String tag) throws UnirestException { | |
HttpResponse<JsonNode> resp = Unirest.get(String.format("%s/tags/%s/media/recent", api, tag)) | |
.queryString("client_id", clientId) | |
.queryString("count", count) | |
.asJson(); | |
return unjson(resp.getBody().getObject()).stream().map(image -> image.setTag(tag)).collect(Collectors.toList()); | |
} | |
void run() { | |
// step one: single-threaded HTTP fetch | |
Observable<Long> ticker = Observable.interval(interval, TimeUnit.SECONDS); | |
Observable<String> obstags = Observable.from(tags); | |
Observable<Observable<List<Media>>> nested = ticker.map(_seq -> | |
/*Observable<List<Media>>*/ obstags.flatMap(tag -> { | |
try { | |
return Observable.just(media(tag)); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
return Observable.empty(); | |
} | |
})); | |
Observable<List<Media>> list = Observable.merge(nested); | |
list.subscribe(images -> images.forEach(image -> System.out.println("list: " + image))); | |
// step two: flatten | |
Observable<Media> instagram = Observable.create(/*Subscriber<Media>*/ observer -> { | |
list.subscribe( | |
images -> { | |
if (!observer.isUnsubscribed()) images.forEach(image -> observer.onNext(image)); | |
}, | |
observer::onError, | |
observer::onCompleted | |
); | |
}); | |
// step three: send to UI | |
instagram.subscribe(image -> { | |
try { | |
Unirest.post(ui).body(new JSONObject(image.setParticipant(participant)).toString()).asBinary(); | |
} catch (UnirestException e) { | |
e.printStackTrace(); | |
} | |
}); | |
// step four: deduplicate | |
long start = System.currentTimeMillis(); | |
ConnectableObservable<Media> limited = instagram.takeWhile( _image -> (System.currentTimeMillis() - start) < 20000) .replay(); | |
Observable<Media> threaded = limited.observeOn(scheduler); | |
threaded.subscribe( | |
System.out::println, | |
ex -> System.out.println("===== error: " + ex), | |
() -> { | |
System.out.println("===== completed"); | |
sleep(1000); | |
System.exit(0); | |
} | |
); | |
threaded.count().forEach(sz -> System.out.println("===== unfiltered stream size is " + sz)); | |
threaded.distinct(image -> image.getUrl()).count().forEach(sz -> System.out.println("===== deduplicated stream size is " + sz)); | |
limited.connect(); | |
sleep(100000); | |
} | |
void sleep(long msec) { | |
try { Thread.sleep(msec); } catch (InterruptedException e) { throw new RuntimeException(e); } | |
} | |
List<Media> unjson(JSONObject json) { | |
JSONArray posts = json.getJSONArray("data"); | |
List<Media> media = new ArrayList<>(posts.length()); | |
for (int i = 0; i < posts.length(); ++i) { | |
JSONObject post = posts.getJSONObject(i); | |
JSONObject l = post.optJSONObject("location"); | |
Location location = l == null ? null : new Location( | |
l.optDouble("latitude"), l.optDouble("longitude"), l.has("id") ? l.getInt("id") : null, l.has("name") ? l.optString("name") : null); | |
media.add(new Media(post.getJSONObject("images").getJSONObject("thumbnail").getString("url"), location)); | |
} | |
return media; | |
} | |
public class Location { | |
Double latitude; Double longitude; Integer id; String name; | |
public Location(Double latitude, Double longitude, Integer id, String name) { | |
this.latitude = latitude; | |
this.longitude = longitude; | |
this.id = id; | |
this.name = name; | |
} | |
@Override | |
public String toString() { | |
return "Location{" + | |
"latitude=" + latitude + | |
", longitude=" + longitude + | |
", id=" + id + | |
", name='" + name + '\'' + | |
'}'; | |
} | |
public Double getLatitude() { | |
return latitude; | |
} | |
public Double getLongitude() { | |
return longitude; | |
} | |
public Integer getId() { | |
return id; | |
} | |
public String getName() { | |
return name; | |
} | |
} | |
public class Media { | |
String tag; String url; Location location; String participant; | |
public Media(String url, Location location) { | |
this.url = url; | |
this.location = location; | |
} | |
public Media setTag(String tag) { | |
this.tag = tag; | |
return this; | |
} | |
public Media setParticipant(String participant) { | |
this.participant = participant; | |
return this; | |
} | |
@Override | |
public String toString() { | |
return "Media{" + | |
"tag='" + tag + '\'' + | |
", url='" + url + '\'' + | |
", location=" + location + | |
", participant='" + participant + '\'' + | |
'}'; | |
} | |
public String getTag() { | |
return tag; | |
} | |
public String getUrl() { | |
return url; | |
} | |
public Location getLocation() { | |
return location; | |
} | |
public String getParticipant() { | |
return participant; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package instarx | |
import scala.collection.JavaConversions._ | |
import scala.util.control.Breaks._ | |
import scala.concurrent.ExecutionContext.global | |
import scala.concurrent.{Await, Future} | |
import scala.concurrent.duration._ | |
import org.json4s._ | |
import org.json4s.jackson.JsonMethods._ | |
import org.json4s.jackson.Serialization | |
import org.json4s.jackson.Serialization.{read, write} | |
import rx.lang.scala.{Observable, Observer, Subscriber, Subscription} | |
import rx.lang.scala.subjects.ReplaySubject | |
import rx.lang.scala.schedulers.NewThreadScheduler | |
import dispatch._, Defaults._ | |
/** Scala / RxScala variant of the Instagram tag feed: polls recent media for a
  * few tags on a timer, flattens the results into a stream of [[Main.Media]],
  * counts them with and without de-duplication, and posts each one to the UI.
  */
object Main {
  val log = org.slf4j.LoggerFactory.getLogger(getClass)
  val participant = Some("change-me")
  val clientId = "" // obtain your own at https://instagram.com/developer
  val api = "https://api.instagram.com/v1"
  val ui = "http://localhost:5050/ui"
  val interval = 5.seconds
  val count = 10 // count of tagged media to return
  val tags = "love instagood me tbt cute follow followme photooftheday happy tagforlikes beautiful girl like selfie picoftheday summer fun smile friends like4like instadaily fashion igers instalike food".split(" ").toSeq
  val limit = 3 // number of tags per observable stream
  val scheduler = NewThreadScheduler()

  case class Location(latitude: Option[Double], longitude: Option[Double], id: Option[BigInt], name: Option[String])
  case class Media(tag: String, url: String, location: Option[Location], participant: Option[String] = None)

  // Parse Instagram media response to extract image URL and location (if any)
  private def unjson(text: String)(implicit tag: String): Seq[Media] = {
    implicit val formats = DefaultFormats
    val json = parse(text)
    val media/*: Seq[Media]*/ = for {
      JArray(posts) <- json \ "data"
      post <- posts
      JString(url) <- post \ "images" \ "thumbnail" \ "url"
    } yield Media(tag, url, (post \ "location").toOption.map(_.extract[Location]))
    media
  }

  /** Requests the recent media for one tag.
    *
    * The returned future never fails with an `Exception`: a failed request or
    * an unparsable response is logged and turned into an empty list, so one
    * bad tag does not terminate the whole stream.
    */
  private def fetch(tag: String): scala.concurrent.Future[Seq[Media]] = {
    val req = url(s"$api/tags/$tag/media/recent?client_id=$clientId&count=$count")
    Http(req OK as.String).map(resp => unjson(resp)(tag)).recover {
      case e: Exception =>
        log.warn(s"fetching media for tag '$tag' failed", e)
        Nil
    }
  }

  /* The life of the stream starts as a framework provided stream of timer events created
     by Observable.interval(). For every Instagram Tag a HTTP call is made to gather the list
     of Media URL-s. The Future returned by HTTP client is converted by Observable.from() into
     Observable. Then Observables are flatten()-ed to un-nest them into Observable[Media].
  */
  def main(args: Array[String]): Unit = {
    // Instagram limit is 5000 API requests per hour per client id or access token
    val instagram/*: Observable[Seq[Media]]*/ = Observable.interval(interval).map { _ =>
      Observable.from(tags.take(limit)).flatMap { tag =>
        Observable.from(fetch(tag))
      }
    } /*Observable[Observable[Seq[Media]]]*/ .flatten /*=> Observable[Seq[Media]]*/
    //instagram.subscribe(images => println(images))

    // How to get Observable[Media]]?
    // Manual solution using (Replay)Subject which is both a Subscription and Observable:
    // the events are bridged from `instagram: Observable[Seq[Media]]` - for each Media in
    // the received sequence Subject.onNext() is called.
    val instagram2 = Observable((subscriber: Subscriber[Media]) => {
      val rsubj = ReplaySubject[Media]
      instagram.subscribe(images => images.foreach(rsubj.onNext), ex => rsubj.onError(ex), () => rsubj.onCompleted())
      rsubj.subscribe(subscriber)
    })
    val instagram3 = Observable.create((observer: Observer[Media]) => {
      val rsubj = ReplaySubject[Media]
      instagram.subscribe(images => images.foreach(rsubj.onNext), ex => rsubj.onError(ex), () => rsubj.onCompleted())
      rsubj.subscribe(observer)
    })

    // What if there is no Observable to subscribe to? How to build an Observable from scratch?
    // https://github.com/ReactiveX/RxScala/blob/0.x/examples/src/main/scala/SyncObservable.scala
    // Here instagram.subscribe() - a subscription to Observable is used, but it could be anything
    // else: Future.onComplete(), Actor body, an imperative piece of code that decides the event
    // must be pushed to subscriber, etc.
    // Follow the protocol:
    // (1) check for Subscriber.isUnsubscribed, (2) call onNext to push the data, (3) finish the
    // stream with onCompleted, (4) signal the error _and_ finish the stream with onError.
    val instagram4 = Observable((subscriber: Subscriber[Media]) => {
      val upstream: Subscription = instagram.subscribe(
        images => if (!subscriber.isUnsubscribed) images.foreach(subscriber.onNext),
        ex => subscriber.onError(ex),
        () => subscriber.onCompleted())
      // Registering the upstream subscription with the subscriber cancels it as
      // soon as the subscriber unsubscribes - no mutable, nullable handle needed.
      subscriber.add(upstream)
    })

    // A simpler solution by using nested Observables
    val instagram5/*: Observable[Media]*/ = Observable.interval(interval).map { _ =>
      Observable.from(tags.drop(limit).take(limit)).flatMap { tag =>
        // notice the difference: Seq[Media] => Observable[Media]
        Observable.from(fetch(tag).map(found => Observable.from(found)))
      }
    } /*Observable[Observable[Observable[Media]]]*/ .flatten.flatten /*=> Observable[Media]*/

    // Instagram's media/recent query may return images already pulled in previous run.
    // Apply Observable.distinct() to filter out duplicates.
    val start = System.currentTimeMillis
    val limited = instagram5.takeWhile(_ => (System.currentTimeMillis - start) < 20000).replay
    val threaded = limited.observeOn(scheduler) // introduce some concurrency
    threaded.subscribe(
      images => println(images),
      ex => println("===== error: " + ex),
      () => { println("===== completed"); Thread.sleep(1000); System.exit(0) })
    threaded.size.foreach(sz => println(s"===== unfiltered stream size is $sz"))
    threaded.distinct(_.url).size.foreach(sz => println(s"===== deduplicated stream size is $sz"))
    limited.connect

    // smooth the delivery of images (advanced task) - create Smooth (Average) operator, like
    // Sample and Debounce, but (1) no event loss and (2) internal adaptive trigger that track
    // incoming rate and smoothly adapts outgoing rate to keep it steady
    // 123........45......6790123.......4 => 1...2...3...4..5.....6.7.8.9.0.1.2.3.4

    // Send media URL and location to UI
    // {"tag":"tbt","url":"https://scontent.cdninstagram.com/....jpg","location":{"latitude":51.504976275,"longitude":-0.087847965,"id":225481160,"name":"The Shard London"},"participant":"change-me"}
    threaded.subscribe { image =>
      implicit val formats = Serialization.formats(NoTypeHints)
      val req = url(ui).setContentType("application/json", "UTF-8").setBody(write(image.copy(participant = participant)))
      // Await.ready throws on timeout; this subscriber has no error handler, so
      // the exception must not escape onNext. A slow UI costs one image only.
      try {
        Await.ready(Http(req.POST), 2.seconds)
      } catch {
        case scala.util.control.NonFatal(e) => log.warn(s"posting ${image.url} to UI failed", e)
      }
      ()
    }

    // Must outlast the 20 second takeWhile window above, otherwise main returns
    // before the stream completes; same value as the Groovy and Java variants.
    Thread.sleep(100000)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment