These are chat archives for ReactiveX/RxJava

31st
Oct 2016
Serban Balamaci
@balamaci
Oct 31 2016 12:10
Hi, I'm trying to convert rxjava v1 code to v2, and I'm kinda stuck on
https://gist.github.com/balamaci/802ce99d910bdbfc7a98ec0f9f5de45b
I get a compile error: Error:(111, 49) java: incompatible types: no instance(s) of type variable(s) R,K,V exist so that io.reactivex.Single<R> conforms to org.reactivestreams.Publisher<? extends R>
The error is highlighted on the map call. count() returns Single<Long>, but I don't see why mapping that to a Pair would be a problem; the result of flatMap should then be Flowable<Pair<String, Long>>, right?
Serban Balamaci
@balamaci
Oct 31 2016 12:19
I think it's probably because Single does not implement Publisher, which is what flatMap expects the mapper to return
Serban Balamaci
@balamaci
Oct 31 2016 13:40
This was clarified in ReactiveX/RxJava#4788
Alessandro Vermeulen
@spockz
Oct 31 2016 16:12
I thought flatMap and concatMap were memory friendly. However, when I run the code below for a while I end up with many rx.internal.operators.OperatorMerge$MergeSubscriber and Object[] instances in memory, along with all kinds of Operators. Is there a way I can implement the functionality below in a more memory-friendly manner?
public class Pinging {

  public Observable<Integer> ping(final Integer occurredErrors) {
    final rx.Observable<Integer> pingObservable =
        rx.Observable.just(occurredErrors).delay(pingInterval, pingIntervalTimeUnit);

    // Each ping subscribes to the next ping inside its own flatMap,
    // so the chain of inner subscribers keeps growing.
    return pingObservable.flatMap(numberOfOccurredErrors -> {
      final Observable<Response> pingResponse =
        callMethod(..);

      return pingResponse.flatMap(response -> {
        ...
        if (response isOk) {
          return ping(numberOfOccurredErrors + 1);
        } else {
          return ping(0);
        }
      });
    });
  }
  }

  ..
  ping(0)
}
Alessandro Vermeulen
@spockz
Oct 31 2016 16:43
My fear is that I have to do something like trampoline out of the recursion, but it seems like that shouldn't be necessary for something as simple as this
Alessandro Vermeulen
@spockz
Oct 31 2016 17:28
I have now solved this 'manually' with a PublishSubject used as a work queue, a delayed stream based on that queue, and a worker that places work items back on the queue.
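The work-queue idea can be sketched without RxJava. This is only an illustration, not the code from the message above: a single-threaded `ScheduledExecutorService` stands in for the subject plus delayed stream, and each finished ping re-enqueues the next one instead of nesting it inside the previous one. The class `PingLoop`, the `IntPredicate` standing in for `callMethod(..)`, and the `maxPings` stop condition are all hypothetical names introduced for this example. The branches mirror the original snippet (counter + 1 when the response is OK, 0 otherwise).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;

// Hypothetical sketch: a ping loop driven by a scheduler acting as a work queue.
final class PingLoop {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final IntPredicate call;   // stand-in for callMethod(..): true means "response is OK"
    private final long delayMillis;    // stand-in for pingInterval
    private final int maxPings;        // assumption: stop after this many pings so the sketch terminates
    private final List<Integer> counters = new ArrayList<>(); // only mutated on the scheduler thread
    private final CountDownLatch done = new CountDownLatch(1);

    PingLoop(IntPredicate call, long delayMillis, int maxPings) {
        this.call = call;
        this.delayMillis = delayMillis;
        this.maxPings = maxPings;
    }

    // Place one work item on the queue; it runs after the delay.
    private void enqueue(int counter) {
        scheduler.schedule(() -> runPing(counter), delayMillis, TimeUnit.MILLISECONDS);
    }

    // The worker: handle one ping, then put the next item back on the queue.
    // Nothing from the previous ping stays referenced once this method returns.
    private void runPing(int counter) {
        counters.add(counter);
        if (counters.size() >= maxPings) {
            done.countDown();
            return;
        }
        boolean ok = call.test(counter);
        enqueue(ok ? counter + 1 : 0);
    }

    // Starts the loop and blocks until maxPings pings ran (or 10 seconds passed).
    List<Integer> run() {
        enqueue(0);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
        return new ArrayList<>(counters);
    }
}
```

For example, `new PingLoop(c -> c < 2, 1L, 6).run()` records the counter seen by each of the six pings. Because every ping is an independent task, the number of live objects stays constant no matter how long the loop runs, which is the property the recursive flatMap version was missing.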
Mark Paluch
@mp911de
Oct 31 2016 18:15
@here RxJava 1 Single is pure [1|error], not [0|1|error], right?