    Ilya Arkhanhelsky
    @iarkhanhelsky
    I have another question. If I map something to something, the map runs for each signal and each subscription. That's OK in the sense of immutability. But what is the proper way of mapping and subscribing to signals with stateful objects in the Rx way?
    Dorus
    @Dorus
    Scan + Publish?
    It does depend on what state you had in mind
    Ilya Arkhanhelsky
    @iarkhanhelsky
    I map some stateless object to a stateful container. Then (the way I do it now) I make 2 subscriptions. The 1st subscription updates the object (paired with another observable), the 2nd reads its state (paired with a completely different observable).
    Dorus
    @Dorus
    OK, I didn't understand that.
    Subscriptions only receive events, they do not update the source.
    Ilya Arkhanhelsky
    @iarkhanhelsky
            PublishSubject<Integer> seq = PublishSubject.create();
    
            Observable<Integer> seq2 = seq.map(x -> {
                timesCalled += 1;
                return x * 2;
            });
    
            seq2.subscribe(e -> System.out.println("Hello 1: " + e));
            seq2.subscribe(e -> System.out.println("Hello 2: " + e));
    
            seq.onNext(1);
    
            System.out.println("Times map called: " + timesCalled);
    This code shows that map is called twice in this case. (Is that an error or not?) And I get 2 different objects, one in each subscription, where I expected the same one.
    If it helps to understand my problem
    Dorus
    @Dorus
    Yes that's expected behavior
    I assume timesCalled is declared outside the scope of map, that's where the problem lies.
    Unless that was intended
    Ilya Arkhanhelsky
    @iarkhanhelsky
    No, that's for example
    Dorus
    @Dorus
    You can use Observable.create to be able to declare a local variable instead
    Or publish seq2 so that side effects are shared between the observables instead.
    Ilya Arkhanhelsky
    @iarkhanhelsky
    It works. But as far as I understand the docs, publish creates a ConnectableObservable, which waits for connect before emitting anything. That doesn't sound like it's really aimed at the current problem, right? Or do I get it wrong? Or do I need publish just to create a new observable that keeps the needed objects?
    Until I map it again by the way
    Dorus
    @Dorus
    You can use refCount, or do both subscriptions and then connect() yes.
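Editor's sketch of the point above: a plain-Java model, with no RxJava dependency, of why the mapper runs once per subscriber and why sharing the mapped stream makes it run once. `Hub`, `subscribeMapped` and `share` are hypothetical stand-ins for `PublishSubject`, `seq.map(f)` and `seq.map(f).publish()` plus `connect()`; they are not RxJava APIs and only mimic the behaviour described in the chat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

final class SharedMapSketch {

    /** Hypothetical stand-in for PublishSubject: pushes each value to every listener. */
    static final class Hub<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T value) {
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Models seq.map(f).subscribe(s): every subscription gets its own mapping stage. */
    static <T, R> void subscribeMapped(Hub<T> source, Function<T, R> f, Consumer<R> subscriber) {
        source.subscribe(x -> subscriber.accept(f.apply(x)));
    }

    /** Models a published, connected stream: one mapping stage feeding a second hub. */
    static <T, R> Hub<R> share(Hub<T> source, Function<T, R> f) {
        Hub<R> shared = new Hub<>();
        source.subscribe(x -> shared.onNext(f.apply(x)));
        return shared;
    }

    /** Counts mapper invocations for one emitted value and two subscribers. */
    static int mapperCalls(boolean shared) {
        AtomicInteger timesCalled = new AtomicInteger();
        Function<Integer, Integer> doubler = x -> {
            timesCalled.incrementAndGet();
            return x * 2;
        };
        Hub<Integer> seq = new Hub<>();
        List<Integer> received = new ArrayList<>();
        if (shared) {
            Hub<Integer> seq2 = share(seq, doubler);
            seq2.subscribe(v -> received.add(v));
            seq2.subscribe(v -> received.add(v));
        } else {
            subscribeMapped(seq, doubler, v -> received.add(v));
            subscribeMapped(seq, doubler, v -> received.add(v));
        }
        seq.onNext(1);
        return timesCalled.get();
    }

    public static void main(String[] args) {
        System.out.println("Mapper calls, unshared: " + mapperCalls(false));
        System.out.println("Mapper calls, shared:   " + mapperCalls(true));
    }
}
```

In RxJava itself, the shared variant corresponds to the publish plus refCount (or publish plus connect) suggestion above.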
    Simon Baslé
    @simonbasle
    Hi guys, can someone shed some light on how flatMap(Func1<T, Observable<R>> f, int maxConcurrent) works?
    Specifically, is it a means of propagating a backpressure request upstream?
    Say I have a bursty source, like Observable.from(aCollectionOfThousandsOfStrings)
    Would that flatMap signature with a maxConcurrent of, say, 100 ensure that no more than 100 invocations of the map function are made at a time?
    (effectively consuming 100 strings from the source, making the corresponding asynchronous calls, and waiting for at least one of them to complete before making a 101st call?)
    Dorus
    @Dorus
    As far as i know only one invocation is made at a time.
    maxConcurrent refers to the maximum number of inner Observables subscribed to in parallel. It merges up to 100 of them and only subscribes to #101 once one of them calls onCompleted().
    Simon Baslé
    @simonbasle
    OK, looks like it's what I'm after: a way of preventing a large list in an Observable.from(...) (or range) from swamping my IO layer with too many requests in an instant
    Something like:
    Observable.range(1, 1000)
        .map(id -> buildUrlToResource(id))
        // driver can only cope with around 150 asynchronous requests
        .flatMap(url -> driver.fireRequestAsync(url), 100)
        .doOnNext(response -> showNotification(response.content()))
        .toBlocking().last(); // wait for last response
    Dorus
    @Dorus
    Yes, that'll work. It will only fire up 100 fireRequestAsync in parallel.
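Editor's sketch of the "at most N in flight" idea, in plain Java without RxJava. It swaps in a different technique, a blocking `Semaphore`, purely for illustration; flatMap with maxConcurrent reaches the same bound without blocking, by limiting how many inner Observables it subscribes to. The `Thread.sleep` call is a hypothetical stand-in for `driver.fireRequestAsync(url)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class MaxConcurrentSketch {

    /** Runs `total` simulated requests, at most `maxConcurrent` at once; returns the observed peak. */
    static int peakInFlight(int total, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int id = 1; id <= total; id++) {
                // Blocks here until one of the running requests has completed.
                permits.acquireUninterruptibly();
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                pending.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(2); // simulated asynchronous request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }, pool));
            }
            // Wait for the last responses, like toBlocking().last() in the snippet above.
            for (CompletableFuture<Void> request : pending) {
                request.join();
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak in flight (limit 5): " + peakInFlight(50, 5));
    }
}
```

The reported peak never exceeds the limit, because a request is only counted after it has acquired a permit and is uncounted before it releases one.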
    Simon Baslé
    @simonbasle
    cool, thanks @Dorus
    (I made a unit test in my project that demonstrates this behavior ;))
    Andres Mariscal
    @SerialDev
    I'm quite interested in learning RxJ. Do any of you have any good resources?
    Steve Gury
    @stevegury
    @SerialDev this tutorial https://github.com/Froussios/Intro-To-RxJava is pretty good
    Andres Mariscal
    @SerialDev
    @stevegury thank you :)
    David Stemmer
    @weefbellington
    I made a Transformer to be able to do "lookback" without a behavior subject:
    public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
    
        private final Func2<T, T, R> mergeFunc;
    
        public LookbackTransformer(Func2<T, T, R> mergeFunc) {
            this.mergeFunc = mergeFunc;
        }
    
        @Override
        public Observable<R> call(Observable<T> source) {
            Observable<T> offsetSource = source.skip(1);
            return source.zipWith(offsetSource, mergeFunc);
        }
    }
    for example it allows you to take a stream returned by interval which emits [1, 2, 3, 4, 5] and map it to [(1,2),(2,3),(3,4),(4,5)]
    is this a wise thing to do or could there be unintended consequences?
    Dorus
    @Dorus
    Is this different from buffer(2, 1)?
    David Stemmer
    @weefbellington
    @Dorus wouldn't buffer yield something like [(1,2),(3,4),(5...)]?
    Dorus
    @Dorus
    not if you use skip 1 instead of skip 2
    David Stemmer
    @weefbellington
    ah interesting. I didn't know you could use it to create overlapping windows, neat
    Dorus
    @Dorus
    It is neat!
    Works both on window() and buffer()
    David Stemmer
    @weefbellington
    the only downside would be you'd have to flatmap it but that's nbd
    Dorus
    @Dorus
    If I had to implement your LookbackTransformer, I would do something like static <T, R> Observable<R> lookback(Observable<T> source, Func2<T, T, R> merge) { return source.buffer(2, 1).map(e -> merge.call(e.get(0), e.get(1))); }
    (Code is probably all wrong, not writing a whole lot of java lately)
    David Stemmer
    @weefbellington
    yeah that sounds about right to me
    Dorus
    @Dorus
    Actually, a small problem is that buffer emits a single-element list just before onCompleted. You need to filter those out, or else you get an IndexOutOfBoundsException.
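Editor's sketch of the overlapping-window lookback with the short trailing window filtered out, written over plain Java lists so it runs without RxJava. The `buffer` helper here is a hypothetical list-based imitation of `buffer(count, skip)`, including the partial window at the end; it is not the RxJava operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class LookbackSketch {

    /** Windows of up to `count` items, a new one starting every `skip` items. */
    static <T> List<List<T>> buffer(List<T> items, int count, int skip) {
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < items.size(); start += skip) {
            int end = Math.min(start + count, items.size());
            windows.add(new ArrayList<>(items.subList(start, end)));
        }
        return windows;
    }

    /** Pairs each item with its successor, skipping the final one-element window. */
    static <T, R> List<R> lookback(List<T> items, BiFunction<T, T, R> merge) {
        List<R> out = new ArrayList<>();
        for (List<T> window : buffer(items, 2, 1)) {
            if (window.size() == 2) { // the filter mentioned above
                out.add(merge.apply(window.get(0), window.get(1)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(source, 2, 1));
        System.out.println(lookback(source, (a, b) -> "(" + a + "," + b + ")"));
    }
}
```

Without the size check, the last window would hold a single item and `window.get(1)` would throw.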
    David Stemmer
    @weefbellington
    you can just use flatmap with window
    Dorus
    @Dorus
    Mm actually, show me, i haven't been able to produce that yet.
    David Stemmer
    @weefbellington
    I'll have to do it later, I'm about to leave
    Dorus
    @Dorus
    no problem, leaving also :)