Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Dorus
    @Dorus
    yeah i already did. Didn't update the gist
    updated
    David Stemmer
    @weefbellington
    I'm running on Android right now and I don't have an environment set up where I can test Rx functions quickly
    I need to build a regular Java project
    Dorus
    @Dorus
    I just have eclipse open here
    do 0
    do 1
    sb 0 1
    do 2
    sb 1 2
    do 3
    sb 2 3
    Thats what i see with share()
    David Stemmer
    @weefbellington
    that seems right, I'm not sure why I wasn't seeing the double emissions in practice
    Dorus
    @Dorus
    I tried to put a Thread.sleep(100); between the lines in call and it doesn't give any problems. Guess that part is synchronized somewhere.
    David Stemmer
    @weefbellington
    thanks for catching that. I really need to set up a minimal Java sandbox environment so I can test this stuff more easily.
    Dorus
    @Dorus
    yeah, it takes some time to figure out all things that are happening within Rx. Some code runs once on creation, some for every subscription, some for every element. If you duplicate your subscription somewhere, you run it twice for every event.
    David Stemmer
    @weefbellington
    I think I see what was happening in my case. The source observable had already been multicasted with share()
    Dorus
    @Dorus
    Yup, that hides the culprit.
    David Stemmer
    @weefbellington
    Thanks again! shame on me for not isolating the component under test
    Dorus
    @Dorus
    Mm, bit annoying that the share in the transformer now effect separate subscriptions also. I wonder if that's avoidable.
    Perhaps with observable.create?
    David Stemmer
    @weefbellington
    it sounds like you're describing something like a defensive copy
    Dorus
    @Dorus
    @Override
    public Observable<R> call(Observable<T> source2) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> t) {
                Observable<T> source = source2.share();
                Observable<T> offsetSource = source.skip(1);
                source.zipWith(offsetSource, mergeFunc).subscribe(t);
            }
        });
    }
    That works
    Feels a little funky
    David Stemmer
    @weefbellington
    I don't see anything easier, it looks like toObservable just hides the type, it doesn't create a copy that forwards the results to another subscriber
    Dorus
    @Dorus
    I'm trying to write it using Lambdas, looks quite a bit cleaner.
    static <T, R> Transformer<T, R> lookback(Func2<T, T, R> mergeFunc) {  
    return source -> Observable.create(result -> {
            Observable<T> left = source.share();
            Observable<T> right = left.skip(1);
            left.zipWith(right, mergeFunc).subscribe(result);
    });
    }
    David Stemmer
    @weefbellington
    good call on the not propagating the multicasting to downstream subscribers...I think it would be good to get a unit test around this
    if you're curious, what I'm using this for is to take a stream of video positions, [0, 60, 120, 150...] and turn it into a stream of video intervals, [ (0,60), (60, 120), (120, 150) ... ], and then log the start and end time of each interval
    Dorus
    @Dorus
    interesting usecase
    David Stemmer
    @weefbellington
    the stream can be started and stopped when the video is paused/played
    well, technically, the stream is completed on pause, and a new stream is created when playback begins again
    I really didn't want to do an awful stateful thing where I was caching the previous timestamp in a mapping function or something
    Dorus
    @Dorus
    you could use scan
    but then you need to make a tuple each time that keeps the last two values
    The mergfunction is much more elegant
    David Stemmer
    @weefbellington
    yeah I considered a scan/reduce with an accumulator
    but it felt weird
    stuffing a lot of state into an accumulator function is a code smell to me
    Dorus
    @Dorus
    yeah, and you also keep more state than you strictly need.
    indeed
    i want a scan that emit two values, one for the next iteration and one to send downstream
    you can wrap it in a tuple, but then you are constantly creating new tuples, as well as keeping the previous element until the next element arrives.
    David Stemmer
    @weefbellington
    I think accumulators that accumulate too much state should be called "katamari accumulators" :)
    Srepfler Srdan
    @schrepfler
    do people convert Twitter's Future to Observables? Are there utility methods for it?
    David Stemmer
    @weefbellington
    looking for suggestions: when you write a custom Observable, Operator or Transformer, where do they live?
    Simon Baslé
    @simonbasle
    in a project using Rx you mean?
    so far in our project we've either implemented ActionX or FuncX, always close to the using code because generally very specific... not an awful lot of Observables/Operators/Transformers so far but the very few live in a com.company.project.util kind of namespace
    although com.company.project.rx would also make sense I guess
    David Stemmer
    @weefbellington
    Our code uses a fair amount of transformers and a couple of operators and custom observables. I've come up with a few adhoc conventions myself:
    1. if it's a mapping function that just extracts data from a data object, or a predicate related to the data, put the function as a final field on the data object itself: api.getResponse().filter(Response.HAS_ERROR).map(Response.ERROR)
    2. I try to avoid new MyTransformer(), new MyAction() calls everywhere in my code and instead create factory methods. So I might have a RxLogger class that has actions related to logging, which you would call like stringObservable.doOnNext(rxLogger.logDebug)
    3. there's a big class that's just called Operators which contains factory methods for creating custom Transformers, Operators, Functions and Actions. Most of these are generic and aren't associated with any particular data type. I'm trying to get rid of this class but am struggling to come up with a good alternative
    (a lot of this could probably mitigated if I could use lambdas, but we're currently using Android without retrolambda)
    Simon Baslé
    @simonbasle
    1. and 2. sound sane to me, 3. I'd probably split it and put all of it into a util.rx package with as many classes as you have generic Operators, Transformers, Func and Action... but that's just me :)
    Rudi Grinberg
    @rgrinberg
    quick question, suppose I’m handling a click with an observable (i’m using rxjava but i don’t think it matters), i want a subsbcription to fire on every click and get the value of another observable in that subscription. how would i accomplish that?
    i think i need withLatestFrom
    Dorus
    @Dorus
    Sample