These are chat archives for ReactiveX/RxJava

26th
Apr 2016
Adrian
@AdrianoCelentano
Apr 26 2016 07:33
i have a Composite subscription in my Viewmodel, each time a button is clicked i subscribe to an observer. After clicking severall times i have a lot of subscription in my compositeSubscription. Should i unsubscribe after each request ?
Volkan Yazıcı
@vy
Apr 26 2016 08:24
Hey all! I have a stream of HttpResponse's of type Observable<HttpResponse>. I want to return the first success in the first 3 items, otherwise return the 3rd error message. (Basically, I am trying to implement a "retry at most 3 times if fails" HTTP driver.) Is there a way to achieve this via RxJava sorcery?
Dorus
@Dorus
Apr 26 2016 08:37
@AdrianoCelentano why not one constant subscriber?
@vy did you try retry(3)?
Adrian
@AdrianoCelentano
Apr 26 2016 08:43
@Dorus you mean reusing one single suscriber ?
Dorus
@Dorus
Apr 26 2016 09:47
@AdrianoCelentano Yes: You put an eventListener on the button, and each time it's clicked you merge the resulting actions into the stream, leaving lifetime management to that single subscription.
I could give some examples, but i'm not sure what you are subscribing at right now.
Volkan Yazıcı
@vy
Apr 26 2016 12:40
@Dorus retry(3) does not work as I expect it to: https://gist.github.com/vy/f3998912d3649f3201f6350bd081b3b5 throws an exception.
Dorus
@Dorus
Apr 26 2016 12:41
Because you resubscribe to concat, and concat will then emit the first observable (including exception) again.
Volkan Yazıcı
@vy
Apr 26 2016 12:41
@Dorus I was afraid of that. Any other way to test my impl?
Dorus
@Dorus
Apr 26 2016 12:42
i think you need a different operator, i beliece catch would work
o0.catch(o1).catch(os) if i'm right.
instead of retry.
*onErrorResumeNext i mean
Volkan Yazıcı
@vy
Apr 26 2016 12:43
@Dorus But in my actual code, "Observable<Integer> os" is provided by another function. Hence, I have no idea (and I better shouldn't) on how it is generated.
Dorus
@Dorus
Apr 26 2016 12:44
But if you subscribe a second time, it might return a different (no exception) value?
ooh you want to test that. ok
int count = 0;
Observable<Integer> source = Observable.create(ob -> {
   switch(count++) {
      case 0:
          Observable.error(new Exception()).subscribe(ob);
          break;
      case 1:
          Observable.just(1).subscribe(ob);
          break;
     case 2:
          Observable.concat(o0, o0, o1).subscribe(ob);
          break;
   }
});
Something like that
Volkan Yazıcı
@vy
Apr 26 2016 12:52
        AtomicInteger counter = new AtomicInteger(0);
        Observable<Integer> os = Observable.create(subscriber -> {
            int count = counter.getAndIncrement();
            if (count < 3) subscriber.onError(new Exception());
            else subscriber.onNext(count);
        });
        Integer value = os.retry(3).toSingle().toBlocking().value();
@Dorus This hangs after 3rd call. Any ideas?
Dorus
@Dorus
Apr 26 2016 13:02
because your source never completes.
Volkan Yazıcı
@vy
Apr 26 2016 13:03
Fsck! Prepending a first() before toSingle() solved the problem. Thanks.
Dorus
@Dorus
Apr 26 2016 13:03
else {
  subscriber.onNext(count);
  subscriber.onCompleted();
}
Volkan Yazıcı
@vy
Apr 26 2016 13:03
Nope, I do not want to complete the stream.
Dorus
@Dorus
Apr 26 2016 13:03
Yeah that would work too, first() will also close the stream for you
Well, i mean, a propperly set up Observable.create should always finish with a onCompleted or onError messages
Alessandro Vermeulen
@spockz
Apr 26 2016 17:49
Forgive my ignorance but shouldn't Observable.error(someThrowable).doOnComplete(println("done")) always print "done"?
This is what I'm actually running but what doesn't produce any output:
    val obs = rx.Observable.error(new IllegalStateException("foo"))
    obs.doOnError(new Action1[Throwable] {
      override def call(t: Throwable): Unit = println(t)
    })
    obs.doOnCompleted(new Action0 {
      override def call(): Unit = println("DOne")
    })
David Stemmer
@weefbellington
Apr 26 2016 17:52
there's been some talk on ASG (Android Study Group, an invite-only Android slack channel) about having a better place for rxjava public chat / technical discussion
ASG is very active and has a very enthusiastic rxjava community
but because it's invite-only and Android-specific, it's kind of isolated from the rest of the RxJava community
Alessandro Vermeulen
@spockz
Apr 26 2016 18:20
@weefbellington , was that a remark regarding my question?
Dorus
@Dorus
Apr 26 2016 19:25
@spockz No, that wont print because the observable did not end in onCompleted, it ended in onError
Also you are creating 2 new observables
So you can only subscribe to one like that
The correct way is
val obs = rx.Observable.error(new IllegalStateException("foo"))
    .doOnError(new Action1[Throwable] {
      override def call(t: Throwable): Unit = println(t)
    })
    .doOnCompleted(new Action0 {
      override def call(): Unit = println("DOne")
    });
Alessandro Vermeulen
@spockz
Apr 26 2016 19:44
@Dorus of course!!!
Alessandro Vermeulen
@spockz
Apr 26 2016 20:08
This message was deleted
David Stemmer
@weefbellington
Apr 26 2016 20:09
@spockz not related :)
just some rxjava community gossip
Alessandro Vermeulen
@spockz
Apr 26 2016 20:16
@Dorus suppose I want to only create one Observable per resource, and have that resource freed when all subscribers of the Observable are unsubscribed. Could I do that by using Observable.using? Like below. Or is that way to convoluted?
val watcher: java.nio.file.WatchService
var outputStreams: Map[WatchKey, Subject[WatchEvent[Path]] = Map.empty
var queries: Map[Path, Observable[WatchEvent[Path]] = Map.empty

def watch(dir: Path) =
   // if dir is contained in queries return it, otherwise create it by using watch2

def watch2(dir: Path): Observable[WatchEvent[Path]] = 
    Observable.using(dir.register(watcher), key => createObservable(dir, key), key => key.cancel())

def createObservable(dir:Path, watchKey: WatchKey): Observable[WatchEvent[Path]] = {
  if (!outputStreams.contains(watchKey)) {
     // return create publish subject and add it to outputStreams
     // add the subject to queries as well
  } else {
     // return the subject from the map
  }
@weefbellington I do keep reading beef wellington...
Dorus
@Dorus
Apr 26 2016 20:17
@spockz That sounds like the scenario where you use .publish().refCount()
Observable.using still creates a new resource every time you subscribe. But .publish().refCount() will make you multicast.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:18
would that be Observable.using(...).publish().refCount() then?
Dorus
@Dorus
Apr 26 2016 20:19
You still need to think about what you want to push onto your subscribers. If it's only live data, publish would work. If they also need to get the most recent element on subscription you could use replay(1) etc.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:19
@Dorus only live data is necessary
Dorus
@Dorus
Apr 26 2016 20:19
using is only used to tie the lifetime of the resource to the subscription.
Shouldn't it be Observable.using(() -> dir.register(watcher), ....?
There are other ways to solve it, but using would certainly be valid.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:22
So I can change it to as follows:
val watcher: java.nio.file.WatchService
var outputStreams: Map[WatchKey, Subject[WatchEvent[Path]] = Map.empty
var queries: Map[Path, Observable[WatchEvent[Path]] = Map.empty

def watch(dir: Path) =
   // if dir is contained in queries return it, otherwise create it by using watch2 and add it to queries

def watch2(dir: Path): Observable[WatchEvent[Path]] = 
    Observable.using(dir.register(watcher), key => createObservable(dir, key), key => key.cancel()).publish().refCount()

def createObservable(dir:Path, watchKey: WatchKey): Observable[WatchEvent[Path]] = {
     // return create publish subject and add it to outputStreams
     // we assume that publish() and refCount() will take of having only 1 direct subscriber
}
@Dorus and yes, in Java, but in RxScala the using method is defined with a call by name argument for the resource factory.
Dorus
@Dorus
Apr 26 2016 20:23
ooh cool. I'm not that familiar with Scala :)
Alessandro Vermeulen
@spockz
Apr 26 2016 20:24
@Dorus its a nice feature which enables 'laziness' transparently :)
@Dorus there is also share():
 public final Observable<T> share() {
        return publish().refCount();
    }
Dorus
@Dorus
Apr 26 2016 20:26
yeah i know. Forgot if that was a RxJava thing ^_^
Alessandro Vermeulen
@spockz
Apr 26 2016 20:27
@Dorus how do you use RxJava ?
Dorus
@Dorus
Apr 26 2016 20:27
not really
Using Rx.Net mostly, and even that not much right now. Mostly hanging out in these gitter channels to help people out :)
RxJS is the most active, so i'm getting more familiar with that now.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:28
@Dorus amazing! it is much appreciated. :)
@Dorus is there a different channel for all of the dialects?
Dorus
@Dorus
Apr 26 2016 20:28
yeah
I'm not sure why RxJava is not connected to the rest.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:29
different repos?
in different orgs
ReactiveJS is pretty busy with almost 1k participant
Dorus
@Dorus
Apr 26 2016 20:30
ReactiveJS has nothing to do with Rx right?
Alessandro Vermeulen
@spockz
Apr 26 2016 20:31
@Dorus sorry meant RxJS :)
Dorus
@Dorus
Apr 26 2016 20:31
yeah, very active.
Alessandro Vermeulen
@spockz
Apr 26 2016 20:32
I could imagine that React and Angular would use RxJS for their streaming purposes instead of their own stuff..
David Stemmer
@weefbellington
Apr 26 2016 20:54
@Dorus are you an Rx contributor?
Dorus
@Dorus
Apr 26 2016 20:55
?
David Stemmer
@weefbellington
Apr 26 2016 20:55
I mean do you contribute to one of the frameworks itself
Dorus
@Dorus
Apr 26 2016 20:55
code? Not really, i did fill some bugs
David Stemmer
@weefbellington
Apr 26 2016 20:56
I was just curious, I know Artem Zinnatullin works on RxJava, just thought you might be a contributor since you're the most active in this channel
Dorus
@Dorus
Apr 26 2016 21:06
I haven't seen many contributors yes in these channels, they usually do respond quickly to github issues.
David Stemmer
@weefbellington
Apr 26 2016 21:08
unfortunately, it seems like the number of rxjava contributors has dwindled over time
Dorus
@Dorus
Apr 26 2016 21:09
@weefbellington I've seen your name around a few times before, what's your tie to Rx?
David Stemmer
@weefbellington
Apr 26 2016 21:09
no tie just a user
but I think that Ben Christensen is pouring his efforts into Reactive Extensions right now
Reactive Streams I mean
I believe that David Karnok is the core contributor now but he spends a lot of his time on nowadays on Project Reactor rather than RxJava
Dorus
@Dorus
Apr 26 2016 21:14
ic. I did notice Rx development is kinda slow. Good thing the current library is pretty solid.
RxJS does seem to have a lot of progress on v5.
And Rx.Net is completely silent.
David Stemmer
@weefbellington
Apr 26 2016 21:15
do you know if v5 using the Reactive Streams specification?
Dorus
@Dorus
Apr 26 2016 21:22
All i know is that the migration document mentiones "Compliance with the ES7 Observable Spec"
(And also that i like onNext a lot more than next)