    David Stemmer
    @weefbellington
    it sounds like you're describing something like a defensive copy
    Dorus
    @Dorus
    @Override
    public Observable<R> call(Observable<T> source2) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> t) {
                Observable<T> source = source2.share();
                Observable<T> offsetSource = source.skip(1);
                source.zipWith(offsetSource, mergeFunc).subscribe(t);
            }
        });
    }
    That works
    Feels a little funky
    David Stemmer
    @weefbellington
    I don't see anything easier, it looks like toObservable just hides the type, it doesn't create a copy that forwards the results to another subscriber
    Dorus
    @Dorus
    I'm trying to write it using Lambdas, looks quite a bit cleaner.
    static <T, R> Transformer<T, R> lookback(Func2<T, T, R> mergeFunc) {
        return source -> Observable.create(result -> {
            Observable<T> left = source.share();
            Observable<T> right = left.skip(1);
            left.zipWith(right, mergeFunc).subscribe(result);
        });
    }
    David Stemmer
    @weefbellington
    good call on not propagating the multicasting to downstream subscribers... I think it would be good to get a unit test around this
    if you're curious, what I'm using this for is to take a stream of video positions, [0, 60, 120, 150...] and turn it into a stream of video intervals, [ (0,60), (60, 120), (120, 150) ... ], and then log the start and end time of each interval
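For readers following along, the positions-to-intervals idea can be sketched without RxJava. This is a minimal plain-Java stand-in that works on a list instead of an Observable; the class name `LookbackSketch` and the use of `BiFunction` in place of `Func2` are assumptions made for illustration, not code from the conversation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java stand-in for the lookback idea (hypothetical names, no RxJava dependency).
final class LookbackSketch {

    // Pairs each element with its successor, like zipping a sequence with itself skipped by one.
    static <T, R> List<R> lookback(List<T> source, BiFunction<T, T, R> mergeFunc) {
        List<R> result = new ArrayList<>();
        for (int i = 0; i + 1 < source.size(); i++) {
            result.add(mergeFunc.apply(source.get(i), source.get(i + 1)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> positions = Arrays.asList(0, 60, 120, 150);
        // Render each interval as "(start,end)".
        List<String> intervals =
                lookback(positions, (start, end) -> "(" + start + "," + end + ")");
        System.out.println(intervals);
    }
}
```

As with the zip-based transformer, a source of n positions yields n - 1 intervals, and a source with fewer than two positions yields none.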
    Dorus
    @Dorus
    interesting usecase
    David Stemmer
    @weefbellington
    the stream can be started and stopped when the video is paused/played
    well, technically, the stream is completed on pause, and a new stream is created when playback begins again
    I really didn't want to do an awful stateful thing where I was caching the previous timestamp in a mapping function or something
    Dorus
    @Dorus
    you could use scan
    but then you need to make a tuple each time that keeps the last two values
    The merge function is much more elegant
    David Stemmer
    @weefbellington
    yeah I considered a scan/reduce with an accumulator
    but it felt weird
    stuffing a lot of state into an accumulator function is a code smell to me
    Dorus
    @Dorus
    yeah, and you also keep more state than you strictly need.
    indeed
    I want a scan that emits two values, one for the next iteration and one to send downstream
    you can wrap it in a tuple, but then you are constantly creating new tuples, as well as keeping the previous element until the next element arrives.
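The tuple-based alternative described here can be sketched in plain Java as a fold whose accumulator holds the last two values. `ScanPairSketch` and `LastTwo` are hypothetical names, the list stands in for an Observable, and elements are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of a scan-style accumulator (hypothetical names, no RxJava dependency).
final class ScanPairSketch {

    // Immutable holder for the last two values; one is allocated per element.
    static final class LastTwo<T> {
        final T previous; // null until a second element has been seen
        final T current;

        LastTwo(T previous, T current) {
            this.previous = previous;
            this.current = current;
        }
    }

    // Walks the source once, replacing the accumulator on every element.
    static <T> List<LastTwo<T>> scanLastTwo(List<T> source) {
        List<LastTwo<T>> emitted = new ArrayList<>();
        LastTwo<T> acc = null;
        for (T value : source) {
            T previous = (acc == null) ? null : acc.current;
            acc = new LastTwo<>(previous, value);
            // The first accumulator has no previous value, so it is not emitted.
            if (acc.previous != null) {
                emitted.add(acc);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (LastTwo<Integer> pair : scanLastTwo(Arrays.asList(0, 60, 120, 150))) {
            System.out.println("(" + pair.previous + "," + pair.current + ")");
        }
    }
}
```

The sketch makes both costs mentioned above visible: a new `LastTwo` is created for every element, and the previous element stays referenced until the next one arrives.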
    David Stemmer
    @weefbellington
    I think accumulators that accumulate too much state should be called "katamari accumulators" :)
    Srepfler Srdan
    @schrepfler
    do people convert Twitter's Future to Observables? Are there utility methods for it?
    David Stemmer
    @weefbellington
    looking for suggestions: when you write a custom Observable, Operator or Transformer, where do they live?
    Simon Baslé
    @simonbasle
    in a project using Rx you mean?
    so far in our project we've either implemented ActionX or FuncX, always close to the using code because generally very specific... not an awful lot of Observables/Operators/Transformers so far but the very few live in a com.company.project.util kind of namespace
    although com.company.project.rx would also make sense I guess
    David Stemmer
    @weefbellington
    Our code uses a fair number of transformers and a couple of operators and custom observables. I've come up with a few ad hoc conventions myself:
    1. if it's a mapping function that just extracts data from a data object, or a predicate related to the data, put the function as a final field on the data object itself: api.getResponse().filter(Response.HAS_ERROR).map(Response.ERROR)
    2. I try to avoid new MyTransformer(), new MyAction() calls everywhere in my code and instead create factory methods. So I might have an RxLogger class that has actions related to logging, which you would call like stringObservable.doOnNext(rxLogger.logDebug)
    3. there's a big class that's just called Operators which contains factory methods for creating custom Transformers, Operators, Functions and Actions. Most of these are generic and aren't associated with any particular data type. I'm trying to get rid of this class but am struggling to come up with a good alternative
    (a lot of this could probably be mitigated if I could use lambdas, but we're currently using Android without retrolambda)
    Simon Baslé
    @simonbasle
    1. and 2. sound sane to me, 3. I'd probably split it and put all of it into a util.rx package with as many classes as you have generic Operators, Transformers, Func and Action... but that's just me :)
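Convention 1 above (functions and predicates exposed as constants on the data object) might look roughly like the following. This is a sketch only: `java.util.function` and streams stand in for RxJava's `Func1` and `Observable`, and the `Response` fields and the `ConventionsSketch` class are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical data object that carries its own reusable predicate and mapping function.
final class Response {
    static final Predicate<Response> HAS_ERROR = r -> r.error != null;
    static final Function<Response, String> ERROR = r -> r.error;

    private final String error; // null when the call succeeded

    Response(String error) {
        this.error = error;
    }
}

final class ConventionsSketch {

    // Reads like the chained call in the chat: filter(Response.HAS_ERROR).map(Response.ERROR).
    static List<String> errorsOf(List<Response> responses) {
        return responses.stream()
                .filter(Response.HAS_ERROR)
                .map(Response.ERROR)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Response> responses = Arrays.asList(new Response(null), new Response("timeout"));
        System.out.println(errorsOf(responses));
    }
}
```

Keeping the constants on the data class means call sites never construct function objects themselves, which is the same goal as the factory methods in convention 2.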
    Rudi Grinberg
    @rgrinberg
    quick question: suppose I'm handling a click with an observable (I'm using RxJava but I don't think it matters). I want a subscription to fire on every click and get the latest value of another observable in that subscription. How would I accomplish that?
    I think I need withLatestFrom
    Dorus
    @Dorus
    Sample
    David Stemmer
    @weefbellington
    withLatestFrom looks right to me, but I haven't used it much
    Dorus
    @Dorus
    The difference between sample and withLatestFrom is that sample does not combine the sequences, and it emits nothing on a click if the other observable has not produced a new value since the previous click. This might or might not be what you want.
    David Stemmer
    @weefbellington
    to me, the use case that seems pretty natural for withLatestFrom is:
    1. I have a stream of button click events (an Observable<View>)
    2. I have a data stream (an Observable<Data>)
    3. every time the button is clicked, I want to emit the most recent data: clickStream.withLatestFrom(dataStream, (left, right) -> right).subscribe(parseData)
    @Dorus ^^ is that a sane thing to do...?
    Dorus
    @Dorus
    Looks right. I might name left and right click and data, or even _ instead of click (.NET style). I'm used to .NET, and the poor thing doesn't even have a withLatestFrom, so I was stuck with Sample.
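The difference between the two operators on click can be modelled with a small plain-Java simulation. This is a sketch under stated assumptions, not RxJava code: the timeline is a list in which the string "click" marks a click and any other string is a data value, the class `LatestOnClickSketch` is hypothetical, and completion behaviour is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulates click-time behaviour only (hypothetical names, no RxJava dependency).
final class LatestOnClickSketch {
    static final String CLICK = "click";

    // Models clickStream.withLatestFrom(dataStream, (click, data) -> data).
    static List<String> withLatestFrom(List<String> timeline) {
        List<String> out = new ArrayList<>();
        String latest = null;
        for (String event : timeline) {
            if (CLICK.equals(event)) {
                // Clicks that arrive before the first data value are dropped.
                if (latest != null) {
                    out.add(latest);
                }
            } else {
                latest = event;
            }
        }
        return out;
    }

    // Models dataStream.sample(clickStream): a value is emitted at most once.
    static List<String> sample(List<String> timeline) {
        List<String> out = new ArrayList<>();
        String pending = null;
        for (String event : timeline) {
            if (CLICK.equals(event)) {
                if (pending != null) {
                    out.add(pending);
                    pending = null; // nothing new to emit until more data arrives
                }
            } else {
                pending = event;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList("click", "A", "click", "click", "B", "C", "click");
        System.out.println(withLatestFrom(timeline)); // [A, A, C]
        System.out.println(sample(timeline));         // [A, C]
    }
}
```

The second click after "A" is where the two diverge: the withLatestFrom model repeats "A", while the sample model stays silent because no new data arrived.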
    David Stemmer
    @weefbellington
    so, a Java-specific question. A class exposes a public static final Observable. Objects from other classes can subscribe to the static Observable. Is this an antipattern? It doesn't smell right to me, but I wanted a second opinion. Normally I'm very wary about memory leaks in these kinds of scenarios, and implications for unit testing.
    David Stemmer
    @weefbellington
    unrelated question:
    it seems that if an Exception is thrown in a Subscriber's onNext method, the Observable's doOnTerminate callbacks never fire
    is there a way to get around this?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @weefbellington are you sure that you've subscribed to the Observable returned after applying doOnTerminate()?
    David Stemmer
    @weefbellington
    @artem-zinnatullin fairly certain, yes. What's happening in my case is that the source observable is emitting some data which comes from a remote API. The data returned by the API may be malformed. If I try to do something with the malformed data in the onNext method, a RuntimeException may be thrown (NullPointerException, etc.). The behavior I'm seeing is that if a RuntimeException is thrown in the subscriber's onNext (or a doOnNext action), the onError callback fires, but the doOnTerminate actions do not.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    Looks like a bug, can you please post a failing test as an issue to the RxJava repository? https://github.com/ReactiveX/RxJava
    David Stemmer
    @weefbellington
    @artem-zinnatullin I'll do that, thanks
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    :+1:
    fAns1k
    @fAns1k
    hi all, how do I get the data from an earlier operator in the chain, e.g.
    .flatMap(data -> )
    .map(data -> mapped)
    .compose(RxUtil.applyApiSchedulers())
    .subscribe(nextData -> {
                // here I need to use the data from flatMap
            },
            throwable -> {
                Timber.e(throwable, "scan was fault");
            }));