Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Ilya Arkhanhelsky
    @iarkhanhelsky
    It works. But as far as I understand doc, publish creates ConnectableObservable. Which waits connect before emitting somethig. Sounds like not really aimed at current problem, right? Or I get it wrong? Or I need publish just to create new observable which keeps needed objects.
    Until I map it again by the way
    Dorus
    @Dorus
    You can use refCount, or do both subscriptions and then connect() yes.
    Simon Baslé
    @simonbasle
    hi guys, can someone shed some light on how flatMap(Func1<T, Observable<R>> f, int maxConcurrent) works?
    specifically, is that a mean of propagating a backpressure request upstream?
    say I have a bursty source, like Observable.from(aCollectionOfThousandsOfStrings)
    would that flatMap signature with maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
    (effectively consuming 100 strings from the source, making the corresponding asynchronous calls, waiting for completion of at least one of them before makin a 101th call?)
    Dorus
    @Dorus
    As far as i know only one invocation is made at a time.
    The maxConcurrent refers to the max number of parallel active sources. It merges 100 collections and only start at #101 once one of them yields onComplete().
    Simon Baslé
    @simonbasle
    ok looks like it's what I'm after, a way of preventing a large list in an Observable.from(...) (or range) to swamp my io layer with too many requests in an instant
    Something like:
    Observable.range(1, 1000)
        .map(id -> buildUrlToResource(id))
        //driver can only cope with around 150 asynchronous requests
        .flatMap(url -> driver.fireRequestAsync(url),
    100)
        .doOnNext(response -> showNotification(response.content())
        .toBlocking().last(); //wait for last response
    Dorus
    @Dorus
    Yes, that'll work. It will only fire up 100 fireRequestAsync in parallel.
    Simon Baslé
    @simonbasle
    cool, thanks @Dorus
    (I made a unit test in my project that demonstrates this behavior ;))
    Andres Mariscal
    @SerialDev
    I'm quite interested in learning RxJ do any of you have any good resources?
    Steve Gury
    @stevegury
    @SerialDev this tutorial https://github.com/Froussios/Intro-To-RxJava is pretty good
    Andres Mariscal
    @SerialDev
    @stevegury thank you :)
    David Stemmer
    @weefbellington
    I made a Transformer to be able to do "lookback" without a behavior subject:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    for example it allows you to take a stream returned by interval which emits [1, 2, 3, 4, 5] and map it to [(1,2),(2,3),(3,4),(4,5)]
    is this a wise think to do or could there be unintended consequences?
    Dorus
    @Dorus
    This is different from Buffer(2,1) ?
    David Stemmer
    @weefbellington
    @Dorus wouldn't buffer yield something like [(1,2),(3,4),(5...)]?
    Dorus
    @Dorus
    not if you use skip 1 instead of skip 2
    David Stemmer
    @weefbellington
    ah interesting. I didn't know you could use it to create overlapping windows, neat
    Dorus
    @Dorus
    It is neat!
    Works both on window() and buffer()
    David Stemmer
    @weefbellington
    the only downside would be you'd have to flatmap it but that's nbd
    Dorus
    @Dorus
    If i had to implement your Lookbacktransformer i would do something like Lookback(Observable<T> source Func2<T, T, R> merg) { return source.buffer(2,1).map(e => merg(e[0], e[1]));}
    (Code is probably all wrong, not writing a whole lot of java lately)
    David Stemmer
    @weefbellington
    yeah that sounds about right to me
    Dorus
    @Dorus
    Actually, small problem is buffer gives a single element array before onComplete. Need to filter those out else you get a ArrayIndexOutOfBoundsException.
    David Stemmer
    @weefbellington
    you can just use flatmap with window
    Dorus
    @Dorus
    Mm actually, show me, i haven't been able to produce that yet.
    David Stemmer
    @weefbellington
    I'll have to do it later, I'm about to leave
    Dorus
    @Dorus
    no problem, leaving also :)
    David Stemmer
    @weefbellington
    is there anything wrong with zipping an observable with itself though? I thought it was quite clever
    Dorus
    @Dorus
    not really, but if subscribing twice has side effects, you could consider to publish it first
    David Stemmer
    @weefbellington
    well you're not subscribing in the transformer
    Dorus
    @Dorus
    I believe zip subscribes to both sources
    Or does the transformer handle that? Again, something i'm not familiar with
    David Stemmer
    @weefbellington
    @Dorus for the sake of argument, I think this would be how you would do the LookbackTransformer with window instead of buffer:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                Observable<T> left = pair.first();
                Observable<T> right = pair.last();
                return left.zipWith(right, mergeFunc);
            }
        }
    }
    well, actually, this still has the single-emission problem
    David Stemmer
    @weefbellington
    probably this would be better:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                Observable<T> left = pair.elementAt(1);
                Observable<T> right = pair.elementAt(2);
                return left.zipWith(right, mergeFunc);
            }
        };
    }
    that way, if there were only one element in source, right would never resolve and the final merge would never occur
    dammit, that wouldn't work either, elementAt throws an IndexOutOfBoundsException :D
    David Stemmer
    @weefbellington
    last try:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            return source.window(2, 1).flatMap(pair -> {
                try {
                    Observable<T> left = pair.elementAt(1);
                    Observable<T> right = pair.elementAt(2);
                    return left.zipWith(right, mergeFunc);
                } catch (IndexOutOfBoundsException e) {
                    return Observable.never();
                }
            }
        };