Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
    Rafael Guillen
    @i-schuetz , I told you to use publish().refcount() because of the problem you were trying to solve dealing with Hot Observables.
    Rafael Guillen
    @i-schuetz , as you may know a cold observable become hot with the publish() operator. refCount() operator returns an Observable<T> that is connected to this hot observable as long as there are subscribers to it. This is intended to prevent unnecessary event processing and propagation, because hot observable sources just keep emitting events, you can't control them.
    Rafael Guillen
    @i-shuetz, By the way, the execution of the map() operator for each subscriber doesn't mean that there are two subscriptions. Don't use map, use doOn operators, with doOnSubscribe()you'll see there is only one subscription, but you'll need to change the observable source to something that lasts a little more.
    In the code below, the subscription and emission of o1 is so fast, that when o2 is subscribed o1 is already disposed. That's why there are two subscriptions, but one after the other.
    Observable<String> source = Observable.just("foo").map(par -> {
                System.out.println("called!"); // executed twice
                return par;
            source.subscribe(par -> System.out.println("o1: " + par),
                    () -> System.out.println("Done!"));
            source.subscribe(par -> System.out.println("o2: " + par),
                    () -> System.out.println("Done!"));
    Joshua Street
    i have a hopefully straight-forward question about RxJava, specifically about merging completables
    Completable.merge() according to docs, subscribes to all specified completables
    i want to somehow delay those subscriptions to when the completable created by the merge is subscribed to
    and have functionality that is similar to Completable.fromAction
    im struggling trying to figure out how to achieve this, any thoughts or ideas?
    Joshua Street
    forget my question i had issues in my test code that lead me to believe an incorrect assumption about how merge worked with subscriptions
    Nayshawn Danner
    Hi all,
    Nayshawn Danner
    whoops!, was wonder if this was some how possible in RxJava:
    Subject<Integer, String> publishSubject = PublishSubject.create()
    PublishSubject<T, T> obviously doesn't like to type transformation, even though the Subject type suggest that this could be possible. Suggestions any one?
    Sourabh Verma

    Is there a way to make requests in parellel in batch. Like, if I have a list of ids and I want to get details of those ids 4 at a time. This is what I have till now

                    .flatMap(users -> Observable.fromIterable(users).flatMap(user -> api.usersData(user.username).subscribeOn(Schedulers.newThread())))
                    .subscribe(user -> System.out.println(user.toStringNew()), Throwable::printStackTrace);

    but this thing makes all requests in parellel, not in batches of 4

    perhaps try using a fixed size threadpool in place of Schedulers.newThread
    Sourabh Verma
    But is there a better way instead of depending on size of theadpool
    there is the flatMap(func, maxConcurrency) overload
    Paul DeMarco
    Hi everyone, is it possible to listen to an observable but not actually subscribe to it.
    For instance, BLE will begin to scan once it's subscribed to, I'd like to setup a listener for results, but start the scanning from another function
    Ben Christensen
    Compose using the doOn* operators. Those will let to 'peek' at the events as they flow by once subscribed to.
    Paul DeMarco
    that is awesome. Thanks!
    قوه هاو تشاو
    Ionut Velicu
    Hello! Kind of a long shot but maybe could clarify this for me. Using RxJava w/ Android and SQLBrite. I understand that SQLBrite is a wrapper around SQLight that listens to changes from the DB and notifies the subscribers. However, my question is how can I check if an entry is in a database or not. Is this something that can be done via Observables (seems kind of unnatural to me) or should I use plain SQLight querying in this case? Thanks!
    Ivan Schütz
    @rguillens thanks again for your help!
    Yannick Lecaillez
    Hi there ! By curiosity: is there any reason why the FlowableTimeoutTimed isn't using HalfSerializer ? https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java#L51
    Is it because AtomicThrowable might propagate CompositeException ?
    David Karnok
    It is a pretty old algorithm/file (also predating half-serializer) and there are now different approaches available, such as index-based state transitions that don't need external serialization: see FlowableTimeout of Reactive4.NET.
    Yannick Lecaillez
    Wow, really instructive :) Thanks !
    Paul Tikhonov
    Hi there. Can I set a locale priority for.toSortedList ? For example: I have a values list with latin, cyrilic, number and special symbols. I need next sequence: cyrilic – latin – numbers – special chars. Now - special chars - numbers - latin - cyrilic
    .toSortedList((lhs, rhs) -> lhs.getDisplayName().compareTo(rhs.getDisplayName()))
    Hi, I'm new to rx java, transitioning from rx C#. I want to marshall the observable, via observeOn to another thread. It needs to be a single thread to preserve ordering. In c# world, I used an EventLoopScheduler in my ObserveOn calls.
    What is the corollary in Java. I've seen suggested to used Schedulers.computation, but I am not sure how that is managed. Is it a single thread, or delegate to a thread pool?
    For now i've created a scheduler based on a singleThreadExecutor --- observeOn(Schedulers.from(Executors.newSingleThreadExecutor)
    Does this seem like a coherent approach?
    David Karnok
    In RxJava, we use a different style of Schedulers. Our schedulers are designed as a pair of Scheduler and Worker. Each Scheduler has a createWorker() method that returns a Worker that is guaranteed to be FIFO for non-timed tasks. observeOn runs on such worker and thus the item event order is preserved (with the option to make sure errors don't cut ahead as well). I.e., Observable.just("Hello", "world").observeOn(Schedulers.computation()).subscribe(System.out::println); will print the two words in order on the same thread. If you are on 1.x then Schedulers.from() is an option for having a single threaded scheduler. in 2.x there is the Schedulers.single(). I have a 3 part series about RxJava's Schedulers.
    Joshua Street
    Hey guys, I'm having a problem with using RxJava in my Spring Boot application. The stack trace is:
    java.lang.IllegalStateException: Illegal access: this web application instance has been stopped already. Could not load [META-INF/services/javax.xml.bind.JAXBContext]. The following stack trace is thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access.
        at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForResourceLoading(WebappClassLoaderBase.java:1305) [tomcat-embed-core-8.5.15.jar:8.5.15]
        at org.apache.catalina.loader.WebappClassLoaderBase.getResource(WebappClassLoaderBase.java:986) [tomcat-embed-core-8.5.15.jar:8.5.15]
        at javax.xml.bind.ContextFinder.find(ContextFinder.java:432) [na:1.8.0_121]
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641) [na:1.8.0_121]
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584) [na:1.8.0_121]
        at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.getJaxbContext(AbstractJaxb2HttpMessageConverter.java:111) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.createMarshaller(AbstractJaxb2HttpMessageConverter.java:50) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.Jaxb2RootElementHttpMessageConverter.writeToResult(Jaxb2RootElementHttpMessageConverter.java:185) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.xml.AbstractXmlHttpMessageConverter.writeInternal(AbstractXmlHttpMessageConverter.java:66) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:227) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:882) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:650) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at org.springframework.web.client.RestTemplate.postForObject(RestTemplate.java:380) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
        at com.ford.gforce.rushhour.client.GatewayClient.load(GatewayClient.java:32) [bin/:na]
        at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.warmCache(FatcatTestGatewayRunner.java:35) [bin/:na]
        at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.lambda$1(FatcatTestGatewayRunner.java:39) [bin/:na]
        at io.reactivex.internal.operators.completable.CompletableFromAction.subscribeActual(CompletableFromAction.java:34) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.Completable.subscribe(Completable.java:1635) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) ~[rxjava-2.1.0.jar:2.1.0]
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) ~[rxjava-2.1.0.jar:2.1.0]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
    i've talked to some folks in the spring boot channel and they state that it could be related to what thread context class loader RxJava uses
    it shouldn't be using Tomcat's, and should be using the Application's
    does anyone have insight how i can confirm this is the problem and how to fix it?
    Joshua Street
    turns out using the IO scheduler had cached stale threads
    shutting down and starting the IO scheduler fixes the problem
    David Karnok
    For container usages, you have to manually shutdown the schedulers via Schedulers.shutdown() when the container lifecycle ends. It is also recommended you cancel any outstanding flows before that.
    Joshua Street
    thanks @akarnokd for the confirmation.
    Paul DeMarco
    hi everyone, i have a function that I'd like to return a completable on whether or not it kicked off the longer running observable correctly.
    how can I achieve this?
    Completable startNofications() {
             .flatMap(foo -> foo.setupNotification())
             .doOnNext( // Setup correctly, return completable to function)
             .subscribe(data -> ...)
    Mark Raynsford
    'lo. is there such a thing as a Subject that protects against infinite recursion? consider the case where a subscriber to a Subject publishes events to that same Subject.
    that'll almost certainly result in a StackOverflowException sooner or later. it'd be nice if it were possible to electively avoid that on the first recursion
    @io7m Dont use subjects?
    Is there some type of Observable that has two types of data? I know a pair works fine, but I'm considering using this lib in an API I'm working on, Observables would be useful, but I would feel obligated to replace existing methods like onAdd(Consumer<Type> listener) with observable equivalents. In which case, I would end up doing a ton of pairing and then unpairing, which isn't something I really want to do(performance and more work for API user).