    Volkan Yazıcı
    @vy
    Hey all! I have a stream of HttpResponse's of type Observable<HttpResponse>. I want to return the first success in the first 3 items, otherwise return the 3rd error message. (Basically, I am trying to implement a "retry at most 3 times if fails" HTTP driver.) Is there a way to achieve this via RxJava sorcery?
    Dorus
    @Dorus
    @AdrianoCelentano why not one constant subscriber?
    @vy did you try retry(3)?
    Adrian
    @AdrianoCelentano
    @Dorus you mean reusing one single subscriber?
    Dorus
    @Dorus
    @AdrianoCelentano Yes: You put an eventListener on the button, and each time it's clicked you merge the resulting actions into the stream, leaving lifetime management to that single subscription.
    I could give some examples, but I'm not sure what you are subscribing to right now.
    Volkan Yazıcı
    @vy
    @Dorus retry(3) does not work as I expect it to: https://gist.github.com/vy/f3998912d3649f3201f6350bd081b3b5 throws an exception.
    Dorus
    @Dorus
    Because you resubscribe to concat, and concat will then emit the first observable (including exception) again.
    Volkan Yazıcı
    @vy
    @Dorus I was afraid of that. Any other way to test my impl?
    Dorus
    @Dorus
    I think you need a different operator, I believe catch would work
    o0.catch(o1).catch(os) if i'm right.
    instead of retry.
    *onErrorResumeNext i mean
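
The onErrorResumeNext suggestion above can be sketched roughly as follows, assuming RxJava 1.x (the rx.Observable package used elsewhere in this chat). The class name FallbackChain and the helper firstSuccess are hypothetical names for illustration, and the three attempts are assumed to be available as separate observables.

```java
import rx.Observable;

public class FallbackChain {
    // Hypothetical helper: try o0, fall back to o1 on error, then to o2.
    // If all three fail, the error from o2 reaches the subscriber.
    static Observable<Integer> firstSuccess(Observable<Integer> o0,
                                            Observable<Integer> o1,
                                            Observable<Integer> o2) {
        return o0.onErrorResumeNext(o1).onErrorResumeNext(o2);
    }

    public static void main(String[] args) {
        Observable<Integer> failing = Observable.error(new IllegalStateException("boom"));
        Observable<Integer> ok = Observable.just(1);

        // The first two attempts fail, the third succeeds.
        Integer value = firstSuccess(failing, failing, ok).toBlocking().single();
        System.out.println(value); // prints 1
    }
}
```

Unlike retry, this never resubscribes to an earlier source, so an observable that already failed is not replayed.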
    Volkan Yazıcı
    @vy
    @Dorus But in my actual code, "Observable<Integer> os" is provided by another function. Hence, I have no idea how it is generated (and I shouldn't need to).
    Dorus
    @Dorus
    But if you subscribe a second time, it might return a different (no exception) value?
    ooh you want to test that. ok
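    // AtomicInteger, because a plain local int cannot be modified inside the lambda
    AtomicInteger count = new AtomicInteger(0);
    Observable<Integer> source = Observable.create(ob -> {
       switch (count.getAndIncrement()) {
          case 0: // first subscription fails
              Observable.<Integer>error(new Exception()).subscribe(ob);
              break;
          case 1: // second subscription succeeds
              Observable.just(1).subscribe(ob);
              break;
          case 2:
              Observable.concat(o0, o0, o1).subscribe(ob);
              break;
       }
    });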
    Something like that
    Volkan Yazıcı
    @vy
            AtomicInteger counter = new AtomicInteger(0);
            Observable<Integer> os = Observable.create(subscriber -> {
                int count = counter.getAndIncrement();
                if (count < 3) subscriber.onError(new Exception());
                else subscriber.onNext(count);
            });
            Integer value = os.retry(3).toSingle().toBlocking().value();
    @Dorus This hangs after 3rd call. Any ideas?
    Dorus
    @Dorus
    because your source never completes.
    Volkan Yazıcı
    @vy
    Fsck! Prepending a first() before toSingle() solved the problem. Thanks.
    Dorus
    @Dorus
    else {
      subscriber.onNext(count);
      subscriber.onCompleted();
    }
    Volkan Yazıcı
    @vy
    Nope, I do not want to complete the stream.
    Dorus
    @Dorus
    Yeah that would work too, first() will also close the stream for you
    Well, I mean, a properly set up Observable.create should always finish with an onCompleted or onError message
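
A minimal sketch of the point above, assuming RxJava 1.x: a source built with Observable.create that always terminates with either onError or onCompleted, so retry(3) followed by a blocking read does not hang. The class name RetryUntilSuccess and the helper flaky are hypothetical names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;

public class RetryUntilSuccess {
    // Hypothetical test source: the first `failures` subscriptions end with
    // onError; later ones emit the attempt index and then complete.
    static Observable<Integer> flaky(int failures) {
        AtomicInteger counter = new AtomicInteger(0);
        return Observable.create(subscriber -> {
            int count = counter.getAndIncrement();
            if (count < failures) {
                subscriber.onError(new Exception("attempt " + count + " failed"));
            } else {
                subscriber.onNext(count);
                subscriber.onCompleted(); // without this, a blocking read waits forever
            }
        });
    }

    public static void main(String[] args) {
        // retry(3) allows the initial subscription plus three resubscriptions.
        Integer value = flaky(3).retry(3).toBlocking().single();
        System.out.println(value); // prints 3
    }
}
```

With flaky(4) the fourth attempt also fails, retry(3) gives up, and the last error is rethrown by the blocking call.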
    Alessandro Vermeulen
    @spockz
    Forgive my ignorance but shouldn't Observable.error(someThrowable).doOnComplete(println("done")) always print "done"?
    This is what I'm actually running, but it doesn't produce any output:
        val obs = rx.Observable.error(new IllegalStateException("foo"))
        obs.doOnError(new Action1[Throwable] {
          override def call(t: Throwable): Unit = println(t)
        })
        obs.doOnCompleted(new Action0 {
          override def call(): Unit = println("DOne")
        })
    David Stemmer
    @weefbellington
    there's been some talk on ASG (Android Study Group, an invite-only Android slack channel) about having a better place for rxjava public chat / technical discussion
    ASG is very active and has a very enthusiastic rxjava community
    but because it's invite-only and Android-specific, it's kind of isolated from the rest of the RxJava community
    Alessandro Vermeulen
    @spockz
    @weefbellington , was that a remark regarding my question?
    Dorus
    @Dorus
    @spockz No, that won't print because the observable did not end in onCompleted, it ended in onError
    Also you are creating 2 new observables
    So you can only subscribe to one like that
    The correct way is
    val obs = rx.Observable.error(new IllegalStateException("foo"))
        .doOnError(new Action1[Throwable] {
          override def call(t: Throwable): Unit = println(t)
        })
        .doOnCompleted(new Action0 {
          override def call(): Unit = println("DOne")
        });
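
The same idea in Java, as a rough sketch assuming RxJava 1.x: operators such as doOnError return a new observable, so the callbacks only run when the chained result is subscribed to, and an error stream never reaches doOnCompleted. The class name SideEffectChain and the helper run are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

public class SideEffectChain {
    // Subscribes to an error-only stream and records which callbacks fired.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Object> obs = Observable.error(new IllegalStateException("foo"))
                .doOnError(t -> log.add("error: " + t.getMessage()))
                .doOnCompleted(() -> log.add("done"));

        // Nothing happens until someone subscribes to the chained observable.
        obs.subscribe(item -> { }, t -> { });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [error: foo]
    }
}
```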
    Alessandro Vermeulen
    @spockz
    @Dorus of course!!!
    David Stemmer
    @weefbellington
    @spockz not related :)
    just some rxjava community gossip
    Alessandro Vermeulen
    @spockz
    @Dorus suppose I want to create only one Observable per resource, and have that resource freed when all subscribers of the Observable have unsubscribed. Could I do that by using Observable.using? Like below. Or is that way too convoluted?
    val watcher: java.nio.file.WatchService
    var outputStreams: Map[WatchKey, Subject[WatchEvent[Path]]] = Map.empty
    var queries: Map[Path, Observable[WatchEvent[Path]]] = Map.empty
    
    def watch(dir: Path) =
       // if dir is contained in queries return it, otherwise create it by using watch2
    
    def watch2(dir: Path): Observable[WatchEvent[Path]] = 
        Observable.using(dir.register(watcher), key => createObservable(dir, key), key => key.cancel())
    
    def createObservable(dir:Path, watchKey: WatchKey): Observable[WatchEvent[Path]] = {
      if (!outputStreams.contains(watchKey)) {
         // create a publish subject, add it to outputStreams, and return it
         // add the subject to queries as well
      } else {
         // return the subject from the map
      }
    }
    @weefbellington I do keep reading beef wellington...
    Dorus
    @Dorus
    @spockz That sounds like the scenario where you use .publish().refCount()
    Observable.using still creates a new resource every time you subscribe. But .publish().refCount() will make you multicast.
    Alessandro Vermeulen
    @spockz
    would that be Observable.using(...).publish().refCount() then?
    Dorus
    @Dorus
    You still need to think about what you want to push onto your subscribers. If it's only live data, publish would work. If they also need to get the most recent element on subscription you could use replay(1) etc.
    Alessandro Vermeulen
    @spockz
    @Dorus only live data is necessary
    Dorus
    @Dorus
    using is only used to tie the lifetime of the resource to the subscription.
    Shouldn't it be Observable.using(() -> dir.register(watcher), ....?
    There are other ways to solve it, but using would certainly be valid.
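
A small Java sketch of the difference discussed above, assuming RxJava 1.x. The counters stand in for acquiring and releasing a real resource such as a WatchKey; the class name SharedResourceSketch and the helpers perSubscriber and shared are hypothetical names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Subscription;

public class SharedResourceSketch {
    static final AtomicInteger opened = new AtomicInteger();
    static final AtomicInteger closed = new AtomicInteger();

    // Plain using: every subscription acquires its own resource.
    static Observable<Long> perSubscriber() {
        return Observable.using(
                () -> opened.incrementAndGet(),  // acquire (stand-in for dir.register)
                id -> Observable.<Long>never(),  // live stream tied to the resource
                id -> closed.incrementAndGet()); // release (stand-in for key.cancel)
    }

    // publish().refCount(): one resource shared by all current subscribers,
    // released when the last of them unsubscribes.
    static Observable<Long> shared() {
        return perSubscriber().publish().refCount();
    }

    public static void main(String[] args) {
        Observable<Long> source = shared();
        Subscription a = source.subscribe();
        Subscription b = source.subscribe();
        System.out.println("opened=" + opened.get()); // prints opened=1
        a.unsubscribe();
        b.unsubscribe();
        System.out.println("closed=" + closed.get()); // prints closed=1
    }
}
```

Subscribing twice to perSubscriber() instead would acquire two resources, which is the behaviour the multicast variant avoids.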
    Alessandro Vermeulen
    @spockz
    So I can change it as follows:
    val watcher: java.nio.file.WatchService
    var outputStreams: Map[WatchKey, Subject[WatchEvent[Path]]] = Map.empty
    var queries: Map[Path, Observable[WatchEvent[Path]]] = Map.empty
    
    def watch(dir: Path) =
       // if dir is contained in queries return it, otherwise create it by using watch2 and add it to queries
    
    def watch2(dir: Path): Observable[WatchEvent[Path]] = 
        Observable.using(dir.register(watcher), key => createObservable(dir, key), key => key.cancel()).publish().refCount()
    
    def createObservable(dir:Path, watchKey: WatchKey): Observable[WatchEvent[Path]] = {
         // create a publish subject, add it to outputStreams, and return it
         // we assume that publish() and refCount() will take care of having only 1 direct subscriber
    }
    @Dorus and yes, in Java, but in RxScala the using method is defined with a call-by-name argument for the resource factory.