Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Oleh Dokuka
    @OlegDokuka
    You are welcome :)
    pbagchi
    @pbagchi
    Cross posting from stack overflow, hoping someone will have insight into how to get it working - https://stackoverflow.com/questions/47293445/how-to-passing-traceids-in-hystrix-observables
    lukaszguz
    @lukaszguz

    Hi
    I have problem with RxJavaPlugins.onScheduleHandler.
    At the moment of changing thread pool is invoked

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            DisposeTask task = new DisposeTask(decoratedRun, w);
            w.schedule(task, delay, unit);
            return task;
        }

    my Runnable is wrapped into DisposeTask so I can't check type of action inside scheduleHandler because DisposeTask is package scope.
    Do you have any idea how to solve this problem without reflection? :)

    David Karnok
    @akarnokd
    Unfortunately you can't; it requires exposing the type or at least some API to unwrap the internal wrappers. I don't have much time this week to work the details out though.
    lukaszguz
    @lukaszguz
    Thank you for your answer
    Eugene Popovich
    @httpdispatch

    Have anybody resolved auto removal of the completed subscriptions from the CompositeSubscription task to release memory from unused stuff? The only thing that comes to my mind is something like this:

        public static <T> void subscribeWithAutoRemove(Observable<T> observable, Action1<T> action, CompositeSubscription compositeSubscription) {
            MutableObject<Subscription> subscriptionHolder = new MutableObject<>();
            subscriptionHolder.set(observable
                    .doOnCompleted(() -> {
                        if (subscriptionHolder.isSet()) {
                            compositeSubscription.remove(subscriptionHolder.get());
                        }
                    })
                    .subscribe(action));
            if (!subscriptionHolder.get().isUnsubscribed()) {
                compositeSubscription.add(subscriptionHolder.get());
            }
        }

    Maybe there is a better way?

    David Karnok
    @akarnokd
    A more non-hacky solution is to create a Subscriber upfront and implement its onCompleted and onError to remove itself from the composite:
    import rx.subjects.PublishSubject;
    
    import org.junit.*;
    
    import rx.*;
    import rx.functions.Action1;
    import rx.plugins.RxJavaHooks;
    import rx.subscriptions.CompositeSubscription;
    
    public class AutoRemoveSubscription {
    
        public static <T> void subscribeAutoRelease(
                Observable<T> source, 
                final Action1<T> onNext, 
                CompositeSubscription composite) {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    composite.remove(this);
                }
                @Override
                public void onError(Throwable e) {
                    composite.remove(this);
                    RxJavaHooks.onError(e);
                }
                @Override
                public void onNext(T t) {
                    try {
                        onNext.call(t);
                    } catch (Throwable ex) {
                        unsubscribe();
                        onError(ex);
                    }
                }
            };
            composite.add(subscriber);
            source.subscribe(subscriber);
        }
    
        @Test
        public void test() {
            CompositeSubscription composite = new CompositeSubscription();
    
            PublishSubject<Integer> ps = PublishSubject.create();
    
            subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
            ps.onCompleted();
    
            Assert.assertFalse(composite.hasSubscriptions());
        }
    }
    Eugene Popovich
    @httpdispatch
    @akarnokd thanks, that looks much better
    Eugene Popovich
    @httpdispatch
    @akarnokd is it possible to extend your solution to auto release if unsubscribe is called instead of onCompleted?
    or in addition to onCompleted
    Is it correct?
    subscriber.add(new Subscription() {
                boolean unsubscribed = false;
    
                @Override public void unsubscribe() {
                    unsubscribed = true;
                    composite.remove(subscriber);
                }
    
                @Override public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });
    Eugene Popovich
    @httpdispatch
    Looks like that works. Final solution if anybody need it. Thanks @akarnokd for great help
        public static <T> Subscription subscribeAutoRelease(
                Observable<T> source,
                final Action1<T> onNext,
                CompositeSubscription composite) {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    composite.remove(this);
                }
    
                @Override
                public void onError(Throwable e) {
                    // simulate same behaviour as in the method {@link Observable#subscribe(Action1)}
                    InternalObservableUtils.ERROR_NOT_IMPLEMENTED.call(e);
                }
    
                @Override
                public void onNext(T t) {
                    try {
                        onNext.call(t);
                    } catch (Throwable ex) {
                        unsubscribe();
                        onError(ex);
                    }
                }
            };
            subscriber.add(new Subscription() {
                boolean unsubscribed = false;
    
                @Override public void unsubscribe() {
                    unsubscribed = true;
                    composite.remove(subscriber);
                }
    
                @Override public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });
            composite.add(subscriber);
            return source.subscribe(subscriber);
        }
    
    @Test
        public void testSubscribeAutoRelease() {
            CompositeSubscription composite = new CompositeSubscription();
    
            PublishSubject<Integer> ps = PublishSubject.create();
    
    
            //
    
            Subscription subscription = RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
    
            subscription.unsubscribe();
    
            Assert.assertFalse(composite.hasSubscriptions());
    
            //
    
            RxUtils.subscribeAutoRelease(ps, System.out::println, composite);
    
            Assert.assertTrue(composite.hasSubscriptions());
    
            ps.onNext(1);
            ps.onCompleted();
    
            Assert.assertFalse(composite.hasSubscriptions());
        }
    Paco
    @pakoito
    what is the most common way of writing stack-safe flatMap chains?
    is any of the constructor operators stack-safe? or maybe fold or any of these?
    Amr Elmasry
    @AmrElmasry
    @akarnokd Hello, Is there any criteria for an operator that makes its implementation must be thread safe or not?
    I am reading the implementation for ObservableJust and ObservableFromArrray, and noticed that ObservableJust creates ScalarDisposable which is thread safe but ObservableFromArrray creates FromArrayDisposable which is not, what is the difference between them?
    Oleh Dokuka
    @OlegDokuka
    @AmrElmasry Observable Just was built with a thought in mind that it may be called from the any thread, so for that purpose the value is wrapped in the Atomic.
    In oposit, FromArray is designed to be synchronous operator
    @Override
            public int requestFusion(int mode) {
                if ((mode & SYNC) != 0) {
                    fusionMode = true;
                    return SYNC;
                }
                return NONE;
            }
    that means that, if FromArrayDisposable will be used as a Queue all pull will be called in the same thread as FromArrayDisposable was created, so there is no Threadsafity required.
    In case if no fusion is supported, we will get synchronous behaviors again since method run, which looks like next:
    void run() {
                T[] a = array;
                int n = a.length;
    
                for (int i = 0; i < n && !isDisposed(); i++) {
                    T value = a[i];
                    if (value == null) {
                        actual.onError(new NullPointerException("The " + i + "th element is null"));
                        return;
                    }
                    actual.onNext(value);
                }
                if (!isDisposed()) {
                    actual.onComplete();
                }
            }
    Oleh Dokuka
    @OlegDokuka
    The only place where this method is being called is subscribe method, which looks like next:
    @Override
        public void subscribeActual(Observer<? super T> s) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
            s.onSubscribe(d);
    
            if (d.fusionMode) {
                return;
            }
    
            d.run();
        }
    So, again there is no additional threadsafety required
    since FromArrayDisposable is created in the same thread as the run method is being called
    David Karnok
    @akarnokd
    @AmrElmasry on Observables, only a certain level of thread safety is usually required. The Disposable you hand to the Observer.onSubscribe must be thread safe and when you call Observer.onNext, onError and onComplete, that has to happen in a sequential manner (no concurrent invocations). FromArrayDisposable's only thread safety comes from the volatile boolean disposed which may be set from dispose() calls coming from any thread. Since there is no backpressure (and ignoring fusion), the emission of the array items happens on the thread which called the operator's subscribe method and doesn't leave it until all items have been emitted or the disposed flag was found to be true. Therefore, no additional thread-safety measures are required. When fusion is enabled, the caller to the pull must ensure sequential call to pull, isEmpty and clear().
    Amr Elmasry
    @AmrElmasry
    @OlegDokuka @akarnokd Thanks a lot :))
    nu007a
    @nu007a
    hello
    • test
    Srepfler Srdan
    @schrepfler
    @akarnokd what’s your view on scalaz’s 8 Race coming out of John De Goes’s IO work?
    David Karnok
    @akarnokd
    Never heard of it. Do you have a link or description?
    Srepfler Srdan
    @schrepfler
    David Karnok
    @akarnokd
    @schrepfler I have a couple of observations:
    • it's in Scala and I don't see how Java or Android would benefit
    • Positions itself relative to Threads, which is a pretty common thing for async solutions lately, completely ignoring the fact that threadpools were invented to help solving the task count vs. thread count problem
    • One value - which maximizes overhead - instead of streaming values
    • Not deferred, can't retry an IO[T] unless you recreate the entire flow?
    • Pretty similar to ReactiveX' fluent API style
    • Fibers - suspension and resumption is the common case - unlike permit-based streaming such as Reactive Streams
    David Karnok
    @akarnokd
    • The "racer" bugs he describes in other libraries is not true for RxJava: amb() will cancel the non-winners immediately.
    • My experience is that when you measure your solution 100x faster, your test might be wrong.
    Srepfler Srdan
    @schrepfler
    :)
    Meena Mana
    @mmanavaz
    ReactiveX/rxjs#3151 I'm facing this issue and have tried the suggested solutions which are not working for me. Any suggestions?
    David Karnok
    @akarnokd
    @mmanavaz This is the RxJava room so I don't think many here do RxJS, which is a completely different library on a different platform.
    Meena Mana
    @mmanavaz
    Oh. OK. Thank you anyway.
    Sylvain Daclin 🖱🕹
    @sdaclin_twitter
    @akarnokd "My experience is that when you measure your solution 100x faster, your test might be wrong." TRUE ;)
    Thomas May
    @cogman

    Hope this is the right place, but I'm looking mostly for advice.
    I have a system that I want to retrofit RXJava on. Right now, we have a problem where our job processing engine dies because it very much works in a batchy fashion. It will pull multiple jobs from the DB, load in a ton of data (millions of items), does a bunch of transforms, and then stores the newly transformed data back in the database in one giant batch.

    Now, as you can imagine, we are running into memory AND database issues. The transactions are too big in the database, and if we get a bunch of large jobs our engine poops out with not OOM execeptions, in that case we have to throttle it way back.

    So, to me, this sounds like a good fit for RXJava. We can switch from doing a giant transaction for storage to a bunch of batches of transactions. We can do most of the transformations on the fly. And we can use backpresssure to keep the system from getting overwhelmed.

    The one thing I'm not sure about how to handle properly is pulling the job items from the database. My thinking is taking the job description, having a fairly large buffer, and then a processor that transforms the job info into job items into a fairly large buffer that can be processed by the downstream at its own pace. My concern is what happens when the buffer fills up? I don't really want the item reader to be blocked, because that will stop other processes from reading those elements (not great).

    Sorry, big block of text, and mostly I just want to know, am I on the right path for conversion or should I be thinking about this a different way?
    Fyodor Sherstobitov
    @fsherstobitov

    Hi everyone! I'm trying to implement REST call caching on Android using RxJava. I have this code:

        override fun loadTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Searching tasks in database")
            return boxJobDao.getAll()
                    .map { boxJobs -> boxJobMapper.entityListToTaskList(boxJobs) }
                    .switchIfEmpty(syncTasks())
        }
    
        private fun syncTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Loading tasks from server")
            return api.boxJobs(DEVICE_ID)
                    .doOnSuccess({ boxJobDtoList ->
                        Log.d(TAG, "${boxJobDtoList.size} box jobs loaded from server")
                        saveToDb(boxJobDtoList)
                    })
                    .doOnError({ error -> Log.d(TAG, "Error during tasks loading", error) })
                    .map { boxJobDtoList -> boxJobMapper.dtoListToTaskList(boxJobDtoList) }
                    .toMaybe()
        }
    
        private fun saveToDb(boxJobDtoList: List<BoxJobDto>?) {
            Log.d(TAG, "Saving box jobs to database")
            boxJobDtoList?.forEach { boxJobDto ->
                try {
                    val boxJob = boxJobMapper.dtoToEntity(boxJobDto)
                    boxJobDao.insert(boxJob)
                    val barcodeReadList = boxJobDto.barcodeReadDtos?.map { dto -> barcodeReadMapper.dtoToEntity(dto) }
                    barcodeReadDao.insertAll(barcodeReadList ?: emptyList())
                } catch (e: Throwable) {
                    Log.e(TAG, "Error during box jobs saving", e)
                }
            }
        }

    I return the list of tasks from DB if there are any. If no tasks found in DB, I switch to REST API, get tasks from it, save to DB and return them to caller.

    The problem is that the code never gets to doOnSuccess callback. Am I doing something wrong?

    David Whetstone
    @humblehacker
    Anyone here had experience with doFinally, doOnTerminate, and/or doAfterTerminate not getting called?
    David Whetstone
    @humblehacker
    Here’s roughly what I’m doing:
    button.clicks()
        .compose(bindToLifecycle())
        .doOnNext { something() }
        .flatMap { somethingReturingAPublishSubject() }
        .doFinally { somethingElse() }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { theMainThing() },
            onError = { handleError() })
    RxJava2 on Android
    David Karnok
    @akarnokd
    Those operators require a finite sequence, but I guess clicks() never completes.
    David Whetstone
    @humblehacker
    Thanks, @akarnokd. That was it. I assumed flatMap sort of replaced the original observable with the result of somethingReturningAPublishSubject(), but I learned that it merges the two—completing only when both the original observables complete.