Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Sudhir Nimavat
    @snimavat
    using filter is not an option probably, as in my case filter cant decide what to filter on to give each subscriber unique items
    What i have is a huge collection that i want to be able to process in parallel by few different subscribers
    Oleh Dokuka
    @OlegDokuka
    try to look at parallel operator
    Sudhir Nimavat
    @snimavat
    @OlegDokuka that works, how would i go about blocking the main thread for the parallel flowable to finish !?
    Sudhir Nimavat
    @snimavat
    sequential()
    sorry for the noise
    Oleh Dokuka
    @OlegDokuka
    hmm
    Sudhir Nimavat
    @snimavat
    .sequential().blockingForEach({ }) -- is what i was looking for. RxJava2
    Thanks @OlegDokuka - Appriciated, I got it working.
    Oleh Dokuka
    @OlegDokuka
    You are welcome :)
    pbagchi
    @pbagchi
    Cross posting from stack overflow, hoping someone will have insight into how to get it working - https://stackoverflow.com/questions/47293445/how-to-passing-traceids-in-hystrix-observables
    lukaszguz
    @lukaszguz

    Hi
    I have problem with RxJavaPlugins.onScheduleHandler.
    At the moment of changing thread pool is invoked

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            DisposeTask task = new DisposeTask(decoratedRun, w);
            w.schedule(task, delay, unit);
            return task;
        }

    my Runnable is wrapped into DisposeTask so I can't check type of action inside scheduleHandler because DisposeTask is package scope.
    Do you have any idea how to solve this problem without reflection? :)

    David Karnok
    @akarnokd
    Unfortunately you can't; it requires exposing the type or at least some API to unwrap the internal wrappers. I don't have much time this week to work the details out though.
    lukaszguz
    @lukaszguz
    Thank you for your answer
    Eugene Popovich
    @httpdispatch

    Have anybody resolved auto removal of the completed subscriptions from the CompositeSubscription task to release memory from unused stuff? The only thing that comes to my mind is something like this:

        public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
            MutableObject<Subscription> subscriptionHolder = new MutableObject<>();
            subscriptionHolder.set(observable
                    .doOnCompleted(() -> {
                        if (subscriptionHolder.isSet()) {
                            compositeSubscription.remove(subscriptionHolder.get());
                        }
                    })
                    .subscribe(action));
            if (!subscriptionHolder.get().isUnsubscribed()) {
                compositeSubscription.add(subscriptionHolder.get());
            }
        }

    Maybe there is a better way?

    David Karnok
    @akarnokd
    A more non-hacky solution is to create a Subscriber upfront and implement its onCompleted and onError to remove itself from the composite:
    import rx.subjects.PublishSubject;
    
    import org.junit.*;
    
    import rx.*;
    import rx.functions.Action1;
    import rx.plugins.RxJavaHooks;
    import rx.subscriptions.CompositeSubscription;
    
    public class AutoRemoveSubscription {
    
        public static <T> void subscribeAutoRelease(
                Observable<T> source, 
                final Action1<T> onNext, 
                CompositeSubscription composite) {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    composite.remove(this);
                }
                @Override
                public void onError(Throwable e) {
                    composite.remove(this);
                    RxJavaHooks.onError(e);
                }
                @Override
                public void onNext(T t) {
                    try {
                        onNext.call(t);
                    } catch (Throwable ex) {
                        unsubscribe();
                        onError(ex);
                    }
                }
            };
            composite.add(subscriber);
            source.subscribe(subscriber);
        }
    
        @Test
        public void test() {
            CompositeSubscription composite = new CompositeSubscription();
    
            PublishSubject<Integer> ps = PublishSubject.create();
    
            subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
            ps.onCompleted();
    
            Assert.assertFalse(composite.hasSubscriptions());
        }
    }
    Eugene Popovich
    @httpdispatch
    @akarnokd thanks, that looks much better
    Eugene Popovich
    @httpdispatch
    @akarnokd is it possible to extend your solution to auto release if unsubscribe is called instead of onCompleted?
    or in addition to onCompleted
    Is it correct?
    subscriber.add(new Subscription() {
                boolean unsubscribed = false;
    
                @Override public void unsubscribe() {
                    unsubscribed = true;
                    composite.remove(subscriber);
                }
    
                @Override public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });
    Eugene Popovich
    @httpdispatch
    Looks like that works. Final solution if anybody need it. Thanks @akarnokd for great help
        public static <T> Subscription subscribeAutoRelease(
                Observable<T> source,
                final Action1<T> onNext,
                CompositeSubscription composite) {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    composite.remove(this);
                }
    
                @Override
                public void onError(Throwable e) {
                    // simulate same behaviour as in the method {@link Observable#subscribe(Action1)}
                    InternalObservableUtils.ERROR_NOT_IMPLEMENTED.call(e);
                }
    
                @Override
                public void onNext(T t) {
                    try {
                        onNext.call(t);
                    } catch (Throwable ex) {
                        unsubscribe();
                        onError(ex);
                    }
                }
            };
            subscriber.add(new Subscription() {
                boolean unsubscribed = false;
    
                @Override public void unsubscribe() {
                    unsubscribed = true;
                    composite.remove(subscriber);
                }
    
                @Override public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });
            composite.add(subscriber);
            return source.subscribe(subscriber);
        }
    
    @Test
        public void testSubscribeAutoRelease() {
            CompositeSubscription composite = new CompositeSubscription();
    
            PublishSubject<Integer> ps = PublishSubject.create();
    
    
            //
    
            Subscription subscription = RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
    
            subscription.unsubscribe();
    
            Assert.assertFalse(composite.hasSubscriptions());
    
            //
    
            RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
            ps.onCompleted();
    
            Assert.assertFalse(composite.hasSubscriptions());
        }
    Paco
    @pakoito
    what is the most common way of writing stack-safe flatMap chains?
    is any of the constructor operators stack-safe? or maybe fold or any of these?
    Amr Elmasry
    @AmrElmasry
    @akarnokd Hello, Is there any criteria for an operator that makes its implementation must be thread safe or not?
    I am reading the implementation for ObservableJust and ObservableFromArrray, and noticed that ObservableJust creates ScalarDisposable which is thread safe but ObservableFromArrray creates FromArrayDisposable which is not, what is the difference between them?
    Oleh Dokuka
    @OlegDokuka
    @AmrElmasry Observable Just was built with a thought in mind that it may be called from the any thread, so for that purpose the value is wrapped in the Atomic.
    In oposit, FromArray is designed to be synchronous operator
    @Override
            public int requestFusion(int mode) {
                if ((mode & SYNC) != 0) {
                    fusionMode = true;
                    return SYNC;
                }
                return NONE;
            }
    that means that, if FromArrayDisposable will be used as a Queue all pull will be called in the same thread as FromArrayDisposable was created, so there is no Threadsafity required.
    In case if no fusion is supported, we will get synchronous behaviors again since method run, which looks like next:
    void run() {
                T[] a = array;
                int n = a.length;
    
                for (int i = 0; i < n && !isDisposed(); i++) {
                    T value = a[i];
                    if (value == null) {
                        actual.onError(new NullPointerException("The " + i + "th element is null"));
                        return;
                    }
                    actual.onNext(value);
                }
                if (!isDisposed()) {
                    actual.onComplete();
                }
            }
    Oleh Dokuka
    @OlegDokuka
    The only place where this method is being called is subscribe method, which looks like next:
    @Override
        public void subscribeActual(Observer<? super T> s) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
            s.onSubscribe(d);
    
            if (d.fusionMode) {
                return;
            }
    
            d.run();
        }
    So, again there is no additional threadsafety required
    since FromArrayDisposable is created in the same thread as the run method is being called
    David Karnok
    @akarnokd
    @AmrElmasry on Observables, only a certain level of thread safety is usually required. The Disposable you hand to the Observer.onSubscribe must be thread safe and when you call Observer.onNext, onError and onComplete, that has to happen in a sequential manner (no concurrent invocations). FromArrayDisposable's only thread safety comes from the volatile boolean disposed which may be set from dispose() calls coming from any thread. Since there is no backpressure (and ignoring fusion), the emission of the array items happens on the thread which called the operator's subscribe method and doesn't leave it until all items have been emitted or the disposed flag was found to be true. Therefore, no additional thread-safety measures are required. When fusion is enabled, the caller to the pull must ensure sequential call to pull, isEmpty and clear().
    Amr Elmasry
    @AmrElmasry
    @OlegDokuka @akarnokd Thanks a lot :))
    nu007a
    @nu007a
    hello
    • test
    Srepfler Srdan
    @schrepfler
    @akarnokd what’s your view on scalaz’s 8 Race coming out of John De Goes’s IO work?
    David Karnok
    @akarnokd
    Never heard of it. Do you have a link or description?
    Srepfler Srdan
    @schrepfler
    David Karnok
    @akarnokd
    @schrepfler I have a couple of observations:
    • it's in Scala and I don't see how Java or Android would benefit
    • Positions itself relative to Threads, which is a pretty common thing for async solutions lately, completely ignoring the fact that threadpools were invented to help solving the task count vs. thread count problem
    • One value - which maximizes overhead - instead of streaming values
    • Not deferred, can't retry an IO[T] unless you recreate the entire flow?
    • Pretty similar to ReactiveX' fluent API style
    • Fibers - suspension and resumption is the common case - unlike permit-based streaming such as Reactive Streams
    David Karnok
    @akarnokd
    • The "racer" bugs he describes in other libraries is not true for RxJava: amb() will cancel the non-winners immediately.
    • My experience is that when you measure your solution 100x faster, your test might be wrong.
    Srepfler Srdan
    @schrepfler
    :)
    Meena Mana
    @mmanavaz
    ReactiveX/rxjs#3151 I'm facing this issue and have tried the suggested solutions which are not working for me. Any suggestions?