These are chat archives for ReactiveX/RxJava

30th
Jun 2017
David Karnok
@akarnokd
Jun 30 2017 08:05
In RxJava, we use a different style of Schedulers. Our schedulers are designed as a pair of Scheduler and Worker. Each Scheduler has a createWorker() method that returns a Worker that is guaranteed to be FIFO for non-timed tasks. observeOn runs on such worker and thus the item event order is preserved (with the option to make sure errors don't cut ahead as well). I.e., Observable.just("Hello", "world").observeOn(Schedulers.computation()).subscribe(System.out::println); will print the two words in order on the same thread. If you are on 1.x then Schedulers.from() is an option for having a single threaded scheduler. in 2.x there is the Schedulers.single(). I have a 3 part series about RxJava's Schedulers.
Joshua Street
@jjstreet
Jun 30 2017 19:00
Hey guys, I'm having a problem with using RxJava in my Spring Boot application. The stack trace is:
java.lang.IllegalStateException: Illegal access: this web application instance has been stopped already. Could not load [META-INF/services/javax.xml.bind.JAXBContext]. The following stack trace is thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access.
    at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForResourceLoading(WebappClassLoaderBase.java:1305) [tomcat-embed-core-8.5.15.jar:8.5.15]
    at org.apache.catalina.loader.WebappClassLoaderBase.getResource(WebappClassLoaderBase.java:986) [tomcat-embed-core-8.5.15.jar:8.5.15]
    at javax.xml.bind.ContextFinder.find(ContextFinder.java:432) [na:1.8.0_121]
    at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641) [na:1.8.0_121]
    at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584) [na:1.8.0_121]
    at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.getJaxbContext(AbstractJaxb2HttpMessageConverter.java:111) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.http.converter.xml.AbstractJaxb2HttpMessageConverter.createMarshaller(AbstractJaxb2HttpMessageConverter.java:50) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.http.converter.xml.Jaxb2RootElementHttpMessageConverter.writeToResult(Jaxb2RootElementHttpMessageConverter.java:185) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.http.converter.xml.AbstractXmlHttpMessageConverter.writeInternal(AbstractXmlHttpMessageConverter.java:66) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:227) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:882) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:650) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at org.springframework.web.client.RestTemplate.postForObject(RestTemplate.java:380) [spring-web-4.3.9.RELEASE.jar:4.3.9.RELEASE]
    at com.ford.gforce.rushhour.client.GatewayClient.load(GatewayClient.java:32) [bin/:na]
    at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.warmCache(FatcatTestGatewayRunner.java:35) [bin/:na]
    at com.ford.gforce.rushhour.fatcat.run.FatcatTestGatewayRunner.lambda$1(FatcatTestGatewayRunner.java:39) [bin/:na]
    at io.reactivex.internal.operators.completable.CompletableFromAction.subscribeActual(CompletableFromAction.java:34) ~[rxjava-2.1.0.jar:2.1.0]
    at io.reactivex.Completable.subscribe(Completable.java:1635) ~[rxjava-2.1.0.jar:2.1.0]
    at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) ~[rxjava-2.1.0.jar:2.1.0]
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) ~[rxjava-2.1.0.jar:2.1.0]
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) ~[rxjava-2.1.0.jar:2.1.0]
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) ~[rxjava-2.1.0.jar:2.1.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_121]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
i've talked to some folks in the spring boot channel and they state that it could be related to what thread context class loader RxJava uses
it shouldn't be using Tomcat's, and should be using the Application's
does anyone have insight how i can confirm this is the problem and how to fix it?
Joshua Street
@jjstreet
Jun 30 2017 19:18
turns out using the IO scheduler had cached stale threads
shutting down and starting the IO scheduler fixes the problem
David Karnok
@akarnokd
Jun 30 2017 20:13
For container usages, you have to manually shutdown the schedulers via Schedulers.shutdown() when the container lifecycle ends. It is also recommended you cancel any outstanding flows before that.
Joshua Street
@jjstreet
Jun 30 2017 21:24
thanks @akarnokd for the confirmation.
Paul DeMarco
@pauldemarco
Jun 30 2017 22:27
hi everyone, i have a function that I'd like to return a completable on whether or not it kicked off the longer running observable correctly.
how can I achieve this?
Completable startNofications() {
  longRunningObservable
         .flatMap(foo -> foo.setupNotification())
         .doOnNext( // Setup correctly, return completable to function)
         .map(otherwork)
         .subscribe(data -> ...)
}