RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
// Demo: map each source item to an inner Observable built with Observable.create,
// subscribe each inner Observable on the io() scheduler, and consume the merged
// result through a blocking subscribe. Every step logs the thread it runs on.
println(Thread.currentThread())

Observable.from([1, 2, 3, 4])
    .flatMap { item ->
        println('in map thread == ' + Thread.currentThread())
        // The inner Observable sleeps before emitting to simulate slow work.
        Observable.create({ subscriber ->
            Thread.sleep(500)
            println('in Observable within flatMap thread == ' + Thread.currentThread())
            subscriber.onNext(item + ' mapped')
            subscriber.onCompleted()
        }).subscribeOn(Schedulers.io())
    }
    .toBlocking()
    .subscribe(
        { value -> println("onNext() ${value} ${Thread.currentThread()}") },
        { failure -> println("onError() ${failure}") },
        { println('onCompleted') }
    )

// Printed after the blocking subscribe call above has returned.
println('...')
Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-1,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-2,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-3,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-4,6,main]
onNext() 1 mapped Thread[Thread-133,6,main]
onNext() 2 mapped Thread[Thread-133,6,main]
onNext() 4 mapped Thread[Thread-133,6,main]
onNext() 3 mapped Thread[Thread-133,6,main]
onCompleted
...
Observable.fromCallable
`defer` is more flexible because it allows you to return any observable (but requires more code).
// Demo: same pipeline shape as a create()-based version, but each inner
// Observable is built with Observable.fromCallable, whose closure simply
// returns the value to emit. Every step logs the thread it runs on.
println(Thread.currentThread())

Observable.from([1, 2, 3, 4])
    .flatMap { item ->
        println('in map thread == ' + Thread.currentThread())
        // The callable sleeps before producing its value to simulate slow work.
        Observable.fromCallable({
            Thread.sleep(500)
            println('in Observable within flatMap thread == ' + Thread.currentThread())
            item + ' mapped'
        }).subscribeOn(Schedulers.io())
    }
    // toBlocking() is commented out in this variant, so subscribe is called
    // on the plain Observable.
    //.toBlocking()
    .subscribe(
        { value -> println("onNext() ${value} ${Thread.currentThread()}") },
        { failure -> println("onError() ${failure}") },
        { println('onCompleted') }
    )

println('...')
Observable.fromCallable
import rx.lang.scala.Observable
import scala.concurrent.duration._
/** Demo entry point: merges three periodic streams and prints up to 100 labels. */
object Main {

  /**
   * Subscribes a printer to the merge of three interval-driven streams
   * ("A" every 1 s, "D" every 3 s, "E" every 5 s), capped at 100 items.
   *
   * NOTE(review): nothing in this method waits for the subscription to finish;
   * `main` returns right after `subscribe` — presumably before any item is
   * delivered. Confirm against the scheduler used by `Observable.interval`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val everySecond       = Observable.interval(1000.millis).map(_ => "A")
    val everyThreeSeconds = Observable.interval(3000.millis).map(_ => "D")
    val everyFiveSeconds  = Observable.interval(5000.millis).map(_ => "E")

    val merged  = everySecond.merge(everyThreeSeconds).merge(everyFiveSeconds)
    val limited = merged.take(100)

    limited.subscribe(label => println(label))
  }
}
When run with `sbt run`, it outputs nothing.
testScheduler
to block.
/**
 * Merges three periodic streams ("A" every 1 s, "D" every 3 s, "E" every 5 s),
 * prints up to 100 labels, and keeps the calling thread blocked until the
 * merged stream terminates.
 *
 * A one-shot `CountDownLatch` is used as the completion signal. Unlike a bare
 * `wait()`/`notifyAll()` pair it cannot be woken spuriously and it does not
 * miss a signal that fires before the waiting thread starts to wait.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  import java.util.concurrent.CountDownLatch

  val terminated = new CountDownLatch(1)

  val a = Observable.interval(1000.millis).map(_ => "A")
  val d = Observable.interval(3000.millis).map(_ => "D")
  val e = Observable.interval(5000.millis).map(_ => "E")

  a.merge(d).merge(e)
    .take(100)
    .subscribe(
      (s: String) => println(s),
      (error: Throwable) => {
        // An error terminates the stream just like completion does: report it
        // and release the latch, otherwise main would stay blocked forever.
        Console.err.println(s"onError() $error")
        terminated.countDown()
      },
      () => terminated.countDown()
    )

  // Block the calling thread until onCompleted or onError has been delivered.
  terminated.await()
}
}