These are chat archives for ReactiveX/RxJava

8th
Mar 2017
Exerosis
@Exerosis
Mar 08 2017 01:02
if I had an Observable o = source.replay(1, TimeUnit.MINUTES).autoConnect();
then immediately called o.subscribe(...); then two minutes later called o.subscribe(...); again. Would source.subscribe be called a second time or have I completely misunderstood the replay operator?
Zak Taccardi
@ZakTaccardi
Mar 08 2017 01:06
the source is subscribed once
and is then multicasted
you can easily test that behavior with a junit test and logging
Exerosis
@Exerosis
Mar 08 2017 01:08
Thank you very much!
Zak Taccardi
@ZakTaccardi
Mar 08 2017 01:08
anything you add to the stream after autoConnect will be executed again for each subscription
Vladimir Tagakov
@Tagakov
Mar 08 2017 17:58

Hi guys! I am trying to use backpressure to handle data paging. I have an observable which requests N new chunks of data from server for every request(N). I wish to fetch a new chunk of data after user clicks a button. I've tried to use this approach

Observable.zip(dataFromServer(), userActions(), (data, ø) -> data).subscribe(showData());

But because of RxRingBuffer with the size of 16 inside the zip operator the upper construction after subscription immediately requests 16 chunks of data from the server. This is totally inappropriate. Even if I could change the buffer size to 1 the behavior of the upper construction is inapropriate. The data would be requested before the first user action, but I want it to be first requested after the first user action. It seems like my approach is totally wrong. Can you help?

Vladimir Tagakov
@Tagakov
Mar 08 2017 18:29
I've managed to accomplish appropriate behavior, but it looks ugly. Maybe there is a better way?
val userActions = Observable.interval(1000, TimeUnit.MILLISECONDS).share();//user clicks every second//
val dataFromServer = Observable.range(1, 10);//data has 10 pages//

dataFromServer
        .doOnNext(it -> System.out.println("Received from server" + it))
        .flatMap(it -> userActions.take(1).map(ø -> it), 1)
        .doOnNext(it -> System.out.println("Before data showing" + it))
        .subscribe(showData());
Yilin Wei
@yilinwei
Mar 08 2017 19:44
@Tagakov I think you want a Observable[Action] and then use a scan to keep track of the state (the page which you're on), then flatMap after your showData call to put it in the widen stream if you want a Observable[Data].
Vladimir Tagakov
@Tagakov
Mar 08 2017 19:59
Thank you @yilinwei . Unfortunately I have a stateful server and can't request arbitrary page without requesting all previous pages. Although I can track state in some class and provide api to get Single with the next page, I'm trying to keep state tracking in my Observable. Maybe I misunderstood your suggestion. Can you please provide a little snippet of code using userActions and dataFromServer observables from my previous snippet, they are fairly accurate models of my situation, or could you change them in a way you think is best?
Yilin Wei
@yilinwei
Mar 08 2017 20:00
Sure, gimme a sec.
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:03
Is this what you mean?
userActions
        .onBackpressureDrop()
        .scan(0, (acc, ø) -> ++acc)
        .concatMap(it -> requestPage(it))
        .subscribe(showData());
Yilin Wei
@yilinwei
Mar 08 2017 20:04
Yeah pretty much; using scan as the state holder.
I think you can do flatMap instead of concatMap though.
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:07
This approach don't use backpressure mechanics at all and it forces me to create method requesting new page. I can rewrite this code like this:
userActions.
    .flatMap(ø -> requestNextPage(), 1)
    .subscribe(showData());
Yilin Wei
@yilinwei
Mar 08 2017 20:08
I had assumed your userActions wasn't tied to your request
so it was the number of clicks
rather than what was in them
(which is why the scan increments the number)
Also, where do you want the backpressure? surely it's just after showData?
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:10
gimme a sec
Yilin Wei
@yilinwei
Mar 08 2017 20:11
(and also don't you want to buffer rather than drop...?)
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:11
No, if the user clicks when data still loading the click should have no effect
Yilin Wei
@yilinwei
Mar 08 2017 20:13
eh...? Is that what you're trying to use backpressure for with your 1 thread?
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:15

The ugly api

interface Api {
        void openConnection();
        void hasNextPage();
        Single<Data> nextPage();
        void closeConnection();
    }

Can be totally replaced by Observable contract. openConnection() would be replaced by 'subscription', hasNextPage() would be replaced by completion, nextPage() would be replaced by request(1) from subscriber and closeConnection() would be replaced by unsubscription

Yilin Wei
@yilinwei
Mar 08 2017 20:16
I see what you're trying to do now.
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:17
Is it a bad idea to use backpressure mechanics?
Yilin Wei
@yilinwei
Mar 08 2017 20:18
My own personal opinion, in this particular case where you want finegrained control, it's not a good idea.
the backpressure in this case will also come from showData rather than anything else I think.
I would be tempted to change the API as an Observable[(Data, Option[Single[Data]])]
Rather than just a Observable[Data]
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:29
I can't imagine the way to collect all pages from server with your approach. In my case it would be
dataWithBackpressureSupport().toList().subscribe(processAllPagesList());
Yilin Wei
@yilinwei
Mar 08 2017 20:31
Hmm? It's more I'd expose a paged API with that particular signature so I could lazily pull the data from within the Observable by unwrapping the Single. The None represents no more pages. In the case that you didn't need to care about the fact that you have pages you can just expose an Observable[Data]
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:34
Can you please show me the snippet which unwraps the Single?
Yilin Wei
@yilinwei
Mar 08 2017 20:36
? .toObservable() of the Single within a flatMap or concatMap will unwrap it into the wider stream?
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:36
Yes but it will only work if we have only two pages
Yilin Wei
@yilinwei
Mar 08 2017 20:41
  val  o: Observable[(Int, Option[Single[Int]])] = ???
  val o2: Observable[Int] = o.flatMap {
    case (data, Some(single)) =>
      single.asJava.toObservable.asScala
    case (data, None) => Observable(data)  
  }
Ignore the fact that this is a separate wrapper which I'm doing, but this lifts them all into the top level stream no?
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:52
I am not sure that I correctly understand what you was written. But if my understanding of your code correct you lose the first page (which goes in pair with Option[Single]) if you have none-empty Option. And I can't see where you keep info for third and subsequent pages. Let me translate your code to Java
Observable<Pair<Integer, Optional<Single<Integer>>> o = ???
Observable<Integer> o2 = o.flatMap(it -> {
  Integer data = it.first;
  if (it.second.isPresent()) {
    return it.second.get().toObservable();
  }
  return Observable.just(data);
});
Yilin Wei
@yilinwei
Mar 08 2017 20:53
Oh yeah, just do a apply and concat for the first page
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:53
It will not change the fact that the third page would be never fetched.
Yilin Wei
@yilinwei
Mar 08 2017 20:54
The Observable should emit more than one item?
Vladimir Tagakov
@Tagakov
Mar 08 2017 20:55
I don't know that was your example of api with paging. I assumed that your Observable only emits the first page and the Option with Single for the second page.
Yilin Wei
@yilinwei
Mar 08 2017 21:01
oic, apologies, I'm being a twit; You're right you, want to include the handle to the next one emitted in the Single as well.
(though you'd need either a recursive data structure or upcasts, like this Single[(A, Option[Single[_]])]
@Tagakov This is what I mean
Vladimir Tagakov
@Tagakov
Mar 08 2017 21:36
looks awful, sorry. I think my example was much more laconic. Providing api which forces user to use such constructions is a crime
I mean that on Java your construction would be nightmarish
Yilin Wei
@yilinwei
Mar 08 2017 21:39
haha. That's certainly true. Do you have java users primarily?
(note recurse would be hidden)
Vladimir Tagakov
@Tagakov
Mar 08 2017 21:41
yep, only Java =(
Yilin Wei
@yilinwei
Mar 08 2017 21:43
I see. You can do something similar to the groupBy operator (expose a custom observable with a data and a tail). Or, you could probably use a custom subject to get the behaviour you want.
My own preference is to be explicit about these types of behaviours so I wouldn't rely on any implicit behaviours of the streams in this type of use case; but I'm not your user so you should do what you think they'll use most :wink: