These are chat archives for ReactiveX/RxJava

26th
Nov 2017
Oleh Dokuka
@OlegDokuka
Nov 26 2017 08:52
@AmrElmasry Observable Just was built with a thought in mind that it may be called from the any thread, so for that purpose the value is wrapped in the Atomic.
In oposit, FromArray is designed to be synchronous operator
@Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }
that means that, if FromArrayDisposable will be used as a Queue all pull will be called in the same thread as FromArrayDisposable was created, so there is no Threadsafity required.
In case if no fusion is supported, we will get synchronous behaviors again since method run, which looks like next:
void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
Oleh Dokuka
@OlegDokuka
Nov 26 2017 08:57
The only place where this method is being called is subscribe method, which looks like next:
@Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        s.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }
So, again there is no additional threadsafety required
since FromArrayDisposable is created in the same thread as the run method is being called
David Karnok
@akarnokd
Nov 26 2017 10:08
@AmrElmasry on Observables, only a certain level of thread safety is usually required. The Disposable you hand to the Observer.onSubscribe must be thread safe and when you call Observer.onNext, onError and onComplete, that has to happen in a sequential manner (no concurrent invocations). FromArrayDisposable's only thread safety comes from the volatile boolean disposed which may be set from dispose() calls coming from any thread. Since there is no backpressure (and ignoring fusion), the emission of the array items happens on the thread which called the operator's subscribe method and doesn't leave it until all items have been emitted or the disposed flag was found to be true. Therefore, no additional thread-safety measures are required. When fusion is enabled, the caller to the pull must ensure sequential call to pull, isEmpty and clear().