Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Eugene Popovich
    @httpdispatch

    @akarnokd I have 2 event bus like processors. One is for requests second is for results. There may be multiple request handlers which will post a results to the result processor. This may be either synchronously or asynchronously. I want to make a request and wait for the result in the same flowable

    Simplified case

    PublishProcessor<Integer> requestBus = PublishProcessor.create();
    PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
    requestBus.filter(value -> value == 1)
            .map(value -> Pair.create(value, 10))
            .subscribe(resultBus);
    
    Integer request = 1;
    Flowable<Integer> resultFlowable = resultBus
            .filter(requestResultPair -> requestResultPair.first == request)
            .doOnSubscribe(__ -> requestBus.onNext(request));
    Gautam
    @im-gautam
    Nn
    James Nugent
    @jen20
    Hi, is anyone aware of an operator with semantics similar to Timeout, but where you can mutate the last value and remain subscribed on a timeout?
    David Karnok
    @akarnokd
    There is no standard operator for that, however, you could adapt this answer by using a BehaviorSubject flatMapped in instead of the constant and subscribe the BehaviorSubject to the shared observable in publish.
    James Nugent
    @jen20
    Thanks @akarnokd, that looks like it shold work
    Andrei Sereda
    @asereda-gs

    Hello.

    I'm trying to implement a functionality which can be described as "striped" Observer. The idea is pretty simple, I want Observers (subscriptions) for a “stripe” to be single-threaded.

    Stated differently, I would like to select a Scheduler (for observeOn operation) based on current event. Think something like observeOn(Function<T, Scheduler>) instead of current observeOn(Scheduler).

    Simple example which seem to work but not when called multiple times:

    @Test
    public void example() throws Exception {
    final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
    final Function<String, String> keyFn = s -> s;

    // select scheduler for each element
    final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.abs(key.hashCode()) % schedulers.size());
    
    Observable.just("one", "two", "three", "one", "two", "four")
            .groupBy(i -> i) // this is value -> key function
            .flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
            .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    
    Thread.sleep(1_000);

    }

    Unfortunately this doesn’t work if called multiple times. I have created a test:
    Any help would be appreciated
    Eugene Popovich
    @httpdispatch
    @asereda-gs use observeOn insteadof subscribeOn
    or create new observable in the flatMap
    Observable.just(g).subscribeOn(schedulerFn.apply(g.getKey())
    David Karnok
    @akarnokd
    g.subscribeOn moves the subscription to another thread but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
    Andrei Sereda
    @asereda-gs
    @akarnokd something like this ?
    observable.groupBy(keyFn::apply)
    .flatMap(o -> o.observeOn(fn.apply(o.getKey())).map(dummy -> dummy))
    .subscribe(e -> {
    state.computeIfAbsent(e, ignore -> new CopyOnWriteArraySet<>()).add(Thread.currentThread().getName());
    latch.countDown();
    });
    if the problem in Observable.just() for test as the source ?
    @httpdispatch I did try with both observeOn() and subscribeOn() same result
    Andrei Sereda
    @asereda-gs
    Observable.just("one", "two", "three", "one", "two", "four")
            .groupBy(i -> i)
            .flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).map(dummy -> dummy))
            .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    key=one value=one thread=pool-3-thread-1
    key=one value=one thread=pool-3-thread-1
    key=two value=two thread=pool-3-thread-1
    key=two value=two thread=pool-3-thread-1
    key=four value=four thread=pool-3-thread-1
    key=three value=three thread=pool-3-thread-1
    I would like subscription to happen in different threads
    David Karnok
    @akarnokd
    FlatMap can reuse the same thread to emit items coming from different threads. You should put the computation into that map(dummy -> dummy) part.
    Andrei Sereda
    @asereda-gs
    How can I "isolate" that functionality as
    Observable Transformer / Operation ?
    Observable.compose( .... ).subscribe( )
    David Karnok
    @akarnokd
    .flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).doOnNext(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName())))
    Andrei Sereda
    @asereda-gs
    Thanks David. What I'm looking is something like this:
    // consumer is "single-threaded"
    Consumer<String> consumer = str -> {
      state.computeIfAbsent(str, ignore -> ConcurrentHashMap.newKeySet()).add(Thread.currentThread().getName());
      latch.countDown();
    };
    
    observable.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
    private static <T, K> Observable.Transformer<T, T> dynamicSchedulerSubscription(Function<T, Scheduler> fn) {
    // this doesn't work
    return obs -> obs.flatMap(o -> Observable.just(o).observeOn(fn.apply(o)));
    }
    with doOnNext() I can't really hook Observers.
    Perhaps I'm missing something ?
    David Karnok
    @akarnokd
    <T, K> ObservableTransformer<GroupedObservable<T>, T> dynamicSchedulerSubscription(Function<K, Scheduler> fn,
            Consumer<? super T> consumer {
        return o -> o.flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).doOnNext(consumer));
    }
    
        Observable.just("one", "two", "three", "one", "two", "four")
                .groupBy(i -> i)
                .compose(dynamicSchedulerSubscription(fn, consumer))
                .subscribe(v -> { }, Throwable::printStackTrace);
    Andrei Sereda
    @asereda-gs
    But what if I don't know consumer in advance ?
    ideally, I would like to return
    observable.compose(dynamicSchedulerSubscription(fn))
    then clients subscribe with (perhaps) different consumers
    Andrei Sereda
    @asereda-gs
    ie I would like to have a Transformer (or Operator) which would force onNext() execution on particular Scheduler
    maybe there will be multiple subscribers.
    But coupling Observable.Transformer and Consumer doesn't feel RXish
    pls correct me if I'm wrong
    Observable<String> singleThreaded = observable.compose(dynamicSchedulerSubscription(fn));
    // somewhere (later) in the code
    singleThreaded.subscribe(consumer1);
    singleThreaded.subscribe(consumer2);
    singleThreaded.subscribe(consumer3);
    David Karnok
    @akarnokd
    Those consumers will be invoked in a sequential manner, so thread hopping between two invocation serves no purpose as those still won't happen concurrently with each other within a subscriber.
    Andrei Sereda
    @asereda-gs
    what about something like that :
    // compose() or lift()
    observable1.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
    observable2.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
    observable3.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
    David Karnok
    @akarnokd
    Makes no sense if you just fork and join back the flow without doing anything inside the forks, which would require some external callbacks provided to the dynamicSchedulerSubscription, i.e., consumer.
    Andrei Sereda
    @asereda-gs
    thanks for clarification, David. And sorry if I ask too many questions.
    the usecase I have is that my consumer(s) is single-threaded (for a particular key) so I can either wrap consumer in StripedLock or schedule execution on a striped single-thread scheduler.
    Ideally I didn't want to expose consumer inside dynamicSchedulerSubscription transformer since it makes it less generic.
    But it seems there is no way around it ?
    Jeff Morin
    @adempus
    What exactly is RxJava?
    Yannick Lecaillez
    @ylecaillez
    Hi all !
    For information, we've run some benchmark of our product and we noticed some contention on CompositeDisposable add/delete (from FlowableFlatMapCompletableCompletable .FlatMapCompletableMainSubscriber). These methods are in synchronized block.
    We switched to standard FlowableFlatMap which solved the contention (but i think it is adding a bit more pressure on GC because of the queue allocation. But i did not check that point).
    This is just for informative purpose in case you're looking for some area to optimize ;-)
    Yannick Lecaillez
    @ylecaillez
    I was thinking about doing it myself and was thinking about using (i think) the same approach than ConcurrentHashMap: use multiple OpenHashSet instance and distribute the add/remove/lock on these instances based on the disposable hashCode. Disposable.hashCode would then be invoked twice but i think it should actually be Object.hashCode() most of the time so it should not be that costly.
    WDYT ?
    Sthg like (pseudo code) void add(Disposable d) { OpenHashSet h = getHashSet(d.hashCode() % concurrencyFactor); synchronized(h) { h.add(d) }; }
    or maybe synchronized on another object from a lock table so that we can create hash set on-demand... anyway.
    Would you accept such PR or maybe you have a better way to solve it ?
    Eugene Popovich
    @httpdispatch

    I've experienced unexpected behaviour using share operator

    @Test public void testConcatShare(){
            Maybe<Boolean> maybe1 = Maybe.defer(() -> {
                System.out.println("Call from maybe1");
                return Maybe.empty();
            });
            Maybe<Boolean> maybe2 = Maybe.defer(() -> {
                System.out.println("Call from maybe2");
                return Maybe.empty();
            });
            Flowable<Boolean> flowable= Maybe.concat(maybe1, maybe2).share();
            flowable.subscribe(System.out::println);
            flowable.subscribe(System.out::println);
        }

    unexpectedly prints

    Call from maybe1
    Call from maybe2
    Call from maybe1
    Call from maybe2

    Should i use cache() operator in such case instead of share to avoid repeating calls in defer method?

    David Karnok
    @akarnokd
    @ylecaillez lock striping could improve performance, but requires a known number of stripes so that all of them can be cancelled. The problem is that the particular CompositeDisposable is somewhat deep inside that operator. Also arbitrarily forcing striping in this operator may be not what the users want or expect. If that flatMapCompletable is a performance bottleneck for you, you could implement a variant which features more than one CompositeDisposable set (as individual fields or as an array). There you don't have to change CompositeDisposable but the locations of set.add(), set.delete() and set.dispose() calls to pick the right stripe.
    Otherwise, I don't think a change in RxJava is necessary.
    Yannick Lecaillez
    @ylecaillez
    Got it, thank you.
    Actually using flatMap() rather than flatMapCompletable() already solved the problem for us. It is just counter intuitive that a more "specialized" method (flatMapCompletable) is actually less efficient then the more "general" one (flatMapCompletable).
    for our use-case, at least :)
    sorry, the more "general" one being flatMap() not flatMapCompletable()