These are chat archives for ReactiveX/RxJava

21st
Mar 2016
cavemansspa
@cavemansspa
Mar 21 2016 15:10
println Thread.currentThread()       
Observable.from([1, 2, 3, 4])
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.flatMap({
    println 'in map thread == ' + Thread.currentThread()
    Thread.sleep(500)
    return Observable.just(it + ' mapped')
    }
)
.toBlocking()
.subscribe(
    { it -> println "onNext() ${it} ${Thread.currentThread()}" },
    { error -> println "onError() ${error}" },
    { println 'onCompleted' }
)
println '...'
Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 1 mapped Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 2 mapped Thread[Thread-122,6,main]
in map thread == Thread[RxCachedThreadScheduler-1,6,main]
onNext() 3 mapped Thread[Thread-122,6,main]
onNext() 4 mapped Thread[Thread-122,6,main]
onCompleted
...
in this example, i'm expecting the flatMap to each get there own thread from Schedulers.io()
the output shows that the same thread is used i.e. [RxCachedThreadScheduler-1,6,main]
how do you get flatMap request to use multiple threads from Schedulers.io()?
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 15:23
@cavemansspa apply subscribeOn() to Observable that you return in the flatMap, like flatMap(value -> Observable.something().subscribeOn())
Dorus
@Dorus
Mar 21 2016 15:23
@cavemansspa use observeOn inside flatMap.
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 15:26
@Dorus then the computation of the Observable returned in flatMap() will happen on initial (iothread in this case) and only emission down to chain after flatMap will happen on observeOn scheduler
Dorus
@Dorus
Mar 21 2016 15:27
yeah, it depend on the structure of your inner observalbe. Sometimes you might need subscribeOn like you mentioned.
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 15:28
yup :)
cavemansspa
@cavemansspa
Mar 21 2016 15:29
ok -- giving that a go.
Dorus
@Dorus
Mar 21 2016 15:37

You can also do

.flatMap({
    Schedulers.io().createWorker().schedule(() -> {
    println 'in map thread == ' + Thread.currentThread()
    Thread.sleep(500)
    return Observable.just(it + ' mapped')
    })})

(Not 100% sure if i use worker correct here, might still need to dispose it in the end somehow.)

actually nvm, that wont even return anything :P
cavemansspa
@cavemansspa
Mar 21 2016 15:39
println Thread.currentThread()       
Observable.from([1, 2, 3, 4])
.flatMap({
    println 'in map thread == ' + Thread.currentThread()
    def val = it
    return Observable.create({ obs ->
        Thread.sleep(500)
        println 'in Observable within flatMap thread == ' + Thread.currentThread()
        obs.onNext(val + ' mapped')
        obs.onCompleted()
    }).subscribeOn(Schedulers.io())
}
)   
.toBlocking()
.subscribe(
    { it -> println "onNext() ${it} ${Thread.currentThread()}" },
    { error -> println "onError() ${error}" },
    { println 'onCompleted' }
)
println '...'
Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in map thread == Thread[Thread-133,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-1,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-2,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-3,6,main]
in Observable within flatMap thread == Thread[RxCachedThreadScheduler-4,6,main]
onNext() 1 mapped Thread[Thread-133,6,main]
onNext() 2 mapped Thread[Thread-133,6,main]
onNext() 4 mapped Thread[Thread-133,6,main]
onNext() 3 mapped Thread[Thread-133,6,main]
onCompleted
...
that does the trick
Dorus
@Dorus
Mar 21 2016 15:42
Or
.flatMap({
    println 'in map thread == ' + Thread.currentThread()
    def val = it
    Observable.just(0)
        .observeOn(Schedulers.io())
        .map({ 
          Thread.sleep(500)
          println 'in Observable within flatMap thread == ' + Thread.currentThread()
          val + ' mapped'
        })
}
This is scala right?
cavemansspa
@cavemansspa
Mar 21 2016 15:43
groovy
Dorus
@Dorus
Mar 21 2016 15:43
ah okay, not familiar with groovy :P
In schala you could leave out the return commands if they are the last thing in a function.
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 15:44
@cavemansspa btw, better use Observable.fromCallable() instead of create() just to not deal with Observable contract (in case you need to emit one value)
cavemansspa
@cavemansspa
Mar 21 2016 15:44
same with groovy. the groovyConsole makes testing these scenarios interactively very easy.
@Dorus -- okay, will have a closer look at Observable.fromCallable
Dorus
@Dorus
Mar 21 2016 15:46
fromCallable is a smart one too yes.
cavemansspa
@cavemansspa
Mar 21 2016 15:48
@Dorus -- looks good, I should be able to pass a groovy closure to that.
Dorus
@Dorus
Mar 21 2016 15:49
I believe those functions should be able to take a Scheduler directly
Or not, cant find it now :P
Anyway i'm sure you figure it out from here.
cavemansspa
@cavemansspa
Mar 21 2016 15:52
@Dorus / @artem-zinnatullin -- thank you for the help.
chijikpijik
@chijikpijik
Mar 21 2016 16:01
what difference between fromCallable and defer with just?
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 16:02
no difference in your case
but defer is more flexible because it allows you return any observable (but requires more code)
chijikpijik
@chijikpijik
Mar 21 2016 16:04
@artem-zinnatullin o, just bump RxJava to 1.0.15 and fromCallable has appear
Dorus
@Dorus
Mar 21 2016 16:04
oooh that's probably what i need to do too
Artem Zinnatullin :slowpoke:
@artem-zinnatullin
Mar 21 2016 16:04
yup, I've added it in 1.0.15
chijikpijik
@chijikpijik
Mar 21 2016 16:06
@artem-zinnatullin :thumbsup:
cavemansspa
@cavemansspa
Mar 21 2016 16:52
println Thread.currentThread()       
Observable.from([1, 2, 3, 4])
.flatMap({
    println 'in map thread == ' + Thread.currentThread()
    def val = it
    return Observable.fromCallable({ 
        Thread.sleep(500)
        println 'in Observable within flatMap thread == ' + Thread.currentThread()
        return val + ' mapped'
       }).subscribeOn(Schedulers.io())
}
)   
//.toBlocking()
.subscribe(
    { it -> println "onNext() ${it} ${Thread.currentThread()}" },
    { error -> println "onError() ${error}" },
    { println 'onCompleted' }
)
println '...'
works great with Observable.fromCallable