Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    cavemansspa
    @cavemansspa
    how do you get flatMap request to use multiple threads from Schedulers.io()?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @cavemansspa apply subscribeOn() to Observable that you return in the flatMap, like flatMap(value -> Observable.something().subscribeOn())
    Dorus
    @Dorus
    @cavemansspa use observeOn inside flatMap.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @Dorus then the computation of the Observable returned in flatMap() will happen on initial (iothread in this case) and only emission down to chain after flatMap will happen on observeOn scheduler
    Dorus
    @Dorus
    yeah, it depend on the structure of your inner observalbe. Sometimes you might need subscribeOn like you mentioned.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yup :)
    cavemansspa
    @cavemansspa
    ok -- giving that a go.
    Dorus
    @Dorus

    You can also do

    .flatMap({
        Schedulers.io().createWorker().schedule(() -> {
        println 'in map thread == ' + Thread.currentThread()
        Thread.sleep(500)
        return Observable.just(it + ' mapped')
        })})

    (Not 100% sure if i use worker correct here, might still need to dispose it in the end somehow.)

    actually nvm, that wont even return anything :P
    cavemansspa
    @cavemansspa
    println Thread.currentThread()       
    Observable.from([1, 2, 3, 4])
    .flatMap({
        println 'in map thread == ' + Thread.currentThread()
        def val = it
        return Observable.create({ obs ->
            Thread.sleep(500)
            println 'in Observable within flatMap thread == ' + Thread.currentThread()
            obs.onNext(val + ' mapped')
            obs.onCompleted()
        }).subscribeOn(Schedulers.io())
    }
    )   
    .toBlocking()
    .subscribe(
        { it -> println "onNext() ${it} ${Thread.currentThread()}" },
        { error -> println "onError() ${error}" },
        { println 'onCompleted' }
    )
    println '...'
    Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-1,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-2,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-3,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-4,6,main]
    onNext() 1 mapped Thread[Thread-133,6,main]
    onNext() 2 mapped Thread[Thread-133,6,main]
    onNext() 4 mapped Thread[Thread-133,6,main]
    onNext() 3 mapped Thread[Thread-133,6,main]
    onCompleted
    ...
    that does the trick
    Dorus
    @Dorus
    Or
    .flatMap({
        println 'in map thread == ' + Thread.currentThread()
        def val = it
        Observable.just(0)
            .observeOn(Schedulers.io())
            .map({ 
              Thread.sleep(500)
              println 'in Observable within flatMap thread == ' + Thread.currentThread()
              val + ' mapped'
            })
    }
    This is scala right?
    cavemansspa
    @cavemansspa
    groovy
    Dorus
    @Dorus
    ah okay, not familiar with groovy :P
    In schala you could leave out the return commands if they are the last thing in a function.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @cavemansspa btw, better use Observable.fromCallable() instead of create() just to not deal with Observable contract (in case you need to emit one value)
    cavemansspa
    @cavemansspa
    same with groovy. the groovyConsole makes testing these scenarios interactively very easy.
    @Dorus -- okay, will have a closer look at Observable.fromCallable
    Dorus
    @Dorus
    fromCallable is a smart one too yes.
    cavemansspa
    @cavemansspa
    @Dorus -- looks good, I should be able to pass a groovy closure to that.
    Dorus
    @Dorus
    I believe those functions should be able to take a Scheduler directly
    Or not, cant find it now :P
    Anyway i'm sure you figure it out from here.
    cavemansspa
    @cavemansspa
    @Dorus / @artem-zinnatullin -- thank you for the help.
    chijikpijik
    @chijikpijik
    what difference between fromCallable and defer with just?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    no difference in your case
    but defer is more flexible because it allows you return any observable (but requires more code)
    chijikpijik
    @chijikpijik
    @artem-zinnatullin o, just bump RxJava to 1.0.15 and fromCallable has appear
    Dorus
    @Dorus
    oooh that's probably what i need to do too
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yup, I've added it in 1.0.15
    chijikpijik
    @chijikpijik
    @artem-zinnatullin :thumbsup:
    cavemansspa
    @cavemansspa
    println Thread.currentThread()       
    Observable.from([1, 2, 3, 4])
    .flatMap({
        println 'in map thread == ' + Thread.currentThread()
        def val = it
        return Observable.fromCallable({ 
            Thread.sleep(500)
            println 'in Observable within flatMap thread == ' + Thread.currentThread()
            return val + ' mapped'
           }).subscribeOn(Schedulers.io())
    }
    )   
    //.toBlocking()
    .subscribe(
        { it -> println "onNext() ${it} ${Thread.currentThread()}" },
        { error -> println "onError() ${error}" },
        { println 'onCompleted' }
    )
    println '...'
    works great with Observable.fromCallable
    Tushar Mathur
    @tusharmath
    I have the following code —
    import rx.lang.scala.Observable
    import scala.concurrent.duration._
    
    object Main {
      def main(args: Array[String]): Unit = {
        val a = Observable.interval(1000 millis).map(_ => "A")
        val d = Observable.interval(3000 millis).map(_ => "D")
        val e = Observable.interval(5000 millis).map(_ => "E")
        a.merge(d).merge(e)
          .take(100)
          .subscribe(x => println(x))
      }
    }
    But when I do sbt run it outputs nothing
    Can someone tell me whats wrong? Why is my process ending without waiting for 100 events?
    cavemansspa
    @cavemansspa
    @tusharmath -- why are you doing multiple merges -- don't you want Observable.merge(a, d, e).take(100)?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @tusharmath you're not blocking main thread
    Dmitriy Zaitsev
    @DmitriyZaitsev
    @tusharmath you have to synchronize your code
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Observable.interval() works on Schedulers.computation() by default, so your chain works on 3 threads (each Observable.interval() on separate thread) and main thread ends before chain processing
    Dorus
    @Dorus
    I know you guys always use toBlocking and toList stuff to make queries blocking, but that's just tutorial useage. In real world scenario's your thread usually wont kill your application if you allow it to continue to do stuff. Better to follow the query with a readLine or so.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Right, but for tutorial/sample usage like this it's ok to use toBlocking() I guess
    Dorus
    @Dorus
    Or add synchronization to the onCompleted method of your subscription.
    If you are writing a testcase, you can rely on testScheduler to block.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    which is basically manual toBlocking() :) (it was about synchronization)
    Dmitriy Zaitsev
    @DmitriyZaitsev
    @tusharmath try smth like this:
      def main(args: Array[String]) {
        val lock: AnyRef = new AnyRef
        val a = Observable.interval(1000 millis).map(_ => "A")
        val d = Observable.interval(3000 millis).map(_ => "D")
        val e = Observable.interval(5000 millis).map(_ => "E")
        a.merge(d).merge(e)
          .take(100)
          .subscribe(new Subscriber[String]() {
            def onCompleted() {
              lock synchronized {
                lock.notifyAll()
              }
            }
            def onError(throwable: Throwable) {}
            def onNext(s: String) = println(s)
          })
        lock synchronized {
          lock.wait()
        }
      }
    }
    Dorus
    @Dorus
    excelent
    systemfreund
    @systemfreund
    i am using an io scheduler's createWorker method to do work with a worker... but apparently the created (daemon-)threads are never terminated, as the thread-count keeps rising. but i am pretty sure that the code which does the work is actually terminating