    Rafael Guillen
    @rguillens
    Why do you need two subjects to do that?
    Ivan Schütz
    @i-schuetz
    why not?
    I have a subject with a general state (includes many things) and in some place of the app I want another subject to react to a change in one specific part of this state
    Rafael Guillen
    @rguillens
    You can subscribe; why map them?
    Ivan Schütz
    @i-schuetz
    I can, yes, but why not map? It also makes sense, concept-wise IMO
    okay, so the reason Subject doesn't provide operators that work "on itself" is that they shouldn't be used that often...?
    (not trying to make a point, genuinely trying to understand)
    Rafael Guillen
    @rguillens
    Think for a moment, you apply mapping functions to values, not to Observables ...

    In this case I think subscribe is a better solution

    Disposable d = generalSubject.subscribe(isASelectedSubject::onNext);

    This way, isASelectedSubject will re-emit items from generalSubject and you can even dispose the subscription.

    Ivan Schütz
    @i-schuetz
    that's right, I think it also reflects the intent more clearly...
    Rafael Guillen
    @rguillens
    Subjects are useful, but think for a moment: propagating its onNext method all the way down is very dangerous. Anyone could modify the chain of events at any point... I'm not sure if this is what you meant.
    Ivan Schütz
    @i-schuetz
    that's what I do with subscribe too, except that there's more control with disposable... or in the block itself
    Ivan Schütz
    @i-schuetz
    ;)
    Ivan Schütz
    @i-schuetz
    seems this is what I need: Schedulers.single()
    Anuj Middha
    @anujmiddha
    Hi all. I am trying to understand Observable.generate in RxJava2. For this, I have written a sample function https://pastebin.com/dc9pd5DB . Can someone please confirm if I am using this as intended, or correct me if I am wrong?
    Ivan Schütz
    @i-schuetz
    I'm confused about publish().refCount()... someone advised me a while ago to use this to prevent the observable from being executed once per subscription, but it doesn't seem to work exactly like that. The only way I see right now is to first call publish(), to turn the observable into a connectable observable, and then call connect() after all the observers have subscribed
    e.g.
    public static void main(String[] args) {
    
        ConnectableObservable<String> source = Observable.just("foo").map(par -> {
            System.out.println("called!"); // executed only once
            return par;
        }).publish();
    
        source.subscribe(par ->
            System.out.println("o1: " + par)
        );
    
        source.subscribe(par ->
            System.out.println("o2: " + par)
        );
    
        source.connect();
    }
    Ivan Schütz
    @i-schuetz
    this, on the other hand...
    public static void main(String[] args) {
    
        Observable<String> source = Observable.just("foo").map(par -> {
            System.out.println("called!"); // executed twice
            return par;
        }).publish().refCount();
    
    
        source.subscribe(par ->
                System.out.println("o1: " + par)
        );
    
        source.subscribe(par ->
                System.out.println("o2: " + par)
        );
    }
    in which cases exactly is publish().refCount() (or share()) useful?
    Rafael Guillen
    @rguillens
    @i-schuetz, I told you to use publish().refCount() because of the problem you were trying to solve, which involved hot Observables.
    Rafael Guillen
    @rguillens
    @i-schuetz, as you may know, a cold observable becomes hot with the publish() operator (once it is connected). The refCount() operator returns an Observable<T> that stays connected to this hot observable as long as there are subscribers to it. This is intended to prevent unnecessary event processing and propagation, because hot observable sources just keep emitting events; you can't control them.
    Rafael Guillen
    @rguillens
    @i-schuetz, by the way, the execution of the map() operator for each subscriber doesn't mean that there are two simultaneous subscriptions. Don't use map, use the doOn operators: with doOnSubscribe() you'll see there is only one subscription at a time, but you'll need to change the observable source to something that lasts a little longer.
    In the code below, the subscription and emission of o1 is so fast that by the time o2 subscribes, o1 is already disposed. That's why there are two subscriptions, but one after the other.
    Observable<String> source = Observable.just("foo").map(par -> {
        System.out.println("called!"); // executed twice
        return par;
    }).publish().refCount();

    source.subscribe(par -> System.out.println("o1: " + par),
            System.out::println,
            () -> System.out.println("Done!"));

    source.subscribe(par -> System.out.println("o2: " + par),
            System.out::println,
            () -> System.out.println("Done!"));
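The explanation above can be made concrete with a toy model in plain JDK code. This is only a sketch of the reference-counting idea, not RxJava's implementation: the class RefCountModel and its methods are hypothetical names invented here, and the source is assumed to be synchronous and to emit a single item, like Observable.just("foo"). The first subscriber triggers a connection, the source finishes during that same call, the subscriber count drops back to zero, and the second subscriber therefore triggers a fresh connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy model of publish().refCount() over a synchronous, single-item source (hypothetical class). */
final class RefCountModel<T> {
    private final Supplier<T> upstream;                    // stands in for the source plus its map() step
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int connections = 0;

    RefCountModel(Supplier<T> upstream) {
        this.upstream = upstream;
    }

    /** The first subscriber (count goes from 0 to 1) triggers a connection. */
    void subscribe(Consumer<T> observer) {
        subscribers.add(observer);
        if (subscribers.size() == 1) {
            connect();
        }
    }

    private void connect() {
        connections++;
        T item = upstream.get();                           // the upstream work runs once per connection
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(item);
        }
        subscribers.clear();                               // source completed: count is back to zero
    }

    int connectionCount() {
        return connections;
    }

    /** Replays the two-subscriber scenario from the chat and returns what happened, in order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        RefCountModel<String> source = new RefCountModel<>(() -> {
            log.add("called!");
            return "foo";
        });
        source.subscribe(v -> log.add("o1: " + v));
        source.subscribe(v -> log.add("o2: " + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [called!, o1: foo, called!, o2: foo]
    }
}
```

With a source that stays alive while both observers subscribe, the same counting rule would keep a single connection, which is the case refCount() is meant for.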
    Joshua Street
    @jjstreet
    hello
    I have a hopefully straightforward question about RxJava, specifically about merging completables
    Completable.merge(), according to the docs, subscribes to all specified completables
    I want to somehow delay those subscriptions until the completable created by the merge is subscribed to
    and have functionality that is similar to Completable.fromAction
    I'm struggling to figure out how to achieve this; any thoughts or ideas?
    Joshua Street
    @jjstreet
    forget my question, I had issues in my test code that led me to an incorrect assumption about how merge works with subscriptions
    Nayshawn Danner
    @NayshawnD_twitter
    Hi all,
    Nayshawn Danner
    @NayshawnD_twitter
    whoops! I was wondering if this is somehow possible in RxJava:
    Subject<Integer, String> publishSubject = PublishSubject.create();
    PublishSubject<T, T> obviously doesn't allow a type transformation, even though the Subject type suggests that this could be possible. Any suggestions?
    Sourabh Verma
    @sourabhv

    Is there a way to make requests in parallel, in batches? For example, if I have a list of ids and I want to get the details for those ids 4 at a time. This is what I have so far:

    api.users().flatMap(Observable::fromIterable)
                    .buffer(4)
                    .flatMap(users -> Observable.fromIterable(users).flatMap(user -> api.usersData(user.username).subscribeOn(Schedulers.newThread())))
                    .subscribe(user -> System.out.println(user.toStringNew()), Throwable::printStackTrace);

    but this makes all the requests in parallel, not in batches of 4

    Stergio
    @stergiom
    perhaps try using a fixed-size thread pool in place of Schedulers.newThread()
    Sourabh Verma
    @sourabhv
    But is there a better way that doesn't depend on the size of the thread pool?
    Stergio
    @stergiom
    there is the flatMap(func, maxConcurrency) overload
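As a rough illustration of the fixed-size pool idea suggested above, the following plain-JDK sketch runs a list of lookups with at most four in flight and records the peak concurrency it observed. The class BoundedLookups and the method fetchDetails are hypothetical placeholders for the real API call, and the 20 ms sleep only simulates latency. In RxJava 2 such an executor could be wrapped with Schedulers.from(executor), or the limit could instead be expressed with the flatMap(mapper, maxConcurrency) overload mentioned above, which does not depend on the pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: cap the number of simultaneous lookups with a fixed-size pool (hypothetical class). */
final class BoundedLookups {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    /** Placeholder for a network call; tracks how many calls overlap. */
    static String fetchDetails(int id) throws InterruptedException {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);                      // simulated latency
            return "details-" + id;
        } finally {
            inFlight.decrementAndGet();
        }
    }

    /** Fetches every id, with at most maxConcurrency lookups running at once; keeps input order. */
    static List<String> fetchAll(List<Integer> ids, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int id : ids) {
                futures.add(pool.submit(() -> fetchDetails(id)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    results.add(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        System.out.println(fetchAll(ids, 4));
        System.out.println("peak concurrency: " + peak.get());
    }
}
```

Note that this keeps up to four requests running continuously rather than waiting for each group of four to finish before starting the next one.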
    Paul DeMarco
    @pauldemarco
    Hi everyone, is it possible to listen to an observable without actually subscribing to it?
    For instance, BLE will begin to scan once it's subscribed to; I'd like to set up a listener for results, but start the scanning from another function.
    Ben Christensen
    @benjchristensen
    Compose using the doOn* operators. Those will let you 'peek' at the events as they flow by once subscribed to.
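The idea of attaching a listener that does nothing until someone subscribes can be shown with a small plain-JDK model. This is a sketch only, not RxJava code: ColdSourceModel is a hypothetical class, its doOnNext method merely imitates the role of RxJava's doOnNext operator, and the device names are made up. Attaching the tap emits nothing; the items start flowing only when subscribe is called elsewhere.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy cold source: nothing is emitted until subscribe() is called (hypothetical class). */
final class ColdSourceModel<T> {
    private final List<T> items;
    private final List<Consumer<T>> taps;

    ColdSourceModel(List<T> items) {
        this(items, new ArrayList<>());
    }

    private ColdSourceModel(List<T> items, List<Consumer<T>> taps) {
        this.items = items;
        this.taps = taps;
    }

    /** Returns a new source that also passes each item to the tap; emits nothing by itself. */
    ColdSourceModel<T> doOnNext(Consumer<T> tap) {
        List<Consumer<T>> extended = new ArrayList<>(taps);
        extended.add(tap);
        return new ColdSourceModel<>(items, extended);
    }

    /** Starts the emission: the taps see each item first, then the observer. */
    void subscribe(Consumer<T> observer) {
        for (T item : items) {
            for (Consumer<T> tap : taps) {
                tap.accept(item);
            }
            observer.accept(item);
        }
    }

    /** Attaches a listener first, then starts the "scan" separately, and returns the event order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ColdSourceModel<String> scan = new ColdSourceModel<>(Arrays.asList("dev-A", "dev-B"))
                .doOnNext(d -> log.add("listener saw " + d));
        log.add("listener attached");                    // nothing has been emitted yet
        scan.subscribe(d -> log.add("scan got " + d));   // emission starts here
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```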
    Paul DeMarco
    @pauldemarco
    that is awesome. Thanks!
    قوه هاو تشاو
    @BabyDuncan
    awesome
    Ionut Velicu
    @ionutvelicu
    Hello! Kind of a long shot, but maybe someone could clarify this for me. I'm using RxJava with Android and SQLBrite. I understand that SQLBrite is a wrapper around SQLite that listens to changes from the DB and notifies the subscribers. However, my question is how I can check whether an entry is in a database or not. Is this something that can be done via Observables (seems kind of unnatural to me), or should I use plain SQLite querying in this case? Thanks!
    Ivan Schütz
    @i-schuetz
    @rguillens thanks again for your help!
    Yannick Lecaillez
    @ylecaillez
    Hi there! Out of curiosity: is there any reason why FlowableTimeoutTimed isn't using HalfSerializer? https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java#L51
    Is it because AtomicThrowable might propagate a CompositeException?
    David Karnok
    @akarnokd
    It is a pretty old algorithm/file (also predating half-serializer) and there are now different approaches available, such as index-based state transitions that don't need external serialization: see FlowableTimeout of Reactive4.NET.
    Yannick Lecaillez
    @ylecaillez
    Wow, really instructive :) Thanks !