@andypetrella
Last active August 29, 2015 14:08
Zip won't work? → "Honi soit qui mal y pense"

This gist extracts a problem I'm facing in the Spark-Notebook (see here), where a dynamic form generates new Spark SQL.

It is in a transitional state, meaning that it still contains a lot of legacy constructs that I'd like to get rid of:

  • Connection
  • Observable
  • Observer

But one thing at a time :-D.

The result of this thing will visually be like the image below (or above :-D)

// don't peer review the style, I'm doing quick hacks :-) (I know it's not a valid argument)
val ws:Widget = parts.map(_._2.widget).reduce((x:Widget, y:Widget) => x ++ y)
import rx.lang.scala.{Observable => RxObservable, Observer => RxObserver, _}
val mergedObservables:RxObservable[Seq[(String, Any)]] = {
  val l:List[RxObservable[(String, Any)]] = parts.map{ p =>
    // widget has a dataConnection:Connection
    // Connection is a transient legacy crap that wraps a custom observer and a custom observable
    // A custom Observable... wraps a rx.Observable (or more precisely, its mirror in the Scala world)
    val ob = p._2.widget.currentData.observable.inner
    ob.subscribe(x => logInfo(x.toString)) //// ←←←←← this SH*T IS PRINTED :-/
    val o:RxObservable[(String, Any)] = ob.map((d:Any) => (p._2.name, d))
    o
  }
  // just a try in case Observable.from(l) wouldn't stop → hence zip won't start, because it needs to know N
  l.head.zip(l.tail.head).map(v => v._1 :: v._2 :: Nil)
  // is from not stopping? ↑↑↑
  //RxObservable.zip(RxObservable.from(l))
}
// Connection, okay → legacy
var sql:Connection[String] = Connection.just("")
// ↓↓↓ None of these SH*T WILL BE PRINTED :'(
mergedObservables.subscribe(
  onNext = (v:Seq[(String, Any)]) => {
    logInfo("============ssssss============")
    val values = v.toMap
    // okay, ignore the string building below, not interesting for the problem
    val s = parts.map { case (before, input) =>
      val vv = values(input.name)
      before + vv.toString
    }.mkString(" ")
    logInfo(s)
    sql <-- Connection.just(s) // nah, fancy op to register the sql.observer to the observable of s :-/
  },
  onError = (t:Throwable) => logError("============ssssss============\n"+"Ouch in merged", t),
  onCompleted = () => logWarn("============ssssss============\n"+" Merged completed ! ")
)
@headinthebox

l.head.zip(l.tail.head).map(v => v._1 :: v._2 :: Nil)

You are not subscribing to this, so nothing will happen.

@headinthebox

import rx.lang.scala.Observable

object MainScala {

  def main(args: Array[String]): Unit = {

    val xs = Observable.just(1,2,3,4)
    val ys = Observable.just(5,6,7,8)
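    // doOnEach only registers a side effect; nothing runs until someone subscribes
    val zs = xs.zip(ys).doOnEach(xy => println(xy))

    // Uncomment to start the stream; without a subscription nothing is printed
    //zs.subscribe(xy => println(xy))
    scala.io.StdIn.readLine() // Predef.readLine is deprecated since Scala 2.11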
  }
}

@andypetrella
Author

Thanks a lot @headinthebox!
Especially for the tip regarding doOnEach!!!

About the subscription: isn't it enough to subscribe to mergedObservables, which is the result of l.head.zip(l.tail.head).map(v => v._1 :: v._2 :: Nil)? Or am I missing something ;-D

@andypetrella
Author

Argl → got it!

zip fires only once ALL streams have fired! And I want an emission for each inner emission, so I guess I'll need to retry merge!
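
To make the difference concrete, here is a minimal sketch, assuming RxScala is on the classpath. The two subjects `name` and `age` and the object `ZipVsMerge` are hypothetical stand-ins for the widgets' observables, not code from the notebook. It pushes two values into one stream and one into the other, and records what zip and merge each emit.

```scala
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

object ZipVsMerge {
  // Returns (what zip emitted, what merge emitted) for the same input sequence.
  def run(): (List[String], List[String]) = {
    // Hypothetical stand-ins for two form inputs
    val name = PublishSubject[String]()
    val age  = PublishSubject[String]()

    val zipped = ListBuffer.empty[String]
    val merged = ListBuffer.empty[String]

    // zip pairs the n-th item of one stream with the n-th item of the other
    name.zip(age).subscribe { (p: (String, String)) => zipped += s"${p._1},${p._2}"; () }
    // merge forwards every item from either stream as soon as it arrives
    name.merge(age).subscribe { (v: String) => merged += v; () }

    name.onNext("a") // zip waits for a partner from age; merge emits "a"
    name.onNext("b") // zip keeps buffering; merge emits "b"
    age.onNext("1")  // zip can now pair "a" with "1"; merge emits "1"

    (zipped.toList, merged.toList)
  }

  def main(args: Array[String]): Unit = {
    val (z, m) = run()
    println(s"zip:   $z")
    println(s"merge: $m")
  }
}
```

With this input, zip yields a single pair while merge yields all three values, which matches the behaviour described above: a zipped stream stays silent until every input has produced a value.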

@headinthebox

l.head.zip(l.tail.head).map(v => v._1 :: v._2 :: Nil)

The problem is you never subscribe to this. So nothing will happen.
