Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Ivan Schütz
    @i-schuetz
    that's what I do with subscribe too, except that there's more control with disposable... or in the block itself
    Ivan Schütz
    @i-schuetz
    ;)
    Ivan Schütz
    @i-schuetz
    Ivan Schütz
    @i-schuetz
    seems this is what I need Schedulers.single()
    Anuj Middha
    @anujmiddha
    Hi all. I am trying to understand Observable.generate in RxJava2. For this, I have written a sample function https://pastebin.com/dc9pd5DB . Can someone please confirm if I am using this as intended, or correct me if I am wrong?
    Ivan Schütz
    @i-schuetz
    I'm confused about publish().refCount()... someone adviced me a while ago to use this to prevent the observable being called multiple times for each subscription, but it doesn't seem to work exactly like that. The only way I see right now is to call first publish(), to transform the observable in a connectable observable and call connect(), after all the observers subscribed
    e.g.
    public static void main(String[] args) {
    
        ConnectableObservable<String> source = Observable.just("foo").map(par -> {
            System.out.println("called!"); // executed only once
            return par;
        }).publish();
    
        source.subscribe(par ->
            System.out.println("o1: " + par)
        );
    
        source.subscribe(par ->
            System.out.println("o2: " + par)
        );
    
        source.connect();
    }
    Ivan Schütz
    @i-schuetz
    this, on the other side...
    public static void main(String[] args) {
    
        Observable<String> source = Observable.just("foo").map(par -> {
            System.out.println("called!"); // executed twice
            return par;
        }).publish().refCount();
    
    
        source.subscribe(par ->
                System.out.println("o1: " + par)
        );
    
        source.subscribe(par ->
                System.out.println("o2: " + par)
        );
    }
    for which cases is exactly publish().refCount() (or share()) of use?
    Rafael Guillen
    @rguillens
    @i-schuetz , I told you to use publish().refcount() because of the problem you were trying to solve dealing with Hot Observables.
    Rafael Guillen
    @rguillens
    @i-schuetz , as you may know a cold observable become hot with the publish() operator. refCount() operator returns an Observable<T> that is connected to this hot observable as long as there are subscribers to it. This is intended to prevent unnecessary event processing and propagation, because hot observable sources just keep emitting events, you can't control them.
    Rafael Guillen
    @rguillens
    @i-shuetz, By the way, the execution of the map() operator for each subscriber doesn't mean that there are two subscriptions. Don't use map, use doOn operators, with doOnSubscribe()you'll see there is only one subscription, but you'll need to change the observable source to something that lasts a little more.
    In the code below, the subscription and emission of o1 is so fast, that when o2 is subscribed o1 is already disposed. That's why there are two subscriptions, but one after the other.
    Observable<String> source = Observable.just("foo").map(par -> {
                System.out.println("called!"); // executed twice
                return par;
            }).publish().refCount();
    
    
            source.subscribe(par -> System.out.println("o1: " + par),
                    System.out::println,
                    () -> System.out.println("Done!"));
    
            source.subscribe(par -> System.out.println("o2: " + par),
                    System.out::println,
                    () -> System.out.println("Done!"));
    Joshua Street
    @jjstreet
    hello
    i have a hopefully straight-forward question about RxJava, specifically about merging completables
    Completable.merge() according to docs, subscribes to all specified completables
    i want to somehow delay those subscriptions to when the completable created by the merge is subscribed to
    and have functionality that is similar to Completable.fromAction
    im struggling trying to figure out how to achieve this, any thoughts or ideas?
    Joshua Street
    @jjstreet
    forget my question i had issues in my test code that lead me to believe an incorrect assumption about how merge worked with subscriptions
    Nayshawn Danner
    @NayshawnD_twitter
    Hi all,
    Nayshawn Danner
    @NayshawnD_twitter
    whoops!, was wonder if this was some how possible in RxJava:
    Subject<Integer, String> publishSubject = PublishSubject.create()
    PublishSubject<T, T> obviously doesn't like to type transformation, even though the Subject type suggest that this could be possible. Suggestions any one?
    Sourabh Verma
    @sourabhv

    Is there a way to make requests in parellel in batch. Like, if I have a list of ids and I want to get details of those ids 4 at a time. This is what I have till now

    api.users().flatMap(Observable::fromIterable)
                    .buffer(4)
                    .flatMap(users -> Observable.fromIterable(users).flatMap(user -> api.usersData(user.username).subscribeOn(Schedulers.newThread())))
                    .subscribe(user -> System.out.println(user.toStringNew()), Throwable::printStackTrace);

    but this thing makes all requests in parellel, not in batches of 4

    Stergio
    @stergiom
    perhaps try using a fixed size threadpool in place of Schedulers.newThread
    Sourabh Verma
    @sourabhv
    But is there a better way instead of depending on size of theadpool
    Stergio
    @stergiom
    there is the flatMap(func, maxConcurrency) overload
    Paul DeMarco
    @pauldemarco
    Hi everyone, is it possible to listen to an observable but not actually subscribe to it.
    For instance, BLE will begin to scan once it's subscribed to, I'd like to setup a listener for results, but start the scanning from another function
    Ben Christensen
    @benjchristensen
    Compose using the doOn* operators. Those will let to 'peek' at the events as they flow by once subscribed to.
    s/to/you
    Paul DeMarco
    @pauldemarco
    that is awesome. Thanks!
    قوه هاو تشاو
    @BabyDuncan
    awesome
    Ionut Velicu
    @ionutvelicu
    Hello! Kind of a long shot but maybe could clarify this for me. Using RxJava w/ Android and SQLBrite. I understand that SQLBrite is a wrapper around SQLight that listens to changes from the DB and notifies the subscribers. However, my question is how can I check if an entry is in a database or not. Is this something that can be done via Observables (seems kind of unnatural to me) or should I use plain SQLight querying in this case? Thanks!
    Ivan Schütz
    @i-schuetz
    @rguillens thanks again for your help!
    Yannick Lecaillez
    @ylecaillez
    Hi there ! By curiosity: is there any reason why the FlowableTimeoutTimed isn't using HalfSerializer ? https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java#L51
    Is it because AtomicThrowable might propagate CompositeException ?
    David Karnok
    @akarnokd
    It is a pretty old algorithm/file (also predating half-serializer) and there are now different approaches available, such as index-based state transitions that don't need external serialization: see FlowableTimeout of Reactive4.NET.
    Yannick Lecaillez
    @ylecaillez
    Wow, really instructive :) Thanks !
    Paul Tikhonov
    @Pahanuch
    Hi there. Can I set a locale priority for.toSortedList ? For example: I have a values list with latin, cyrilic, number and special symbols. I need next sequence: cyrilic – latin – numbers – special chars. Now - special chars - numbers - latin - cyrilic
    .toSortedList((lhs, rhs) -> lhs.getDisplayName().compareTo(rhs.getDisplayName()))
    millsdude99
    @millsdude99
    Hi, I'm new to rx java, transitioning from rx C#. I want to marshall the observable, via observeOn to another thread. It needs to be a single thread to preserve ordering. In c# world, I used an EventLoopScheduler in my ObserveOn calls.
    What is the corollary in Java. I've seen suggested to used Schedulers.computation, but I am not sure how that is managed. Is it a single thread, or delegate to a thread pool?
    For now i've created a scheduler based on a singleThreadExecutor --- observeOn(Schedulers.from(Executors.newSingleThreadExecutor)
    Does this seem like a coherent approach?
    David Karnok
    @akarnokd
    In RxJava, we use a different style of Schedulers. Our schedulers are designed as a pair of Scheduler and Worker. Each Scheduler has a createWorker() method that returns a Worker that is guaranteed to be FIFO for non-timed tasks. observeOn runs on such worker and thus the item event order is preserved (with the option to make sure errors don't cut ahead as well). I.e., Observable.just("Hello", "world").observeOn(Schedulers.computation()).subscribe(System.out::println); will print the two words in order on the same thread. If you are on 1.x then Schedulers.from() is an option for having a single threaded scheduler. in 2.x there is the Schedulers.single(). I have a 3 part series about RxJava's Schedulers.
    Joshua Street
    @jjstreet
    Hey guys, I'm having a problem with using RxJava in my Spring Boot application. The stack trace is:
    java.lang.IllegalStateException: Illegal access: this web application instance has been stopped already. Could not load [META-INF/services/javax.xml.bind.JAXBContext]. The following stack trace is thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access.
        at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForResourceLoading(WebappClassLoaderBase.java:1305) [tomcat-embed-core-8.5.15.jar:8.5.15]
        at org.apache.catalina.loader.WebappClassLoaderBase.getResource(WebappClassLoaderBase.java:986) [tomcat-embed-core-8.5.15.jar:8.5.15]
        at javax.xml.bind.ContextFinder.find(ContextFinder.java:432) [na:1.8.0_121]
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641) [na:1.8.0_121]
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584) [na:1.8.0_121]
        at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.getJaxbContext(AbstractJaxb2HttpMessageConverter.java:111) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.createMarshaller(AbstractJaxb2HttpMessageConverter.java:50) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.Jaxb2RootElementHttpMessageConverter.writeToResult(Jaxb2RootElementHttpMessageConverter.java:185) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.AbstractXmlHttpMessageConverter.writeInternal(AbstractXmlHttpMessageConverter.java:66) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:227) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:882) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:650) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.postForObject(RestTemplate.java:380) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at com.ford.gforce.rushhour.client.GatewayClient.load(GatewayClient.java:32) [bin/:na]
        at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.warmCache(FatcatTestGatewayRunner.java:35) [bin/:na]
        at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.lambda$1(FatcatTestGatewayRunner.java:39) [bin/:na]
        at io.reactivex.internal.operators.completable.CompletableFromAction.subscribeActual(CompletableFromAction.java:34) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.Completable.subscribe(Completable.java:1635) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) ~[rxjava-2.1.0.jar:2.1.0]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
    i've talked to some folks in the spring boot channel and they state that it could be related to what thread context class loader RxJava uses
    it shouldn't be using Tomcat's, and should be using the Application's
    does anyone have insight how i can confirm this is the problem and how to fix it?
    Joshua Street
    @jjstreet
    turns out using the IO scheduler had cached stale threads
    shutting down and starting the IO scheduler fixes the problem