Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Dorus
    @Dorus
    Anyway i think i like map + filter more in this instance. It communicate clearly what's intended.
    I do have the feeling the flatMap one could be written cleaner also.
    o.flatMap(x -> Observalbe.just(f(x)).filter(y -> y != null) )
    lol :)
    map is still cleaner.
    Dorus
    @Dorus
    or write an transformer .map(x -> f(x)).compose(ignoreNull()) or even .compose(ignoreNull(x -> f(x))).
    Artem Kholodnyi
    @defHLT
    @Dorus according to my simple benchmark, not surprisingly, map + filter 10x faster than using flatMap for the same purpose. And I agree that it's much more clear (and even shorter with lambda)
    Dorus
    @Dorus
    <3 simple benchmarks. Looks like we have a clear winner then =D
    Artem Kholodnyi
    @defHLT
    I'm not taking that benchmark too serious; both ways have negligible overhead it seems :)
    Dorus
    @Dorus
    very true
    dwursteisen
    @dwursteisen
    Do you know any article which explain the backpressure with Rx, and how to write a correct Producer ? I'm reading articles from the blog http://akarnokd.blogspot.fr/. But I'm only understood that writing a Producer is hard and complicated.
    If you understand the operators, producers should be easier. Continue with none-backpresure producers and then backpresure shouldnt be too hard. Still i dont know any article for just that.
    dwursteisen
    @dwursteisen
    None-backpressure producer ? All producer I'm just lock into the Rx code base is for backpressure. Do you know any non backpressure producer ?
    I may have not fully understood operators, as I don't feel that producers are easy :smile:
    Tushar Mathur
    @tusharmath
    @cavemansspa Is there a method for Observable.merge?
    @DmitriyZaitsev Sorry for the late response. Though your code works just perfectly find, I am unable to understand what is lock and how is it working here?
    whst notifyAll doing?
    Dmitriy Zaitsev
    @DmitriyZaitsev
    @tusharmath lock.wait() causes the current thread to wait until another thread invokes lock.notify() or lock.notifyAll().
    Since the reactive code executes asynchronously on a different thread, method main() can not be finished until rx.Observable emits elements.
    As soon as the Observable is completed, it unlocks the lock object, which will cause the main thread to continue till the end.
    Tushar Mathur
    @tusharmath
    is it documented somewhere?
    Dorus
    @Dorus
    You mean the java language specs? yeah, those are documented.
    Tushar Mathur
    @tusharmath
    I mean this particular lock.wait thingy... sorry if it sounds stupid, I am new to scala
    Samuel Tardieu
    @samueltardieu
    @tusharmath Those are Java methods available on Object and thus any Java (or Scala) object, as toString() is for example.
    Tushar Mathur
    @tusharmath
    okay
    Dorus
    @Dorus
    Ah sorry, i got distracted. But looks like you already have an answer. I was going to point at the Java guarded blocks tutorial as these are Java methods available on every Object. The only scala part is using new AnyRef instead of new Object().
    Tushar Mathur
    @tusharmath
    I thought Rx was single threaded?
    How many threads would I have created in the above code?
    is it one per interval call?
    Dmitriy Zaitsev
    @DmitriyZaitsev
    interval implicitly subscribes on computation scheduler
    your observables a, b, c emit elements on different threads
    Tushar Mathur
    @tusharmath
    But the actual excution is still happening on the main thread right?
    every interval creates a new thread, which is quite essentially idle and waiting for a timeout. Eventually when it fires it goes to the main thread to get consumed?
    Dorus
    @Dorus
    I always understood it runs on a threadpool, so most of the time it will actually run singlethreaded, as not all timeouts are expected to happen at the same time.
    However, the only thing it guarantees is that events run serialized, different events might still run parallel for different steps, and flatMap might even run everything concurrent (just that it starts serialized but if the inner observable has a delay, it already starts the next one).
    Im not sure what the computation does, but @DmitriyZaitsev might be right that it starts multiple threads.
    As that's often optimal for computation work.
    Tushar Mathur
    @tusharmath
    Which part would be pushed to the thread pool ?
    In the above example
    Will the map functions be called parallely ?
    or would it be done on the main thread?
    Or is it something we can control?
    Dorus
    @Dorus
    Can you point back at the example?
    You can give it an scheduler yes
    Tushar Mathur
    @tusharmath
      def main(args: Array[String]) {
        val lock: AnyRef = new AnyRef
        val a = Observable.interval(1000 millis).map(_ => "A")
        val d = Observable.interval(3000 millis).map(_ => "D")
        val e = Observable.interval(5000 millis).map(_ => "E")
        a.merge(d).merge(e)
          .take(100)
          .subscribe(new Subscriber[String]() {
            def onCompleted() {
              lock synchronized {
                lock.notifyAll()
              }
            }
            def onError(throwable: Throwable) {}
            def onNext(s: String) = println(s)
          })
        lock synchronized {
          lock.wait()
        }
      }
    }
    Dorus
    @Dorus
    What part needs to run on one thread?
    There are multiple options, but 90% of the time the default will perform best and adding too many hints isn't optimal.
    Tushar Mathur
    @tusharmath
    I don't know how this would actually work
    what is off loaded to the thread pool and what is not
    For instance — in this case