    Oleh Dokuka
    @OlegDokuka
    @AmrElmasry Observable.just was built with the thought in mind that it may be called from any thread, so for that purpose the value is wrapped in an Atomic.
    In contrast, FromArray is designed to be a synchronous operator:
    @Override
            public int requestFusion(int mode) {
                if ((mode & SYNC) != 0) {
                    fusionMode = true;
                    return SYNC;
                }
                return NONE;
            }
    That means that if FromArrayDisposable is used as a queue, every poll() will be called on the same thread on which the FromArrayDisposable was created, so no thread safety is required.
    If fusion is not supported, we get synchronous behavior again from the run method, which looks like this:
    void run() {
                T[] a = array;
                int n = a.length;
    
                for (int i = 0; i < n && !isDisposed(); i++) {
                    T value = a[i];
                    if (value == null) {
                        actual.onError(new NullPointerException("The " + i + "th element is null"));
                        return;
                    }
                    actual.onNext(value);
                }
                if (!isDisposed()) {
                    actual.onComplete();
                }
            }
    Oleh Dokuka
    @OlegDokuka
    The only place where this method is called is the subscribe method, which looks like this:
    @Override
        public void subscribeActual(Observer<? super T> s) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
            s.onSubscribe(d);
    
            if (d.fusionMode) {
                return;
            }
    
            d.run();
        }
    So, again, no additional thread safety is required,
    since FromArrayDisposable is created on the same thread that calls the run method.
    David Karnok
    @akarnokd
    @AmrElmasry on Observables, only a certain level of thread safety is usually required. The Disposable you hand to Observer.onSubscribe must be thread safe, and when you call Observer.onNext, onError and onComplete, that has to happen in a sequential manner (no concurrent invocations). FromArrayDisposable's only thread safety comes from the volatile boolean disposed, which may be set from dispose() calls coming from any thread. Since there is no backpressure (and ignoring fusion), the emission of the array items happens on the thread which called the operator's subscribe method and doesn't leave it until all items have been emitted or the disposed flag was found to be true. Therefore, no additional thread-safety measures are required. When fusion is enabled, the caller of poll() must ensure sequential calls to poll(), isEmpty() and clear().
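    As a rough, library-free illustration of the point above: the sketch below is not RxJava's actual FromArrayDisposable, and the names ArrayEmitterSketch and demo are made up for this example. It assumes only what was described: the sole shared state is a volatile flag, and emission stays on the calling thread until the array is exhausted or the flag is seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a synchronous array source; not RxJava's real class.
class ArrayEmitterSketch<T> {
    private final T[] array;
    // May be set by dispose() from any thread; read by the emission loop.
    private volatile boolean disposed;

    ArrayEmitterSketch(T[] array) {
        this.array = array;
    }

    void dispose() {
        disposed = true;
    }

    // Emits on the caller's thread; returns once all items are emitted
    // or the disposed flag is observed.
    void run(Consumer<? super T> onNext, Runnable onComplete) {
        for (int i = 0; i < array.length && !disposed; i++) {
            onNext.accept(array[i]);
        }
        if (!disposed) {
            onComplete.run();
        }
    }

    // Disposes from inside onNext after the second item and records
    // every signal that was delivered.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ArrayEmitterSketch<Integer> emitter =
                new ArrayEmitterSketch<>(new Integer[] {1, 2, 3, 4});
        emitter.run(
                v -> {
                    log.add("next:" + v);
                    if (v == 2) {
                        emitter.dispose();
                    }
                },
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Running main prints [next:1, next:2]: the loop stops as soon as it sees the flag, and no completion signal follows a dispose.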
    Amr Elmasry
    @AmrElmasry
    @OlegDokuka @akarnokd Thanks a lot :))
    nu007a
    @nu007a
    hello
    • test
    Srepfler Srdan
    @schrepfler
    @akarnokd what’s your view on Scalaz 8’s Race coming out of John De Goes’s IO work?
    David Karnok
    @akarnokd
    Never heard of it. Do you have a link or description?
    Srepfler Srdan
    @schrepfler
    David Karnok
    @akarnokd
    @schrepfler I have a couple of observations:
    • it's in Scala and I don't see how Java or Android would benefit
    • Positions itself relative to Threads, which is a pretty common thing for async solutions lately, completely ignoring the fact that thread pools were invented to help solve the task count vs. thread count problem
    • One value - which maximizes overhead - instead of streaming values
    • Not deferred, can't retry an IO[T] unless you recreate the entire flow?
    • Pretty similar to ReactiveX' fluent API style
    • Fibers - suspension and resumption is the common case - unlike permit-based streaming such as Reactive Streams
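    On the thread pool point in the list above, a minimal JDK-only sketch of many tasks sharing a few threads. PoolSketch and its run method are names invented for this illustration; the task count and pool size are arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: many small tasks multiplexed onto a fixed set of threads.
class PoolSketch {
    // Runs taskCount tasks on poolSize threads.
    // Returns {number of completed tasks, number of distinct worker threads used}.
    static int[] run(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new int[] {completed.get(), threadNames.size()};
    }

    public static void main(String[] args) {
        int[] result = run(1000, 4);
        System.out.println(result[0] + " tasks ran on " + result[1] + " thread(s)");
    }
}
```

    All 1000 tasks complete, while the number of distinct worker threads never exceeds the pool size of 4.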
    David Karnok
    @akarnokd
    • The "racer" bugs he describes in other libraries do not apply to RxJava: amb() will cancel the non-winners immediately.
    • My experience is that when you measure your solution 100x faster, your test might be wrong.
    Srepfler Srdan
    @schrepfler
    :)
    Meena Mana
    @mmanavaz
    ReactiveX/rxjs#3151 I'm facing this issue and have tried the suggested solutions which are not working for me. Any suggestions?
    David Karnok
    @akarnokd
    @mmanavaz This is the RxJava room so I don't think many here do RxJS, which is a completely different library on a different platform.
    Meena Mana
    @mmanavaz
    Oh. OK. Thank you anyway.
    Sylvain Daclin 🖱🕹
    @sdaclin_twitter
    @akarnokd "My experience is that when you measure your solution 100x faster, your test might be wrong." TRUE ;)
    Thomas May
    @cogman

    Hope this is the right place, but I'm looking mostly for advice.
    I have a system that I want to retrofit RxJava onto. Right now, we have a problem where our job processing engine dies because it works in a very batchy fashion. It pulls multiple jobs from the DB, loads a ton of data (millions of items), does a bunch of transforms, and then stores the newly transformed data back in the database in one giant batch.

    Now, as you can imagine, we are running into memory AND database issues. The transactions are too big in the database, and if we get a bunch of large jobs our engine gives out with OOM exceptions; in that case we have to throttle it way back.

    So, to me, this sounds like a good fit for RxJava. We can switch from doing a giant transaction for storage to a bunch of batches of transactions. We can do most of the transformations on the fly. And we can use backpressure to keep the system from getting overwhelmed.

    The one thing I'm not sure how to handle properly is pulling the job items from the database. My thinking is to take the job description and have a processor that transforms the job info into job items and puts them into a fairly large buffer that the downstream can process at its own pace. My concern is what happens when the buffer fills up? I don't really want the item reader to be blocked, because that will stop other processes from reading those elements (not great).

    Sorry, big block of text, and mostly I just want to know, am I on the right path for conversion or should I be thinking about this a different way?
    Fyodor Sherstobitov
    @fsherstobitov

    Hi everyone! I'm trying to implement REST call caching on Android using RxJava. I have this code:

        override fun loadTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Searching tasks in database")
            return boxJobDao.getAll()
                    .map { boxJobs -> boxJobMapper.entityListToTaskList(boxJobs) }
                    .switchIfEmpty(syncTasks())
        }
    
        private fun syncTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Loading tasks from server")
            return api.boxJobs(DEVICE_ID)
                    .doOnSuccess({ boxJobDtoList ->
                        Log.d(TAG, "${boxJobDtoList.size} box jobs loaded from server")
                        saveToDb(boxJobDtoList)
                    })
                    .doOnError({ error -> Log.d(TAG, "Error during tasks loading", error) })
                    .map { boxJobDtoList -> boxJobMapper.dtoListToTaskList(boxJobDtoList) }
                    .toMaybe()
        }
    
        private fun saveToDb(boxJobDtoList: List<BoxJobDto>?) {
            Log.d(TAG, "Saving box jobs to database")
            boxJobDtoList?.forEach { boxJobDto ->
                try {
                    val boxJob = boxJobMapper.dtoToEntity(boxJobDto)
                    boxJobDao.insert(boxJob)
                    val barcodeReadList = boxJobDto.barcodeReadDtos?.map { dto -> barcodeReadMapper.dtoToEntity(dto) }
                    barcodeReadDao.insertAll(barcodeReadList ?: emptyList())
                } catch (e: Throwable) {
                    Log.e(TAG, "Error during box jobs saving", e)
                }
            }
        }

    I return the list of tasks from the DB if there are any. If no tasks are found in the DB, I switch to the REST API, get tasks from it, save them to the DB and return them to the caller.

    The problem is that the code never gets to the doOnSuccess callback. Am I doing something wrong?

    David Whetstone
    @humblehacker
    Anyone here had experience with doFinally, doOnTerminate, and/or doAfterTerminate not getting called?
    David Whetstone
    @humblehacker
    Here’s roughly what I’m doing:
    button.clicks()
        .compose(bindToLifecycle())
        .doOnNext { something() }
        .flatMap { somethingReturningAPublishSubject() }
        .doFinally { somethingElse() }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { theMainThing() },
            onError = { handleError() })
    RxJava2 on Android
    David Karnok
    @akarnokd
    Those operators require a finite sequence, but I guess clicks() never completes.
    David Whetstone
    @humblehacker
    Thanks, @akarnokd. That was it. I assumed flatMap sort of replaced the original observable with the result of somethingReturningAPublishSubject(), but I learned that it merges the two, completing only when both the original observable and the inner one complete.
    David Whetstone
    @humblehacker
    I see two ways to handle this. First, use take(1) to make clicks() a finite sequence, but this requires setting up the stream again after every click.
    fun setupButton() {
        button.clicks()
          .take(1)
          .flatMap { somethingReturningAnObservable() }
          .observeOn(AndroidSchedulers.mainThread())
          .doFinally { 
            something() 
            setupButton()
          }
          .subscribeBy(
              onNext = { theMainThing() },
              onError = { handleError() }
          )
      }
    Or, chain the streams:
      fun setupButton2() {
        button.clicks()
          .subscribeBy(
              onNext = { 
                somethingReturningAnObservable()
                  .observeOn(AndroidSchedulers.mainThread())
                  .doFinally { something() }
                  .subscribeBy(
                      onNext = { theMainThing() },
                      onError = { handleError() }
                  )
              }
          )
      }
    I’m leaning toward the second approach, though in general it complicates error handling. Anyone have any thoughts on these approaches?
    Boris Maslakov
    @bmaslakov

    Greetings, RxJava community. I was wondering if there's a cleaner alternative to something like this:

    obj.method1()
      .flatMap { v -> obj.method2(v).map { v } };

    i.e. we get a result of an operation, start another operation, wait for the second operation to complete and discard its result.

    Option 2 is even more verbose:

    create { subscriber ->
      obj.method1()
        .subscribe { v ->
          obj.method2(v)
            .subscribe {
              subscriber.onNext(v);
            }
        }
    }

    What do you think? Is option 1 the best we can do?

    David Karnok
    @akarnokd
    Nesting subscribe calls like option 2 is definitely discouraged.
    Yannick Lecaillez
    @ylecaillez
    Hi guys! I have a question regarding resource management:
    I have a Single<Connection> which connects to a server each time I subscribe to it.
    Given the following code: singleConnection.timeout(10, SECONDS).subscribe().
    If the timeout occurs, it will forward an error and cancel() the singleConnection.
    My question: what if the connection finally succeeds after the timeout has triggered but just before cancel() is invoked?
    If nothing is done, I think this race condition will leak the established connection.
    Now I'm wondering: what is the best way to handle such a case?
    One way is to close the connection in singleConnection.cancel(), but given that singleConnection has already emitted the connection, shouldn't cancel() be a no-op?
    Yannick Lecaillez
    @ylecaillez
    Maybe I need some synchronization between cancel() (dispose() actually) and the onSuccess() notification.
    Yannick Lecaillez
    @ylecaillez
    Actually, I might have solved this issue with an AtomicBoolean: after dispose(), it is guaranteed that the connection either has been forwarded to the subscriber OR will never be forwarded to it (silently disconnecting it if the connection was established in the meantime).
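    One way to read the AtomicBoolean idea above, as a sketch under stated assumptions: ConnectionHandoffSketch, FakeConnection, Receiver, connected and demo are all hypothetical names, and no RxJava types are involved. Whichever of connected() and dispose() flips the flag first decides whether the connection is handed over or closed. It only covers the source's side of the handoff; an intermediate operator that has already timed out may still drop a value it receives.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical source-side guard; all names here are illustrative only.
class ConnectionHandoffSketch {
    // Stand-in for a real connection type.
    static final class FakeConnection {
        volatile boolean closed;

        void closeSilently() {
            closed = true;
        }
    }

    // Stand-in for the downstream subscriber.
    interface Receiver {
        void onSuccess(FakeConnection connection);
    }

    private final AtomicBoolean done = new AtomicBoolean();
    private final Receiver downstream;

    ConnectionHandoffSketch(Receiver downstream) {
        this.downstream = downstream;
    }

    // Called by the connecting thread once the connection is established.
    void connected(FakeConnection connection) {
        if (done.compareAndSet(false, true)) {
            downstream.onSuccess(connection); // won the race: the receiver owns it now
        } else {
            connection.closeSilently();       // dispose() won: nobody will ever see it
        }
    }

    // Called by the cancelling thread (for example on a timeout).
    void dispose() {
        done.set(true);
    }

    // Exercises both orderings and reports what happened to the connection.
    static String demo(boolean disposeFirst) {
        AtomicReference<FakeConnection> received = new AtomicReference<>();
        ConnectionHandoffSketch source = new ConnectionHandoffSketch(received::set);
        FakeConnection connection = new FakeConnection();
        if (disposeFirst) {
            source.dispose();
            source.connected(connection);
        } else {
            source.connected(connection);
            source.dispose();
        }
        if (received.get() == connection && !connection.closed) {
            return "delivered";
        }
        if (received.get() == null && connection.closed) {
            return "closed";
        }
        return "inconsistent";
    }
}
```

    demo(true) returns "closed" and demo(false) returns "delivered", so in either ordering exactly one party ends up responsible for the connection.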
    Oleh Dokuka
    @OlegDokuka
    @ylecaillez, actually, you can always write something like this:
    Flux
                    .just("")
                    .flatMap(v -> Flux.just("Opened").delaySubscription(Duration.ofSeconds(2)).doFinally(s -> System.out.println(s)))
                    .timeout(Duration.ofSeconds(2), Flux.just("closed"))
                    .subscribe(System.out::println);
    In that case you always have a chance to clean up your resources if the flattened connection has been cancelled.
    This example works like try with finally.
    P.S. This example is written using Reactor 3, but do not worry, RxJava 2 has an identical API.
    David Karnok
    @akarnokd
    Unfortunately, you can't solve this properly with RxJava 2 and Java 6. Either you need a resource-aware flow implementation which only exist on a drawing board or you need Java 9 and the Cleaner API to handle the falling out of scope. However, the latter can get complicated because you may not know all the references to the Connection object.
    Yannick Lecaillez
    @ylecaillez

    I finally wrote something like

            @Override
            public void dispose() {
                if (done.compareAndSet(false, true) && connection != null) {
                    // Handle the case where we've been cancelled while the connection has actually been established but not
                    // yet forwarded to the subscriber (e.g. because of a slow SSL handshake).
                    connection.closeSilently();
                }
            }

    WDYT ?

    Yannick Lecaillez
    @ylecaillez
    Well, I guess this is a bit too abstract out of its context ... :smile: Anyway, thanks!
    Thomas May
    @cogman

    Is this a bug? I would have expected this to eventually terminate, but it never does:

        ConnectableFlowable<Integer> publish = Flowable.just(1)
                .publish();
    
        Disposable subscribe = publish.subscribe((e)->System.out.println(e));
        publish.connect();
        publish.blockingSubscribe();

    This is with the latest RxJava.