Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Dorus
    @Dorus
    Scan + Publish?
    It does depend on what state you had in mind
    Ilya Arkhanhelsky
    @iarkhanhelsky
    I map some stateless object to statefull container. Then (in way I do it now) i make 2 subscriptions. 1st subscription updates object (in pair with another observable), 2nd reads state (in pair with completly different observable)
    Dorus
    @Dorus
    Ok i didn't understand taht
    Subscriptions only receive events, they do not update the source.
    Ilya Arkhanhelsky
    @iarkhanhelsky
            PublishSubject<Integer> seq = PublishSubject.create();
    
            Observable<Integer> seq2= seq.map(x ->{
                timesCalled += 1;
                return x * 2;
            });
    
    
            seq2.subscribe(e -> System.out.println("Hello 1: " + e));
            seq2.subscribe(e -> System.out.println("Hello 2: " + e));
    
            seq.onNext(1);
    
            System.out.println("Times map called: " + timesCalled);
    This code shows that map in this case called twice. (Is that a error or not?). And I get 2 different objects in each subscription, expected same.
    If it helps to understand my problem
    Dorus
    @Dorus
    Yes that's expected behavior
    I assume timesCalled is declared outside the scope of map, that's where the problem lies.
    Unless that was intended
    Ilya Arkhanhelsky
    @iarkhanhelsky
    No, that's for example
    Dorus
    @Dorus
    You can use Observable.Create to be able to declare a local variable instead
    Or publish seq2 so that side effects are shared between the observables instead.
    Ilya Arkhanhelsky
    @iarkhanhelsky
    It works. But as far as I understand doc, publish creates ConnectableObservable. Which waits connect before emitting somethig. Sounds like not really aimed at current problem, right? Or I get it wrong? Or I need publish just to create new observable which keeps needed objects.
    Until I map it again by the way
    Dorus
    @Dorus
    You can use refCount, or do both subscriptions and then connect() yes.
    Simon Baslé
    @simonbasle
    hi guys, can someone shed some light on how flatMap(Func1<T, Observable<R>> f, int maxConcurrent) works?
    specifically, is that a mean of propagating a backpressure request upstream?
    say I have a bursty source, like Observable.from(aCollectionOfThousandsOfStrings)
    would that flatMap signature with maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
    (effectively consuming 100 strings from the source, making the corresponding asynchronous calls, waiting for completion of at least one of them before makin a 101th call?)
    Dorus
    @Dorus
    As far as i know only one invocation is made at a time.
    The maxConcurrent refers to the max number of parallel active sources. It merges 100 collections and only start at #101 once one of them yields onComplete().
    Simon Baslé
    @simonbasle
    ok looks like it's what I'm after, a way of preventing a large list in an Observable.from(...) (or range) to swamp my io layer with too many requests in an instant
    Something like:
    Observable.range(1, 1000)
        .map(id -> buildUrlToResource(id))
        //driver can only cope with around 150 asynchronous requests
        .flatMap(url -> driver.fireRequestAsync(url),
    100)
        .doOnNext(response -> showNotification(response.content())
        .toBlocking().last(); //wait for last response
    Dorus
    @Dorus
    Yes, that'll work. It will only fire up 100 fireRequestAsync in parallel.
    Simon Baslé
    @simonbasle
    cool, thanks @Dorus
    (I made a unit test in my project that demonstrates this behavior ;))
    Andres Mariscal
    @SerialDev
    I'm quite interested in learning RxJ do any of you have any good resources?
    Steve Gury
    @stevegury
    @SerialDev this tutorial https://github.com/Froussios/Intro-To-RxJava is pretty good
    Andres Mariscal
    @SerialDev
    @stevegury thank you :)
    David Stemmer
    @weefbellington
    I made a Transformer to be able to do "lookback" without a behavior subject:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    for example it allows you to take a stream returned by interval which emits [1, 2, 3, 4, 5] and map it to [(1,2),(2,3),(3,4),(4,5)]
    is this a wise think to do or could there be unintended consequences?
    Dorus
    @Dorus
    This is different from Buffer(2,1) ?
    David Stemmer
    @weefbellington
    @Dorus wouldn't buffer yield something like [(1,2),(3,4),(5...)]?
    Dorus
    @Dorus
    not if you use skip 1 instead of skip 2
    David Stemmer
    @weefbellington
    ah interesting. I didn't know you could use it to create overlapping windows, neat
    Dorus
    @Dorus
    It is neat!
    Works both on window() and buffer()
    David Stemmer
    @weefbellington
    the only downside would be you'd have to flatmap it but that's nbd
    Dorus
    @Dorus
    If i had to implement your Lookbacktransformer i would do something like Lookback(Observable<T> source Func2<T, T, R> merg) { return source.buffer(2,1).map(e => merg(e[0], e[1]));}
    (Code is probably all wrong, not writing a whole lot of java lately)
    David Stemmer
    @weefbellington
    yeah that sounds about right to me
    Dorus
    @Dorus
    Actually, small problem is buffer gives a single element array before onComplete. Need to filter those out else you get a ArrayIndexOutOfBoundsException.
    David Stemmer
    @weefbellington
    you can just use flatmap with window
    Dorus
    @Dorus
    Mm actually, show me, i haven't been able to produce that yet.
    David Stemmer
    @weefbellington
    I'll have to do it later, I'm about to leave
    Dorus
    @Dorus
    no problem, leaving also :)
    David Stemmer
    @weefbellington
    is there anything wrong with zipping an observable with itself though? I thought it was quite clever