    Stanislav Shakirov
    @punksta
    Petar Shomov
    @pshomov
    hello
    Petar Shomov
    @pshomov
    fellows, I am using RxJava 1.x and using a ReplaySubject to keep a trailing list of some values my app generates. I need to take the last N of that list (as it stands at the time of the invocation) and use them as another observable (a cold one) and use it to calculate something for one of my APIs. Seems since my original ReplaySubject never gets completed this whole idea falls apart. Any suggestions?
    ViTORossonero
    @ViTORossonero
    you can try .take(N) on the ReplaySubject to create an Observable that completes
    although if the ReplaySubject's buffer holds fewer than N items, .take(N) will wait until N items have been emitted by the ReplaySubject
    Petar Shomov
    @pshomov
    thank you @ViTORossonero. ReplaySubject has .getValues() which gives me back the items it has buffered for replay but unfortunately the time aspect of it is lost which is sort of a problem for any time-related operations in the "new" cold observable
    Luciano
    @lJoublanc
    @ViTORossonero try observable slidingBuffer (n,1) map ( list => Observable from calculateSomething(list) ) : Observable[ColdObservable[_]]
    You may use some of the other slidingBuffer methods if you want the elements emitted within the last x seconds, rather than a fixed number of them. See the API doc.
    Francisco A. Lozano
    @flozano
    Hi, quick q... if I have an Observable(myIterable).toBlocking().subscribe(some::thing), should I expect all the items in the observable to end up in memory?
    Luciano
    @lJoublanc
    @flozano Not sure if I recall correctly, but I don't think you have a 'subscribe' on blocking observable. Check the docs for 'BlockingObservable' - the answer to your Q depends on which call you use instead of 'subscribe'.
    e.g. using latest or next not all elems would be buffered.
    Alex Shpak
    @alex-shpak
    @flozano @lJoublanc
    I think .toList().toBlocking().first() should do the trick. And yes, you don't need to subscribe
    Luciano
    @lJoublanc
    @alex-shpak Not sure that he wants everything to be buffered.. it's not quite clear from his question.
    mmynsted
    @mmynsted
    What is the best way to repeat until an item emitted satisfies a particular quality? repeatUntil seems to be able to have a BooleanSupplier but BooleanSupplier would not inspect the item emitted. (Unless there is a way that I am not thinking of)
    mmynsted
    @mmynsted
    Here is an example using a callable, but I think that it should be possible to do this more rx like.
    Bojan Šernek
    @baadc0de
    A Flowable is put into a function (let's call it 'A') that returns a Flowable as well (let's call it 'B'). An implicit contract says that the new flowable will emit one item for every item emitted by the original Flowable, but not necessarily soon (there could be a .mapAsync and similar stuff in there). I would like to write a function that gets one item from 'A' and one item from 'B' in such a way that I know that what I got from 'B' was derived from what I got from 'A'. Hopefully that makes sense :)
    Bojan Šernek
    @baadc0de
    Would this work?
      override def apply(t: Flowable[AType]): Flowable[AType] = {
        t.flatMap(buffer => {
          Flowable.zip(f(Flowable.just(buffer)), Flowable.just(buffer), (processed: AType, unprocessed: AType) => {
            if (decision) unprocessed else processed
          })
        })
      }
    in this context f is the other function that returns the new 1:1 flowable
    Exerosis
    @Exerosis
    Is it possible to map one Observable to another using the scheduler specified with subscribeOn?
    Pavel Meledin
    @btbvoy

    Hi everyone, could somebody please give me a hint what a sequence of operators to use in order to have the following task done:

    1. As input for a stream I have pairs of (time and price)
    2. Then I need to group them per each 5 seconds based on time column, so I need to get a pair of (time and a list of prices)
    3. Then I need to convert each pair of time and prices into the following format: case class Bar(time: Int, o: Int, h: Int, l: Int, c: Int, v: Int) where o is the first price during this period of time, h is the max price, l is the lowest price and c is the latest price that appeared in the list.
    4. Then each such Bar needs to be sent further by stream to further subscribers.

    Very important to get an updated bar from point 4 for each time when new pair of time and price were pushed into the stream.

    Appreciate any help :-)

    ViTORossonero
    @ViTORossonero
    @btbvoy you can use smth like groupBy + scan
    items()
        .groupBy {
            it.time / TimeUnit.SECONDS.toMillis(5)
        }
        .flatMap {
            it.scan(ArrayList<Item>()) {
                acc, el -> acc.add(el); acc
            }
                .filter { it.isNotEmpty() }
                .map(::toBar)
        }
    although there're possible memory issues if items() is infinite
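
The per-bucket accumulation discussed above can also fold each price straight into a running bar instead of collecting a list, which keeps the state per bucket constant in size. The following is a minimal plain-Java sketch of that folding logic only, without any Rx operators. The names `BarSketch`, `Bar`, `bars` and `BUCKET_MILLIS` are hypothetical, timestamps are assumed to be milliseconds arriving in order, and `v` is assumed to be the number of prices seen in the bucket, since the thread does not define it.

```java
import java.util.ArrayList;
import java.util.List;

final class BarSketch {
    /** Bucket width: 5 seconds, expressed in milliseconds (assumed time unit). */
    static final long BUCKET_MILLIS = 5_000L;

    /** Immutable bar. v is ASSUMED to be the tick count; the thread does not define it. */
    static final class Bar {
        final long time; // start of the 5-second bucket
        final int o, h, l, c, v;

        Bar(long time, int o, int h, int l, int c, int v) {
            this.time = time;
            this.o = o;
            this.h = h;
            this.l = l;
            this.c = c;
            this.v = v;
        }

        /** First bar of a bucket: every field starts from the first price. */
        static Bar first(long bucketStart, int price) {
            return new Bar(bucketStart, price, price, price, price, 1);
        }

        /** Returns a new bar with one more price folded in; open stays fixed. */
        Bar update(int price) {
            return new Bar(time, o, Math.max(h, price), Math.min(l, price), price, v + 1);
        }
    }

    /** Feeds ticks in arrival order and returns the updated bar after every tick. */
    static List<Bar> bars(long[] times, int[] prices) {
        List<Bar> out = new ArrayList<>();
        Bar current = null;
        for (int i = 0; i < times.length; i++) {
            long bucketStart = (times[i] / BUCKET_MILLIS) * BUCKET_MILLIS;
            if (current == null || current.time != bucketStart) {
                current = Bar.first(bucketStart, prices[i]); // a new bucket begins
            } else {
                current = current.update(prices[i]);         // same bucket, refresh the bar
            }
            out.add(current);
        }
        return out;
    }
}
```

In an Rx pipeline the same `first`/`update` pair could serve as the accumulator of a scan inside each group, so that one updated bar goes downstream for every incoming pair.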
    Pavel Meledin
    @btbvoy
    @ViTORossonero thanks for example. would it work with window operation ?
    ViTORossonero
    @ViTORossonero
    @btbvoy i think that depends on what and when input stream emits:
    does time from input stream event pair somehow correlate with actual time when given event emits?
    If yes, window is what you need
    Paco
    @pakoito
    one question for RxJava 2, on a flowable
    .map(new Function<Pair<String, ?>, T>() {
        @Override
        public T apply(Pair<String, ?> stringPair) {
            return (T) stringPair.second;
        }
    })
    in case of a ClassCastException I'm not getting a call to public void onError(Throwable e)
    instead it just goes and fails completely
    in RxJava2 it'll go to onError with a ClassCastException that I'd be testing as
    @Override
    public void onError(Throwable e) {
        if (!(e instanceof ClassCastException)) {
            Assert.fail(e.getMessage());
        }
    }
    Paco
    @pakoito
    it seems like in RxJava1 the subscription to the original value in line 284 never returns an exception, only the one in 266, whereas in RxJava2 it fails for 284 immediately
    book.write(key, wrongValue).test().assertError(ClassCastException.class)
    that just works
    which is weird, because write is subscribed with the correct type, and it's only the forwarding that triggers the exception
    RxJava 2 propagates from here into write, RxJava 1 does it into the updates subject: https://github.com/pakoito/RxPaper/blob/master/library/src/main/java/com/pacoworks/rxpaper/RxPaperBook.java#L151
    and moving the subject call to onNext to a doAfterTerminate doesn't fix it
    oh well, behaviour changes then
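
One general Java detail may be relevant to where the exception shows up: a cast to a type parameter such as `(T) stringPair.second` is unchecked and erased, so it cannot throw at that line. The ClassCastException is raised later, wherever the value is first used as a concrete type. The sketch below shows only that language behaviour in plain Java; it is not confirmed in the thread as the cause of the difference between versions, and `ErasedCastSketch`, `Box`, `unsafeBox` and `readFailsAsString` are hypothetical names.

```java
final class ErasedCastSketch {
    /** Minimal generic holder, standing in for any generic container of a value. */
    static final class Box<T> {
        final T value;

        Box(T value) {
            this.value = value;
        }
    }

    /** Mirrors an unchecked (T) cast: after erasure no runtime check happens here. */
    @SuppressWarnings("unchecked")
    static <T> Box<T> unsafeBox(Object value) {
        return new Box<>((T) value);
    }

    /** Returns true if using the boxed value as a String throws ClassCastException. */
    static boolean readFailsAsString(Box<String> box) {
        try {
            // The compiler-inserted cast to String runs on this line, not in unsafeBox.
            return box.value.length() < 0;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Creating a `Box<String>` from an `Integer` with `unsafeBox` succeeds silently; only `readFailsAsString` observes the failure, which is why such an exception can appear far from the cast that caused it.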
    Paco
    @pakoito
    public <T> Completable write(final String key, final T value) {
        return Completable.fromAction(new Action() {
            @Override
            public void run() {
                book.write(key, value);
            }
        }).andThen(Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                try {
                    updates.onNext(Pair.create(key, value));
                } catch (ClassCastException t) {
                    updates.onError(t);
                }
            }
        })).subscribeOn(scheduler);
    }
    that reproduces the old behaviour
    Igor Trncic
    @igortrncic
    Hi, I'm curious how you organize your code. What are the criteria for creating classes that provide getObservable-style methods? Or are there some open source projects that handle this in a nice way?
    David Karnok
    @akarnokd
    @pakoito What you experienced sounds like a resource management issue. In 2.x, the Disposable.dispose is not called from downstream on an onComplete() unlike in 1.x where the SafeSubscriber always calls unsubscribe. The standard way of dealing with resources along a stream is using() and performing final action via doFinally.
    Vojtech Polivka
    @vojtapol
    hi guys

    I need to convert some callbacks to ReactiveX elements (Observable, Completable, Subject), but I am not sure what to pick. The problem is that the callback provides no value so it should be Completable but it is triggered multiple times so it should be Observable. What to choose? the normal use is this:

    socket.on("connect", args -> doSomething())

    where I know that args are always empty array.

    Vojtech Polivka
    @vojtapol
    would making Observable<java.lang.Void> be a good idea in this case?
    David Karnok
    @akarnokd
    A version-safe choice would be Observable<Object>, where you ignore the value in the processing.
    Vojtech Polivka
    @vojtapol
    what do you mean by version-safe?
    David Karnok
    @akarnokd
    Void requires null values which are forbidden in 2.x; you'd save a lot of headache with Object such as (Integer)1
    Vojtech Polivka
    @vojtapol
    oh.. snap!