RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
testScheduler
to block.
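// Assumes RxScala (rx.lang.scala) on the classpath; the object name Main is a placeholder.
import rx.lang.scala.{Observable, Subscriber}
import scala.concurrent.duration._
import scala.language.postfixOps

object Main {
  def main(args: Array[String]): Unit = {
    // Monitor used to keep main alive until the stream completes.
    val lock: AnyRef = new AnyRef
    val a = Observable.interval(1000 millis).map(_ => "A")
    val d = Observable.interval(3000 millis).map(_ => "D")
    val e = Observable.interval(5000 millis).map(_ => "E")
    a.merge(d).merge(e)
      .take(100)
      .subscribe(new Subscriber[String]() {
        // Wake the main thread once 100 items have been emitted.
        override def onCompleted(): Unit = lock synchronized {
          lock.notifyAll()
        }
        override def onError(throwable: Throwable): Unit = {}
        override def onNext(s: String): Unit = println(s)
      })
    // interval emits on a background scheduler, so block here until onCompleted.
    lock synchronized {
      lock.wait()
    }
  }
}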
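The lock in the snippet above only exists to keep the main thread alive until onCompleted fires. A minimal sketch of the same idea in plain Java, with no RxJava involved: a scheduled executor stands in for Observable.interval, and a CountDownLatch stands in for the wait/notifyAll pair. The class name BlockUntilCompleted and the helper runAndWait are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockUntilCompleted {

    // Hypothetical helper: emits `count` ticks on a background thread and
    // blocks the caller until the "completed" signal arrives.
    static List<String> runAndWait(int count, long periodMillis) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger emitted = new AtomicInteger();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        timer.scheduleAtFixedRate(() -> {
            int n = emitted.incrementAndGet();
            if (n <= count) seen.add("A" + n);   // plays the role of onNext
            if (n == count) done.countDown();    // plays the role of onCompleted
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            // Without this wait the caller would return while ticks are still pending.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        } finally {
            timer.shutdownNow();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(3, 10)); // prints [A1, A2, A3]
    }
}
```

A latch remembers that it was counted down, so the signal is not lost if completion happens before the waiting thread gets there. A bare wait() can miss an earlier notifyAll() and may also wake up spuriously.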
@mlatu Why do you need map+filter? filter alone can already do the filtering and any data manipulation needed for the check (just nothing persistent).
flatMap is for one-to-many transformations, map for one-to-one.
About preserving order, do you really need that in a reactive system? concatMap also evaluates everything serially, I think; flatMap has the possibility to go concurrent if you need it.
Observable.range(0, 10000000)/* .map(...) or .flatMap(...) */.subscribe(x -> {}, e -> {}, () -> endMeasure())
o.flatMap(x -> Observable.just(f(x)).filter(y -> y != null))
map is still cleaner.
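To make the map versus flatMap point concrete, here is a small sketch that uses java.util.stream as a stand-in, not RxJava itself. Stream.map and Stream.flatMap have the same one-to-one and one-to-many shapes, but the concurrency aspect of Rx flatMap does not carry over. The function f and the class name MapVsFlatMap are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapVsFlatMap {

    // Hypothetical f: returns null for inputs that should be dropped.
    static String f(int x) {
        return x % 2 == 0 ? "even" + x : null;
    }

    // One-to-one transform, then drop the nulls.
    static List<String> viaMap(List<Integer> in) {
        return in.stream()
                 .map(MapVsFlatMap::f)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
    }

    // Same result, but each element is first wrapped in a one-element stream.
    static List<String> viaFlatMap(List<Integer> in) {
        return in.stream()
                 .flatMap(x -> Stream.of(f(x)).filter(Objects::nonNull))
                 .collect(Collectors.toList());
    }

    // One-to-many: here flatMap emits two items per input.
    static List<Integer> expand(List<Integer> in) {
        return in.stream()
                 .flatMap(x -> Stream.of(x, x * 10))
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3, 4);
        System.out.println(viaMap(in));                // prints [even2, even4]
        System.out.println(viaFlatMap(in));            // prints [even2, even4]
        System.out.println(expand(Arrays.asList(1, 2))); // prints [1, 10, 2, 20]
    }
}
```

Both variants give the same list. The flatMap version only pays for an extra wrapper per element, which is the sense in which map reads cleaner when the transform is strictly one-to-one.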
What is lock, and how is it working here?