Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    nu007a
    @nu007a
    hello
    • test
    Srepfler Srdan
    @schrepfler
    @akarnokd what’s your view on scalaz’s 8 Race coming out of John De Goes’s IO work?
    David Karnok
    @akarnokd
    Never heard of it. Do you have a link or description?
    Srepfler Srdan
    @schrepfler
    David Karnok
    @akarnokd
    @schrepfler I have a couple of observations:
    • it's in Scala and I don't see how Java or Android would benefit
    • Positions itself relative to Threads, which is a pretty common thing for async solutions lately, completely ignoring the fact that threadpools were invented to help solving the task count vs. thread count problem
    • One value - which maximizes overhead - instead of streaming values
    • Not deferred, can't retry an IO[T] unless you recreate the entire flow?
    • Pretty similar to ReactiveX' fluent API style
    • Fibers - suspension and resumption is the common case - unlike permit-based streaming such as Reactive Streams
    David Karnok
    @akarnokd
    • The "racer" bugs he describes in other libraries is not true for RxJava: amb() will cancel the non-winners immediately.
    • My experience is that when you measure your solution 100x faster, your test might be wrong.
    Srepfler Srdan
    @schrepfler
    :)
    Meena Mana
    @mmanavaz
    ReactiveX/rxjs#3151 I'm facing this issue and have tried the suggested solutions which are not working for me. Any suggestions?
    David Karnok
    @akarnokd
    @mmanavaz This is the RxJava room so I don't think many here do RxJS, which is a completely different library on a different platform.
    Meena Mana
    @mmanavaz
    Oh. OK. Thank you anyway.
    Sylvain Daclin 🖱🕹
    @sdaclin_twitter
    @akarnokd "My experience is that when you measure your solution 100x faster, your test might be wrong." TRUE ;)
    Thomas May
    @cogman

    Hope this is the right place, but I'm looking mostly for advice.
    I have a system that I want to retrofit RXJava on. Right now, we have a problem where our job processing engine dies because it very much works in a batchy fashion. It will pull multiple jobs from the DB, load in a ton of data (millions of items), does a bunch of transforms, and then stores the newly transformed data back in the database in one giant batch.

    Now, as you can imagine, we are running into memory AND database issues. The transactions are too big in the database, and if we get a bunch of large jobs our engine poops out with not OOM execeptions, in that case we have to throttle it way back.

    So, to me, this sounds like a good fit for RXJava. We can switch from doing a giant transaction for storage to a bunch of batches of transactions. We can do most of the transformations on the fly. And we can use backpresssure to keep the system from getting overwhelmed.

    The one thing I'm not sure about how to handle properly is pulling the job items from the database. My thinking is taking the job description, having a fairly large buffer, and then a processor that transforms the job info into job items into a fairly large buffer that can be processed by the downstream at its own pace. My concern is what happens when the buffer fills up? I don't really want the item reader to be blocked, because that will stop other processes from reading those elements (not great).

    Sorry, big block of text, and mostly I just want to know, am I on the right path for conversion or should I be thinking about this a different way?
    Fyodor Sherstobitov
    @fsherstobitov

    Hi everyone! I'm trying to implement REST call caching on Android using RxJava. I have this code:

        override fun loadTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Searching tasks in database")
            return boxJobDao.getAll()
                    .map { boxJobs -> boxJobMapper.entityListToTaskList(boxJobs) }
                    .switchIfEmpty(syncTasks())
        }
    
        private fun syncTasks(): Maybe<List<Task>> {
            Log.d(TAG, "Loading tasks from server")
            return api.boxJobs(DEVICE_ID)
                    .doOnSuccess({ boxJobDtoList ->
                        Log.d(TAG, "${boxJobDtoList.size} box jobs loaded from server")
                        saveToDb(boxJobDtoList)
                    })
                    .doOnError({ error -> Log.d(TAG, "Error during tasks loading", error) })
                    .map { boxJobDtoList -> boxJobMapper.dtoListToTaskList(boxJobDtoList) }
                    .toMaybe()
        }
    
        private fun saveToDb(boxJobDtoList: List<BoxJobDto>?) {
            Log.d(TAG, "Saving box jobs to database")
            boxJobDtoList?.forEach { boxJobDto ->
                try {
                    val boxJob = boxJobMapper.dtoToEntity(boxJobDto)
                    boxJobDao.insert(boxJob)
                    val barcodeReadList = boxJobDto.barcodeReadDtos?.map { dto -> barcodeReadMapper.dtoToEntity(dto) }
                    barcodeReadDao.insertAll(barcodeReadList ?: emptyList())
                } catch (e: Throwable) {
                    Log.e(TAG, "Error during box jobs saving", e)
                }
            }
        }

    I return the list of tasks from DB if there are any. If no tasks found in DB, I switch to REST API, get tasks from it, save to DB and return them to caller.

    The problem is that the code never gets to doOnSuccess callback. Am I doing something wrong?

    David Whetstone
    @humblehacker
    Anyone here had experience with doFinally, doOnTerminate, and/or doAfterTerminate not getting called?
    David Whetstone
    @humblehacker
    Here’s roughly what I’m doing:
    button.clicks()
        .compose(bindToLifecycle())
        .doOnNext { something() }
        .flatMap { somethingReturingAPublishSubject() }
        .doFinally { somethingElse() }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(
            onNext = { theMainThing() },
            onError = { handleError() })
    RxJava2 on Android
    David Karnok
    @akarnokd
    Those operators require a finite sequence, but I guess clicks() never completes.
    David Whetstone
    @humblehacker
    Thanks, @akarnokd. That was it. I assumed flatMap sort of replaced the original observable with the result of somethingReturningAPublishSubject(), but I learned that it merges the two—completing only when both the original observables complete.
    David Whetstone
    @humblehacker
    I see two ways to handle this. First, use take(1) to make clicks() a finite sequence, but this requires setting up the stream again after every click.
    fun setupButton() {
        button.clicks()
          .take(1)
          .flatMap { somethingReturningAnObservable() }
          .observeOn(AndroidSchedulers.mainThread())
          .doFinally { 
            something() 
            setupButton()
          }
          .subscribeBy(
              onNext = { theMainThing() },
              onError = { handleError() }
          )
      }
    Or, chain the streams:
      fun setupButton2() {
        button.clicks()
          .subscribeBy(
              onNext = { 
                somethingReturningAnObservable()
                  .observeOn(AndroidSchedulers.mainThread())
                  .doFinally { something() }
                  .subscribeBy(
                      onNext = { theMainThing() },
                      onError = { handleError() }
                  )
              }
          )
      }
    I’m leaning toward the second approach, though in general it complicates error handling. Anyone have any thoughts on these approaches?
    Boris Maslakov
    @bmaslakov

    Greetings, RxJava community. I was wondering if there's a cleaner alternative for such:

    obj.method1()
      .flatMap { v -> obj.method2(v).map { v } };

    i.e. we get a result of an operation, start another operation, wait for the second operation to complete and discard its result.

    Option 2 is even more verbose:

    create { subscriber ->
      obj.method1()
        .subscribe { v ->
          obj.method2(v)
            .subscribe {
              subscriber.onNext(v);
            }
        }
    }

    What do you think? Is option 1 is the best we could do?

    David Karnok
    @akarnokd
    Nesting subscribe calls like option 2 is definitely discouraged.
    Yannick Lecaillez
    @ylecaillez
    Hi guys ! I've question regarding resource management:
    I've a Single<Connection> which connect to a server each time i'm subscribing to it.
    Given the following code: singleConnection.timeout(10, SECONDS).subscribe().
    If the timeout occurs, it'll forward an error and cancel() the singleConnection().
    My question: what if the connection finally succeeded after the time out triggered but just before the cancel() been invoked ?
    If nothing is done, i think this race condition will leak the connected connection .
    Now i'm wondering what is the best way to handle such case ?
    one way is to close the connection in singleConnection.cancel(), but given that the singleConnection has already emitted the connection, cancel() should be a no-op ?
    Yannick Lecaillez
    @ylecaillez
    Maybe i need some synchronization between cancel() (dispose() actually) and the onSuccess() notification
    Yannick Lecaillez
    @ylecaillez
    Actually i might have solved this issue with an AtomicBoolean: after dispose() it is guaranteed that the connection has been forwarded to the subscriber OR that it'll never be forwarded to it (disconnecting silently it if the connection happened in the mean time)
    Oleh Dokuka
    @OlegDokuka
    @ylecaillez, Actually, you may always write something like next
    Flux
                    .just("")
                    .flatMap(v -> Flux.just("Opened").delaySubscription(Duration.ofSeconds(2)).doFinally(s -> System.out.println(s)))
                    .timeout(Duration.ofSeconds(2), Flux.just("closed"))
                    .subscribe(System.out::println);
    And in that case always have a chance to cleanup your resources if flatted connection has been canceled
    This example like try with finally
    P.S. This example is written using Reactor 3, but do not worry, in RxJava 2 there is an identical API
    David Karnok
    @akarnokd
    Unfortunately, you can't solve this properly with RxJava 2 and Java 6. Either you need a resource-aware flow implementation which only exist on a drawing board or you need Java 9 and the Cleaner API to handle the falling out of scope. However, the latter can get complicated because you may not know all the references to the Connection object.
    Yannick Lecaillez
    @ylecaillez

    I finally wrote something like

            @Override
            public void dispose() {
                if (done.compareAndSet(false, true) && connection != null) {
                    // Handle the case were we've been cancelled while the connection has actually been established but not
                    // yet forwarded to the subscriber (e.g: because of a slow SSL handshake).
                    connection.closeSilently();
                }
            }

    WDYT ?

    Yannick Lecaillez
    @ylecaillez
    Well, i guess this is a bit too abstract out of its context ... :smile: Anyway, thanks !
    Thomas May
    @cogman

    Is this a bug? I would have expected this to eventually terminate but it never does

        ConnectableFlowable<Integer> publish = Flowable.just(1)
                .publish();
    
        Disposable subscribe = publish.subscribe((e)->System.out.println(e));
        publish.connect();
        publish.blockingSubscribe();

    This is the latest rxJava

    David Karnok
    @akarnokd
    You deadlock because the publish completed and is awaiting for the next connect() call. Otherwise, it is a shortcoming of the ConnectableX API we aim to fix in 3.x: ReactiveX/RxJava#5628
    Thomas May
    @cogman
    What would be the proposed behavior of this in 3.0? Would the post connect subscriber just get a onComplete/onError if the publisher is finished or would it get something else?
    David Karnok
    @akarnokd
    Please read the linked issue.
    Restmad
    @restmad
    Wow!
    Thomas May
    @cogman

    @akarnokd I guess I was misreading it.

    This paragraph I think is the one you are refering to

    In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.

    And I'm guessing the "this state" in the final sentence is referring to the terminal state and not the running state, correct?

    David Karnok
    @akarnokd
    Yes. I've updated the sentence to clear up this ambiguity.
    Matteo Moci
    @mox601
    Hi everyone! I am trying to have a Single start only when a list of other Single completes. The following code seem to work with Flowable, but I can’t find a way to translate to Single:
    <T> Flowable<T> whenAll(List<Flowable<T>> flowables) {
            return Flowable.fromIterable(flowables).flatMap(a -> a);
        }
    I am not sure that your Flowable code works as you expect, I'd use something like zip for it too
    Oleh Dokuka
    @OlegDokuka
    @mox601 In case if you need to run all Single in parallel and wait until all have been completed you should use following API
    <T> Completable whenAll(List<Single<T>> singles) {
        return Single.merge(singles).ignoreElements();
    }