    Samuel Tardieu
    @samueltardieu
    For example, imagine you have 1000 caches numbered 1 to 1000, where only the first and the last meet the criterion. With a regular concat, I have to go through the whole list: 1 gets downloaded, 2 to 999 are put in the non-preferred observable, we eventually reach 1000 and download it, and only then do we download 2 to 999. What I would like instead is: download 1 and put, say, 2 to 600 aside; then the subscriber has a network slot available, so it gets 2 because there is no preferred geocache at that time; scanning continues from 601 to 1000; then 1000 is handed to the subscriber, followed by 3 to 999.
    On the other hand, if geocaches 1 to 998 meet the criterion and 999 and 1000 don't, we hand 1 to the network downloader, continue scanning 2 to 6, then stop because the downloader has no slot available and applies backpressure. When it requests another geocache to download, it is handed 2, and we scan 7 and pause again, etc. So we follow the demand and don't do useless work analyzing geocaches in this case; if the user cancels, we won't have thrashed the disk retrieving useless information.
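The behaviour described above can be approximated with a small pull-based sketch. This is a simplified, synchronous model in plain Java, not an RxJava operator and not the c:geo implementation: the class name `PreferringSource` and the `lookAhead` parameter are assumptions made for illustration. Each request scans at most `lookAhead` new items; a preferred item is handed out at once, otherwise the oldest deferred item is returned. Unlike the scenario above, it does not prefetch while the consumer is busy, and it is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper, not part of RxJava or c:geo.
final class PreferringSource<T> {
    private final Iterator<T> upstream;
    private final Predicate<? super T> preferred;
    private final int lookAhead;                          // max items scanned per request
    private final Deque<T> deferred = new ArrayDeque<>(); // non-preferred items set aside

    PreferringSource(Iterable<T> items, Predicate<? super T> preferred, int lookAhead) {
        if (lookAhead < 1) {
            throw new IllegalArgumentException("lookAhead must be >= 1");
        }
        this.upstream = items.iterator();
        this.preferred = preferred;
        this.lookAhead = lookAhead;
    }

    // Returns the next item to process, or empty once everything has been handed out.
    Optional<T> next() {
        for (int i = 0; i < lookAhead && upstream.hasNext(); i++) {
            T item = upstream.next();
            if (preferred.test(item)) {
                return Optional.of(item);   // preferred items jump the queue
            }
            deferred.addLast(item);         // keep for later, in original order
        }
        // No preferred item found within the window: fall back to the oldest deferred one.
        return Optional.ofNullable(deferred.pollFirst());
    }
}
```

When every item is preferred, each call to `next()` scans exactly one item, so scanning follows demand and nothing is analyzed ahead of need.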
    dwursteisen
    @dwursteisen
    Do you have a test that reproduces the issue? I don't have a solution, but it's an interesting use case.
    Samuel Tardieu
    @samueltardieu
    Not yet, but I'll have one soon if I write such an operator :-)
    dwursteisen
    @dwursteisen
    (and the test may help to fully understand the issue)
    :D
    Samuel Tardieu
    @samueltardieu
    @dwursteisen I'm out now, but you can look at the test cases at https://github.com/cgeo/cgeo/blob/master/tests/src/cgeo/geocaching/utils/RxUtilsTest.java#L117
    cavemansspa
    @cavemansspa
    these examples https://github.com/ReactiveX/RxGroovy/tree/1.x/src/examples/groovy/rx/lang/groovy/examples were created a while ago. is this still the correct approach to implementing non-blocking Observables -- any recent Rx changes that would approach this differently?
    cavemansspa
    @cavemansspa
    would the new Completable replace the thread based non-blocking Observable in these examples?
    Completable completable1 = Completable.fromAction({ Thread.sleep(5000); println 'hello world' }).subscribeOn(Schedulers.io());
    fAns1k
    @fAns1k
    hi guys, please help me with this snippet. it looks simple, but for some reason it doesn't work as expected
    Observable.from(videoStreams)
                            .delay(videoStream -> playlistSubject.startWith((Void) null)
                                    .doOnNext(new Action1<Void>() {
                                        @Override
                                        public void call(Void aVoid) {
                                            Timber.d("Emit delay");
                                        }
                                    }))
    my point is to emit items one by one, to play in media player
    Dorus
    @Dorus
    delay merely delays the current item until the inner observable emits. This happens instantly because of startWith.
    fAns1k
    @fAns1k
    i tried to fix it this way =)
    Dorus
    @Dorus
    What's expected?
    fAns1k
    @fAns1k
    i get a list of items (e.g. 5) and i start playing the first one immediately
    others should play after first, etc
    media player gives me event that video is ended, and then i play next one
    Dorus
    @Dorus
    So you need to buffer the items until the previous one finishes. Try concatMap.
    fAns1k
    @fAns1k
    i cannot interact with player inside the stream
    that’s why i use subject (PublishSubject) to send event to push new item
    Dorus
    @Dorus
    So if I understand you correctly, you have 2 streams: one that emits 5 items, which should be re-emitted one at a time, and another that emits each time the previous item has finished playing?
    I'm not even 100% sure this is a reactive problem.
    fAns1k
    @fAns1k
    i guess it is
    Dorus
    @Dorus
    anyway, you can even use zip: source.zipWith(playerReady.startWith(ready), (item, r) -> item).
    fAns1k
    @fAns1k
    ok, will try, thanks for response =)
    Dorus
    @Dorus
    zip will wait for both streams to have emitted something.
    We append a start value to playerReady so that it always starts as ready, so the first item from source is emitted.
    fAns1k
    @fAns1k
    yep, it connects one to one from streams
    Dorus
    @Dorus
    Then the next item from source will be forwarded as soon as playerReady yields again.
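The pairing that zip performs here can be modelled in a few lines of plain Java. This is only a sketch of the gating idea, not RxJava's zip implementation: the class `ZipGate` and its method names are hypothetical. The initial value of `readySignals` plays the role of the start value added to playerReady, so the first item passes straight through.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of zip-style gating: one item is released per "ready" signal.
final class ZipGate<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<? super T> downstream;
    private int readySignals = 1;   // the player starts out ready (the startWith value)

    ZipGate(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    // Called for each item coming from the source stream.
    synchronized void onItem(T item) {
        pending.add(item);
        drain();
    }

    // Called when the player reports that the current video has ended.
    synchronized void onPlayerReady() {
        readySignals++;
        drain();
    }

    // Releases items while both an item and a ready signal are available.
    private void drain() {
        while (readySignals > 0 && !pending.isEmpty()) {
            readySignals--;
            downstream.accept(pending.poll());
        }
    }
}
```

Feeding three items in a row releases only the first; each later `onPlayerReady()` call releases exactly one more.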
    fAns1k
    @fAns1k
    thanks
    Dorus
    @Dorus
    Good luck :)
    fAns1k
    @fAns1k
    it seems you are right =)
    thanks for your help
    really simple issue (
    @Dorus
    Dorus
    @Dorus
    np ;)
    cavemansspa
    @cavemansspa
    @Dorus -- did you see those groovy github examples -- is that still the way to implement a non-blocking Observable?
    Dorus
    @Dorus
    @cavemansspa I'm not sure where to look. Observables are normally never blocking anyway.
    You mean the Thread.start stuff? Well, I would prefer to have cancellation implemented and to use the Schedulers class (pass a scheduler in to those functions). But overall it's not against the Rx contract to do it like that.
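The two preferences mentioned here, cancellation support and letting the caller decide where the work runs, can be illustrated without RxJava. The sketch below uses a plain `java.util.concurrent.Executor` as a stand-in for a scheduler; the class `CancellableCounter` and its methods are hypothetical and are not taken from the Groovy examples.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Hypothetical producer: the caller passes in the Executor instead of the
// producer starting its own thread, and can cancel at any time.
final class CancellableCounter {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Emits 0..count-1 on the given executor until done or cancelled.
    void start(int count, Executor executor, IntConsumer onNext, Runnable onCompleted) {
        executor.execute(() -> {
            for (int i = 0; i < count; i++) {
                if (cancelled.get()) {
                    return;             // stop producing once the consumer has cancelled
                }
                onNext.accept(i);
            }
            if (!cancelled.get()) {
                onCompleted.run();      // completion is signalled only for an uncancelled run
            }
        });
    }

    void cancel() {
        cancelled.set(true);
    }
}
```

Passing `Runnable::run` as the executor runs the loop on the calling thread, while an `ExecutorService` moves it to a pool; the producer code is the same in both cases.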
    Dorus
    @Dorus
    Also, it totally IS possible to unsubscribe from customObservableBlocking. Just do customObservableBlocking.take(2). Unsubscribes after 2 items :-)
    (just that it will then ignore all subsequent calls to onNext)
    cavemansspa
    @cavemansspa
    @Dorus - yes, i was referring to the Thread.start approach. i thought that maybe the schedulers functionality came after those examples were created.
    Dorus
    @Dorus
    I have no idea who made those examples or when :)
    cavemansspa
    @cavemansspa
    looks like ben christensen
    two years ago
    Dorus
    @Dorus
    Looks like it. Like i said, they're not wrong.
    cavemansspa
    @cavemansspa
    okay thanks for input -- wasn't sure if there was a better way since a lot of stuff has been added since those were put together.
    Dorus
    @Dorus
    The most important part is to stay within the contract: Serialize calls to onNext and finish with either onCompleted or onError.
    Other than that you can call code from any thread.
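As a rough illustration of that contract, the wrapper below serializes calls with a lock and drops anything that arrives after a terminal event. It is a plain-Java sketch under stated assumptions: the class `ContractEmitter` and its method names are hypothetical, and RxJava's own serialization is implemented differently.

```java
import java.util.function.Consumer;

// Hypothetical wrapper that keeps a producer inside the contract:
// serialized onNext calls, then at most one terminal event.
final class ContractEmitter<T> {
    private final Consumer<? super T> onNext;
    private final Consumer<? super Throwable> onError;
    private final Runnable onCompleted;
    private boolean terminated = false;   // guarded by "this"

    ContractEmitter(Consumer<? super T> onNext,
                    Consumer<? super Throwable> onError,
                    Runnable onCompleted) {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    // Safe to call from any thread; calls never overlap.
    synchronized void next(T item) {
        if (!terminated) {
            onNext.accept(item);
        }
    }

    synchronized void error(Throwable t) {
        if (!terminated) {
            terminated = true;
            onError.accept(t);
        }
    }

    synchronized void complete() {
        if (!terminated) {
            terminated = true;
            onCompleted.run();
        }
    }
}
```

Holding a lock while calling the consumer is the simplest way to serialize, at the cost of blocking other producer threads while a slow consumer runs.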