    Dorus
    @Dorus
    Anyway, I'm sure you can figure it out from here.
    cavemansspa
    @cavemansspa
    @Dorus / @artem-zinnatullin -- thank you for the help.
    chijikpijik
    @chijikpijik
    What's the difference between fromCallable and defer with just?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    no difference in your case
    but defer is more flexible because it lets you return any Observable (at the cost of more code)
    chijikpijik
    @chijikpijik
    @artem-zinnatullin oh, I just bumped RxJava to 1.0.15 and fromCallable has appeared
    Dorus
    @Dorus
    oooh that's probably what i need to do too
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yup, I've added it in 1.0.15
    chijikpijik
    @chijikpijik
    @artem-zinnatullin :thumbsup:
    cavemansspa
    @cavemansspa
    println Thread.currentThread()       
    Observable.from([1, 2, 3, 4])
    .flatMap({
        println 'in map thread == ' + Thread.currentThread()
        def val = it
        return Observable.fromCallable({ 
            Thread.sleep(500)
            println 'in Observable within flatMap thread == ' + Thread.currentThread()
            return val + ' mapped'
        }).subscribeOn(Schedulers.io())
    })
    //.toBlocking()
    .subscribe(
        { it -> println "onNext() ${it} ${Thread.currentThread()}" },
        { error -> println "onError() ${error}" },
        { println 'onCompleted' }
    )
    println '...'
    works great with Observable.fromCallable
    Tushar Mathur
    @tusharmath
    I have the following code:
    import rx.lang.scala.Observable
    import scala.concurrent.duration._
    
    object Main {
      def main(args: Array[String]): Unit = {
        val a = Observable.interval(1000 millis).map(_ => "A")
        val d = Observable.interval(3000 millis).map(_ => "D")
        val e = Observable.interval(5000 millis).map(_ => "E")
        a.merge(d).merge(e)
          .take(100)
          .subscribe(x => println(x))
      }
    }
    But when I do sbt run, it outputs nothing.
    Can someone tell me what's wrong? Why does my process end without waiting for 100 events?
    cavemansspa
    @cavemansspa
    @tusharmath -- why are you doing multiple merges -- don't you want Observable.merge(a, d, e).take(100)?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @tusharmath you're not blocking the main thread
    Dmitriy Zaitsev
    @DmitriyZaitsev
    @tusharmath you have to synchronize your code
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Observable.interval() runs on Schedulers.computation() by default, so your chain runs on 3 threads (each Observable.interval() on a separate thread). Those are daemon threads, so when the main thread ends before the chain has emitted anything, the process exits.
    Dorus
    @Dorus
    I know you guys always use toBlocking and toList to make queries blocking, but that's just tutorial usage. In real-world scenarios the main thread usually doesn't exit and take the application down with it; it carries on doing other work. Better to follow the query with a readLine or so.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Right, but for tutorial/sample usage like this it's ok to use toBlocking() I guess
    Dorus
    @Dorus
    Or add synchronization to the onCompleted method of your subscription.
    If you are writing a test case, you can rely on TestScheduler instead of blocking.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    which is basically a manual toBlocking() :) (that was about the synchronization)
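A minimal plain-JDK sketch of the "synchronize in onCompleted" idea, with no RxJava involved. A daemon-thread scheduler stands in for the threads behind Observable.interval(), and a CountDownLatch stands in for the lock. The names WaitForCompletion and runAndWait, and the tick labels, are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WaitForCompletion {

    // Emits `count` ticks on a daemon thread, blocks the caller until the
    // last tick has been recorded, and returns the recorded ticks.
    static List<String> runAndWait(int count, long periodMillis) throws InterruptedException {
        // Assumption: a daemon thread models the scheduler threads, which
        // do not keep the JVM alive on their own.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ticker");
            t.setDaemon(true);
            return t;
        });

        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch completed = new CountDownLatch(1);
        AtomicInteger emitted = new AtomicInteger();

        scheduler.scheduleAtFixedRate(() -> {
            int n = emitted.incrementAndGet();
            if (n <= count) {
                seen.add("A" + n);      // plays the role of onNext
            }
            if (n == count) {
                completed.countDown();  // plays the role of onCompleted
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        // Without this await the caller would return right away, and a
        // main() that returned here would let the JVM exit mid-stream.
        completed.await();
        scheduler.shutdownNow();
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait(3, 100)); // prints [A1, A2, A3]
    }
}
```

A latch remembers a countDown() that happens before await() is called, which a bare wait()/notifyAll() pair does not, so it avoids a missed wake-up if completion arrives early.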
    Dmitriy Zaitsev
    @DmitriyZaitsev
    @tusharmath try something like this:
      def main(args: Array[String]) {
        val lock: AnyRef = new AnyRef
        val a = Observable.interval(1000 millis).map(_ => "A")
        val d = Observable.interval(3000 millis).map(_ => "D")
        val e = Observable.interval(5000 millis).map(_ => "E")
        a.merge(d).merge(e)
          .take(100)
          .subscribe(new Subscriber[String]() {
            def onCompleted() {
              lock synchronized {
                lock.notifyAll()
              }
            }
            def onError(throwable: Throwable) {}
            def onNext(s: String) = println(s)
          })
        lock synchronized {
          lock.wait()
        }
      }
    }
    Dorus
    @Dorus
    excellent
    systemfreund
    @systemfreund
    i am using an io scheduler's createWorker method to do work with a worker... but apparently the created (daemon-)threads are never terminated, as the thread-count keeps rising. but i am pretty sure that the code which does the work is actually terminating
    Dorus
    @Dorus
    Workers have an unsubscribe method
    @systemfreund Use that.
    systemfreund
    @systemfreund
    i do, but it didn't change much... every time i trigger that worker the thread count still gets higher
    Dorus
    @Dorus
    I kinda dislike the interface; shame there is no 'schedule final' that lets all current work finish and then kills the worker thread. I guess you could use worker.schedule(() -> worker.unsubscribe())
    systemfreund
    @systemfreund
    yes, that's basically what i am doing ... () -> doRealWork(); worker.unsubscribe();
    Dorus
    @Dorus
    Mm then i'm clueless :S
    systemfreund
    @systemfreund
    @Dorus looks like i forgot to unsubscribe at another place :) now everything appears to be fine
    Artem Kholodnyi
    @defHLT
    hey guys, do you have any data on whether it's faster to use a map + filter combo or flatMap instead?
    Artem Kholodnyi
    @defHLT
    it actually makes more sense to compare against concatMap, as it preserves order
    Dorus
    @Dorus

    @mlatu Why do you need map+filter? Just filter can already do the filtering and possible data manipulations (just nothing persistent).

    flatMap is for one-to-many transformations, map for one-to-one.

    About preserving order: do you really need that in a reactive system? concatMap also evaluates everything serially, I think; flatMap has the possibility to go concurrent if you need it.

    Artem Kholodnyi
    @defHLT
    @Dorus One pattern I saw is that instead of
    o.map( x -> f(x)).filter(x -> x != null)
    the following is used:
    o.flatMap(x -> y = f(x); y != null ? Observable.just(y) : Observable.empty())
    It helps greatly with boilerplate on Android, where lambdas are not (easily) available
    Dorus
    @Dorus
    flatMap like this isn't doing anything concurrent, so it won't change order.
    Identical code, just a schematic difference.
    Oh right, flatMap is one to zero-or-more; here it's used for 0 or 1 elements. Map is always one-to-one.
    Very hard to say what's faster. You're probably talking about nanoseconds here, but you can run it 10,000,000 times in a loop and see.
    Just measure Observable.range(0, 10000000) /* .map(...).filter(...) or .flatMap(...) */ .doOnCompleted(() -> endMeasure()).subscribe()
    @mlatu
    Dorus
    @Dorus
    Anyway, I think I like map + filter more in this instance. It communicates clearly what's intended.
    I do have the feeling the flatMap one could be written more cleanly too.
    o.flatMap(x -> Observable.just(f(x)).filter(y -> y != null) )
    lol :)
    map is still cleaner.
    Dorus
    @Dorus
    or write a transformer: .map(x -> f(x)).compose(ignoreNull()) or even .compose(ignoreNull(x -> f(x))).
    Artem Kholodnyi
    @defHLT
    @Dorus according to my simple benchmark, not surprisingly, map + filter is 10x faster than using flatMap for the same purpose. And I agree that it's much clearer (and even shorter with lambdas)
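The point that the two patterns yield the same elements in the same order can be checked with a small analogy using java.util.stream rather than RxJava. This is only a sketch: Stream stands in for Observable, and the function f and the class name NullFiltering are hypothetical. It says nothing about relative speed in RxJava.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NullFiltering {

    // Hypothetical f: returns null for odd inputs and a label for even ones.
    static String f(Integer x) {
        return x % 2 == 0 ? "even-" + x : null;
    }

    // map + filter: transform every element, then drop the nulls.
    static List<String> withMapFilter(List<Integer> input) {
        return input.stream()
                .map(NullFiltering::f)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // flatMap: each element becomes a stream of zero or one results.
    static List<String> withFlatMap(List<Integer> input) {
        return input.stream()
                .flatMap(x -> {
                    String y = f(x);
                    return y != null ? Stream.of(y) : Stream.<String>empty();
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        System.out.println(withMapFilter(input)); // prints [even-2, even-4]
        System.out.println(withFlatMap(input));   // prints [even-2, even-4]
    }
}
```

The flatMap variant allocates an extra stream per element, which is one plausible reason a similar pattern would be slower, but only a measurement like the benchmark mentioned above can confirm that.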