Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Hiếu Slash
    @hieutrtr
    im new in rx
    could you guys tell me how RX compares with spark. Any use cases for both ?
    Vladimir Baryshnikov
    @vovkab
    Any ideas about what is wrong with backpressure and control flow? Why both rx1 and rx2 doesn't follow request(n)?
    Vladimir Baryshnikov
    @vovkab
    To ensure order of items, I tried to use concatMap() but for some reasons it prefetches 2 items by default and completely ignores request(n) :(
    Jeremy Lyman
    @maarek
    I have a queue that I am wanting to wrap in a Flowable. Items are added to the queue and emitted one at a time. The part that I'm trying to figure out is if I can emit only until an awk occurs. In my current impl, the awk is the removal of the currently emitted item from the queue. I don't want the end subscriber to handle the removal of the item from the queue but they should process the emitted item. Once their onNext is finished can something upstream handle the queue.remove() and the observable emit a new item.
    Does that make sense? Is it possible to have something like a consumer/subscriber that other subscribers are attached to and knows when the downstream subscriber is finished before doing more work?
    Vladimir Baryshnikov
    @vovkab
    @maarek I'm trying to solve something like this too. I need to handle items strictly one by one. I thought I could use backpressure and reactive pull, but it doesn't work the way I would expect.
    Jeremy Lyman
    @maarek
    That's what I was playing with as well.
    But I don't want a ResourceSubscriber to know about the disk cache that I'm working with.
    I'd like a doAfterNext that occured after each item is nabbed by the subscriber, not during emission.
    Jeremy Lyman
    @maarek
    I suppose in my case, a new item doesn't emit until some action occurs. So I don't need to manage the request(1) mechanism. I just need to know when a subscriber is for sure finished with its onNext callback.
    Vladimir Baryshnikov
    @vovkab
    I think this is where request(1) should work, basically saying I'm ready to handle more items. And it works up to some degree, except some operators overproduce and don't follow request(n).
    And then we would be able to do something like Observable.from(queue), and handle items one by one, control flow...
    David Karnok
    @akarnokd
    @vovkab 1.x is sometimes odd, you have to call request(0) in the constructor to suppress the default Long.MAX_VALUE that may get requested before onStart even runs.
    request(n) is a negotiation between subsequent operators and there is no guarantee that any request amount reaches the top producer unchanged. Many operators have their own batching and requestig patterns: even if you request 2 items, observeOn will request 128 and flatMap may go Long.MAX_VALUE
    Vladimir Baryshnikov
    @vovkab
    is there a way to enforce it? observables could be very expensive, and we don't want 128 if we only ever need is 1
    David Karnok
    @akarnokd
    Many operators can be parametrized on buffer size that often translates to prefetch amounts, observeOn and flatMap can be limited to 1 but you should probably rethink the design of your flows.
    Vladimir Baryshnikov
    @vovkab
    Requirements is pretty simple - consume items on demand. I think the issue here is that many operators over produce for no reason, even when control flow explicitly requested.
    David Karnok
    @akarnokd
    RxJava was designed for streaming, that means prefetching some data so in case of a backpressure situation, the pipeline is kept busy as much as possible. Sounds like you want some ping-pong like request-response processing over a longer and modulated chain.
    Vladimir Baryshnikov
    @vovkab
    I understand, but how you suppose to control flow if most operators doesn't follow it?
    Maybe we need something like maxRequestN?
    Jeremy Lyman
    @maarek
    @akarnokd I think I see your answer to my question on StackOverflow. :P This wasn't my SO question but it's sort of what I was looking for. http://stackoverflow.com/questions/27197915/make-sure-item-is-delivered-to-onnext-listener-in-rxjava
    Some mechanism on to know whether a Consumer had actually consumed an event
    Neil Davies
    @neild001
    Hi, I've got a timed buffer, is there anyway I can force this buffer to complete before the timespan? So I can get the items in the list immediately and not have to wait for the full timespan.
    Vladimir Baryshnikov
    @vovkab
    @neild001 you can also specify count, if you need even more control you can provide closing observable.
    Neil Davies
    @neild001
    I'll have a look at the closing observable
    I need something were the user can just say give me what you've got now without waiting for the full length of the timespan. So count won't work, I'll try take a look at closing observables. Hopefully that will so what I need. Thanks. 👍
    Vladimir Baryshnikov
    @vovkab
    If you have for example observabe for user actions, you can connect it as closing observable, and everytime user does something, i.e. click a button, it will emit everything that is currently in buffer.
    Neil Davies
    @neild001
    Ok, that sounds promising. That's pretty​ much what I want to do. Thanks again for pointing me in the right direction.
    Neil Davies
    @neild001
    FYI , this worked, added a timeout to the closing observable so if nobody clicks the button, the buffer empties after the timeout expires.
    Using onErrorReturnItem to transform the timeout error.
    Abhiroj
    @abhiroj
    How to begin RxJava?
    Abhiroj
    @abhiroj
    Hi I am an androis Dev.
    I want a few resources to start RxJava
    Heikki Vesalainen
    @hvesalai
    Hi all! I'm new to RxJava
    I'm looking for something like a Flowable.reduceWith, but instead of immediate values, it would work with Singles... ie. where reduceWith of Flowable<T> takes a BiFunction<R,T,R> and returns Single<R>, I would like to give it BiFunction<R,T,Single<R>>and return Single<R>
    Any ideas if this can be done with the currently available primitives or do I need to implement it my self?
    Heikki Vesalainen
    @hvesalai
    maybe something involving scan and concatMap?
    David Karnok
    @akarnokd
    If not doing custom operator, you need an out-of-band storage for R (and thus defer the whole stream if more than one Subscriber is expected):
    AtomicReference<R> accumulator = new AtomicReference<R>(initialR);
    source.concatMap(t ->
        function.apply(accumulator.get(), t)
            .doOnSuccess(r -> accumulator.set(r))
            .toFlowable()
    )
    .ignoreElements()
    .andThen(Flowable.fromCallable(() -> accumulator.get()))
    ;
    Forgot that there is no ignoreElement in Single.
    Heikki Vesalainen
    @hvesalai
    Ok, so I have to have the external variable
    Heikki Vesalainen
    @hvesalai
    What about the other way round, if I want to generate asynchronously, i.e. like generate, but where my generator function doesn't call onNext synchronously
    I tried doing it with generate, but am getting some IllegalStateException's
    David Karnok
    @akarnokd
    I don't understrand what you want.
    Heikki Vesalainen
    @hvesalai

    I want a function with the signature

    static Flowable<T> generate(T initialState, Function<T, Maybe<T>> generator)

    Where the value of the Maybe<T> is fed to generator to get the next value

    David Karnok
    @akarnokd
    There is an expand operator that works with Flowables.
    Heikki Vesalainen
    @hvesalai
    and furthermore I would like it to be a cold stream, i.e. only generate values on request
    This message was deleted
    Heikki Vesalainen
    @hvesalai
    So like this?
    Flowable.just(initialState).compose(FlowableTransformers.expand(s -> generator(s).toFlowable)).skip(1)