These are chat archives for ReactiveX/RxJava

17th
Nov 2015
Javier Domingo Cansino
@txomon
Nov 17 2015 11:37
So I have this, which in theory should be triggered once at the beginning, but it isn't being triggered for some reason. I tried to follow the guidelines with the subscribeOn and the observeOn, but this is just not getting executed...
    Observable
            .fromCallable(
                    new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {
                            Log.d(TAG, "Calling client.update");
                            return client.update();
                        }
                    }
            )
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    if (observable.toBlocking().first() instanceof ClientException)
                        return Observable.timer(5, TimeUnit.SECONDS);
                    return Observable.empty();
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    if (throwable instanceof ClientException) {
                        Toast.makeText(getApplicationContext(), "Exception while gathering the items", Toast.LENGTH_SHORT);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe();
that's on the activity onCreate
Javier Domingo Cansino
@txomon
Nov 17 2015 11:45
maybe because .fromCallable() is experimental, that it doesn't work? although that doesn't make sense...
Dorus
@Dorus
Nov 17 2015 11:46
Looks correct as far as i can tell, but i'm not very familiar with fromCallable
Dorus
@Dorus
Nov 17 2015 11:53
Perhapts you need to try Defer instead. If i read the docs right, fromCallable and all it's siblings only run the enclosing feature once anyway, and buffer the result.
Javier Domingo Cansino
@txomon
Nov 17 2015 11:55
@Dorus but it should run once, shouldn't it?
Dorus
@Dorus
Nov 17 2015 11:55
You want it to buffer and send you the same error when retry triggers?
Try this:
    Observable.defer(() -> {
        try {
            return Observable.just(client.update());
        } catch (Throwable t) {
            return Observable.error(t);
        }
    });
Javier Domingo Cansino
@txomon
Nov 17 2015 11:56
I just want to run it once until it succeeds
ok
Dorus
@Dorus
Nov 17 2015 11:57
Also i would place subscribeOn right after defer, keep operators that belong together close together.
Javier Domingo Cansino
@txomon
Nov 17 2015 12:01
good idea
so, look at this:
    Observable
            .defer(new Func0<Observable<Boolean>>() {
                @Override
                public Observable<Boolean> call() {
                    Log.d(TAG, "Launching client update");
                    try {
                        return Observable.just(currentClient.update());
                    } catch (Throwable e) {
                        return Observable.error(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    Log.d(TAG, "Retry called");
                    if (observable.toBlocking().first() instanceof UbusRpcException)
                        return Observable.timer(5, TimeUnit.SECONDS);
                    return Observable.empty();
                }
            })
            .subscribe();
in the log, I get 11-17 12:02:34.044 8717-8717/com.txomon.openwrt.android D/OpenwrtMainActivity: Retry called
so for some reason, it's going straight to the retry, without even logging the "Launching client update"
(as you may have noticed, I have switched to a simpler code, just run the update() until it succeeds without any feedback)
Javier Domingo Cansino
@txomon
Nov 17 2015 12:07
and on top of that, it doesn't seem that even the retry works...
Dorus
@Dorus
Nov 17 2015 12:51
@txomon Can you also log the type of observable.toBlocking().first()?
Javier Domingo Cansino
@txomon
Nov 17 2015 12:51
sure
Dorus
@Dorus
Nov 17 2015 12:51
I'm stumped how it can skip Log.d(TAG, "Launching client update");
That seems impossible if retry is called.
Javier Domingo Cansino
@txomon
Nov 17 2015 12:52
same for me, the only way would be have an exception in Log.d
Dorus
@Dorus
Nov 17 2015 12:52
You should place Log.d inside the try then. Else it wont even emit an observable.
This message was deleted
This message was deleted
Dorus
@Dorus
Nov 17 2015 12:58
nvm
Javier Domingo Cansino
@txomon
Nov 17 2015 12:58
xD
yeah you said so =)
Dorus
@Dorus
Nov 17 2015 12:58
i read it as observable.call for a second :)
Javier Domingo Cansino
@txomon
Nov 17 2015 13:54
for some reason the Observable.just() is not firing anything ...
I have now switched to
    Observable
            .just(true)
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    if (observable.toBlocking().first() instanceof UbusRpcException)
                        return Observable.timer(5, TimeUnit.SECONDS);
                    return Observable.empty();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    Log.d(TAG, "Boolean received in action");
                }
            });
Javier Domingo Cansino
@txomon
Nov 17 2015 14:00
and is not working xD
Dorus
@Dorus
Nov 17 2015 14:01
You're not making any call
Javier Domingo Cansino
@txomon
Nov 17 2015 14:01
I can't thing on anything more simple...
@Dorus what do you mean?
Dorus
@Dorus
Nov 17 2015 14:01
you should see a log line "Boolean received in action"
Javier Domingo Cansino
@txomon
Nov 17 2015 14:01
yeah, I don't xD
that's why I am here hahaha
Dorus
@Dorus
Nov 17 2015 14:01
is this code even run?
Dorus @Dorus whispers something about /tmp
Javier Domingo Cansino
@txomon
Nov 17 2015 14:02
not this time
let me prove it
Dorus
@Dorus
Nov 17 2015 14:02
Just remove all Rx stuff and only leave the log line.
Or add a log line before it
Javier Domingo Cansino
@txomon
Nov 17 2015 14:03
    Log.d(TAG, "Doing the observer");
    Observable
            .just(true)
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    if (observable.toBlocking().first() instanceof UbusRpcException)
                        return Observable.timer(5, TimeUnit.SECONDS);
                    return Observable.empty();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    Log.d(TAG, "Boolean received in action");
                }
            });
    Log.d(TAG, "Finished the observer");
11-17 14:02:49.955 26974-26974/com.txomon.openwrt.android D/OpenwrtMainActivity: Doing the observer
11-17 14:02:49.971 26974-26974/com.txomon.openwrt.android D/OpenwrtMainActivity: Finished the observer
Dorus
@Dorus
Nov 17 2015 14:03
Perhaps Schedulers.io() is bussy?
Javier Domingo Cansino
@txomon
Nov 17 2015 14:03
let me take out the subscribeon()
This gets blocked
    Log.d(TAG, "Doing the observer");
    Observable
            .just(true)
            .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    if (observable.toBlocking().first() instanceof UbusRpcException)
                        return Observable.timer(5, TimeUnit.SECONDS);
                    return Observable.empty();
                }
            })
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    Log.d(TAG, "Boolean received in action");
                }
            });
    Log.d(TAG, "Finished the observer");
11-17 14:04:09.876 28309-28309/com.txomon.openwrt.android D/OpenwrtMainActivity: Doing the observer
and nothing after
Dorus
@Dorus
Nov 17 2015 14:05
Yup, deadlock or something
Javier Domingo Cansino
@txomon
Nov 17 2015 14:05
but this is the main thread
Dorus
@Dorus
Nov 17 2015 14:05
mm, might be something wrong with retryWhen
Javier Domingo Cansino
@txomon
Nov 17 2015 14:05
ok, I take it out
maybe the .toBlocking() ?
@Dorus yeah, that retryWhen was the blocking thing
Dorus
@Dorus
Nov 17 2015 14:07
how did you get that code? Did somebody suggest it?
Javier Domingo Cansino
@txomon
Nov 17 2015 14:08
the .toBlocking was @artem-zinnatullin
I needed to get the UbusRpcException
I mean, the exception thrown
I wanted a retry() but with timeout
but I wasn't receiving the Throwable
Dorus
@Dorus
Nov 17 2015 14:09
yeah retryWhen is the correct function to use. But somehow the code you used for it is wrong.
I'm trying to figure out what would be correct.
I got this example:
Observable.create((Subscriber<? super String> s) -> {
      System.out.println("subscribing");
      s.onError(new RuntimeException("always fails"));
  }).retryWhen(attempts -> {
      return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
          System.out.println("delay retry by " + i + " second(s)");
          return Observable.timer(i, TimeUnit.SECONDS);
      });
  }).toBlocking().forEach(System.out::println);
Javier Domingo Cansino
@txomon
Nov 17 2015 14:10
yeah, but the problem is, where do you get the exception from?
I need to get it
Dorus
@Dorus
Nov 17 2015 14:10
attempts
Javier Domingo Cansino
@txomon
Nov 17 2015 14:11
but there is no exception in there
I mean, you receive an Observable<? extends Throwable>
Dorus
@Dorus
Nov 17 2015 14:12
Try
.retryWhen(attempts -> {
    return attempts.filter(t -> t instanceof UbusRpcException)
        .flatMap(__ ->  Observable.timer(5, TimeUnit.SECONDS));
});
I'm not sure if that is corret but i have the feeling it would work.
Javier Domingo Cansino
@txomon
Nov 17 2015 14:14
that should be .map(), shouldn't it?
Dorus
@Dorus
Nov 17 2015 14:14
The example used flatMap
Javier Domingo Cansino
@txomon
Nov 17 2015 14:14
I will use flatmap too just in case..
@Dorus you nailed it
Dorus
@Dorus
Nov 17 2015 14:22
I'm not sure if this filter stuff is allowed.
Yup, i was wrong. Currect function is takeWhile
.retryWhen(attempts -> {
    return attempts.takeWhile(t -> t instanceof UbusRpcException)
        .flatMap(__ ->  Observable.timer(5, TimeUnit.SECONDS));
});
mm
Javier Domingo Cansino
@txomon
Nov 17 2015 14:23
but it's working...
Dorus
@Dorus
Nov 17 2015 14:23
or not, now it swallows the error
Javier Domingo Cansino
@txomon
Nov 17 2015 14:24
there should be a way to extract the item from the observable
attempts is of type Observable<? extends Throwable> observable
Dorus
@Dorus
Nov 17 2015 14:25
i think we need it to end it with onError
problem is that takeWhile end it with an onCompleted
Doh
.retryWhen(attempts -> {
    return attempts.flatMap(t -> {
            if (t instanceof UbusRpcException)
                return Observable.timer(5, TimeUnit.SECONDS);
            return Observable.error(t);
        });
    })
Javier Domingo Cansino
@txomon
Nov 17 2015 14:27
hmmm
Dorus
@Dorus
Nov 17 2015 14:27
That's how you do it
My first code was correct for any UbusRpcException but not for other errors :)
Dorus
@Dorus
Nov 17 2015 14:38
I'm reading the old code with toblocking I think that one was correct for errors but not for normal flow :P
Actually this (the flatMap one) is very similar to what @artem-zinnatullin gave you as second example for retryWhen
:point_up: 16 november 2015 01:36
Javier Domingo Cansino
@txomon
Nov 17 2015 14:42
true...