RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Hi guys!
I’ve faced one strange problem.
private static Observable<List<Repository>> repositories(GitHubApi api, String user) {
return api.getRepositories(new UserQuery(user), Sort.UPDATED, Order.ASC)
.map(Result::response)
.map(Response::body)
.map(RepositoriesResponse::getItems);
}
@Override public Observable<List<RepositoryDto>> getUsersRepositories(String user) {
return repositories(mApi, user)
.doOnNext(DataRepositoryImpl::saveToCache) // WTF???
.flatMap(Observable::from)
.map(DomainDataMapper::toRepositoryDto)
.toList();
}
This code works well. I can obtain list of repositories from github and show them on UI, but I’m wondering why method saveToCache
is not invoked.
println Thread.currentThread()
Observable.from([1, 2, 3, 4])
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap({
println 'in map thread == ' + Thread.currentThread()
Thread.sleep(500)
return Observable.just(it + ' mapped')
}
)
.toBlocking()
.subscribe(
{ it -> println "onNext() ${it} ${Thread.currentThread()}" },
{ error -> println "onError() ${error}" },
{ println 'onCompleted' }
)
println '...'
Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 1 mapped Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 2 mapped Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 3 mapped Thread[Thread-122,6,main]
onNext() 4 mapped Thread[Thread-122,6,main]
onCompleted
...
flatMap
to each get there own thread from Schedulers.io()
[RxCachedThreadScheduler-1,6,main]
Schedulers.io()
?
println Thread.currentThread()
Observable.from([1, 2, 3, 4])
.flatMap({
println 'in map thread == ' + Thread.currentThread()
def val = it
return Observable.create({ obs ->
Thread.sleep(500)
println 'in Observable within flatMap thread == ' + Thread.currentThread()
obs.onNext(val + ' mapped')
obs.onCompleted()
}).subscribeOn(Schedulers.io())
}
)
.toBlocking()
.subscribe(
{ it -> println "onNext() ${it} ${Thread.currentThread()}" },
{ error -> println "onError() ${error}" },
{ println 'onCompleted' }
)
println '...'
Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-1,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-2,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-3,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-4,6,main]
onNext() 1 mapped Thread[Thread-133,6,main]
onNext() 2 mapped Thread[Thread-133,6,main]
onNext() 4 mapped Thread[Thread-133,6,main]
onNext() 3 mapped Thread[Thread-133,6,main]
onCompleted
...
Observable.fromCallable
defer
is more flexible because it allows you return any observable (but requires more code)
println Thread.currentThread()
Observable.from([1, 2, 3, 4])
.flatMap({
println 'in map thread == ' + Thread.currentThread()
def val = it
return Observable.fromCallable({
Thread.sleep(500)
println 'in Observable within flatMap thread == ' + Thread.currentThread()
return val + ' mapped'
}).subscribeOn(Schedulers.io())
}
)
//.toBlocking()
.subscribe(
{ it -> println "onNext() ${it} ${Thread.currentThread()}" },
{ error -> println "onError() ${error}" },
{ println 'onCompleted' }
)
println '...'
Observable.fromCallable
import rx.lang.scala.Observable
import scala.concurrent.duration._
object Main {
def main(args: Array[String]): Unit = {
val a = Observable.interval(1000 millis).map(_ => "A")
val d = Observable.interval(3000 millis).map(_ => "D")
val e = Observable.interval(5000 millis).map(_ => "E")
a.merge(d).merge(e)
.take(100)
.subscribe(x => println(x))
}
}
sbt run
it outputs nothing