These are chat archives for ReactiveX/RxJava

6th
Feb 2018
Ozzy Osborne
@BarDweller
Feb 06 2018 20:49
Can anyone help with what I'm doing wrong here? I'm trying to have the messages in the messages Observable pop out at random intervals.. This is based off some code from stack overflow, but if I grok it right, it's trying to create one observable stream that loops that has the switchMap to cause it to wait a random amount (via the call to randomTime() that in my impl just returns '2').. and then zipWith laces the two together.. meaning each message has to wait for the next delay to pop.. except it doesn't work.. and I don't understand why...
       Observable<String> delay = Observable.just("")
                                    .switchMap(dummy -> Observable.timer(randomTime(), TimeUnit.SECONDS))
                                    .map( a -> String.valueOf(a) )
                                    .repeat();

       Observable<String> messages = Observable.just("Test") //eventually lines from a file...
                                               .repeat();

       messages.zipWith(delay, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
David Karnok
@akarnokd
Feb 06 2018 22:10
How does it not work? Did you add Thread.sleep(10000) to see it produce events before the main quits?
Ozzy Osborne
@BarDweller
Feb 06 2018 22:10
The main doesn't quit at all, and nothing is emitted
it shouldn't quit tho right? I'm zipping across 2 infinite streams
David Karnok
@akarnokd
Feb 06 2018 22:12
Is this RxJava 1 code? I've tested it and works, prints Test 0 a couple of times.
Ozzy Osborne
@BarDweller
Feb 06 2018 22:12
rxjava2
I mean.. it's great to know it works for you.. you've no idea how nice it is to know my half brained understanding of this stuff is at least on the right tracks ;p
David Karnok
@akarnokd
Feb 06 2018 22:13
v2 Observable doesn't support backpressure, therefore the just.repeat will keep emitting the "Test" into the zip's buffer and never letting the delay ever execute. With Flowable, the messages will pause after 128 items which allows the delay to start executing
Ozzy Osborne
@BarDweller
Feb 06 2018 22:14
Ah.. ok.. I'm new to all this stuff =) so I should swap them all to be Flowables?
David Karnok
@akarnokd
Feb 06 2018 22:15
Yes, and switch the places of messages and delay: so it can start the timer first.
Ozzy Osborne
@BarDweller
Feb 06 2018 22:16
I shall have a bash, thank you =)
David Karnok
@akarnokd
Feb 06 2018 22:16
Also please read the wiki about the differences between the two major versions.
Ozzy Osborne
@BarDweller
Feb 06 2018 22:16
I've never used version 1 or version 2 at least never beyond very simple examples
 Flowable<String> delay = Flowable.just("")
                                    .switchMap(dummy -> Flowable.timer(randomTime(), TimeUnit.SECONDS))
                                    .map( a -> String.valueOf(a) )
                                    .repeat();

       Flowable<String> messages = Flowable.just("Test") //eventually lines from a file...
                                               .repeat();

       delay.zipWith(messages, (d, msg) -> ""+d+" "+msg  ).subscribe( System.out::println );
So I tried that... and now it just exits immediately with no output =)
David Karnok
@akarnokd
Feb 06 2018 22:21
There are a lot of tutorials out there, mostly targeting v1. The wiki should help you turning them into v2 code.
Did you put in Thread.sleep(10000)?
Ozzy Osborne
@BarDweller
Feb 06 2018 22:21
No.. why do I need it?
David Karnok
@akarnokd
Feb 06 2018 22:23
Standard Schedulers use daemon threads that won't prevent the JVM from exiting.
Ozzy Osborne
@BarDweller
Feb 06 2018 22:23
It does work with the sleep.. so that's good to know.. =)
ahh.. so I need to subscribe on the current thread somehow?
David Karnok
@akarnokd
Feb 06 2018 22:25
You need to prevent the main thread from returning prematurely by some means. If you'd have finite flows, blockingSubscribe could work, although the only way to stop the example flow is to kill the JVM.
Ozzy Osborne
@BarDweller
Feb 06 2018 22:26
yeah.. this is basically a quick app that reads lines from a file, and invokes a webservice with a line as a message, then delays randomly and repeats.. (ideally using the lines sequentially)
so when it runs out of lines from the file, ideally I want it to start over again.. it's never supposed to exit, or rather, I'm fine exiting it by killing the process
David Karnok
@akarnokd
Feb 06 2018 22:28
In that case, use blockingSubscribe().
Ozzy Osborne
@BarDweller
Feb 06 2018 22:31
ah.. of course.. =) I find a lot of this stuff is at the mo 'try this, doesn't work, repeat until find one way that works'
like.. I understand that Flowable.timer( delay, unit) produces an observable that returns 0L after the specified delay
ahh.. and now I figure out why my next change didn't work either..
David Karnok
@akarnokd
Feb 06 2018 22:35
It might be worth reading an RxJava 2 book: https://www.packtpub.com/application-development/learning-rxjava
Ozzy Osborne
@BarDweller
Feb 06 2018 22:35
Indeed.. probably a sane suggestion.
I'm playing with WebFlux, RxJava and Lagom all at the same time..
not in the same app, thankfully
but in parallel, trying to do the same thing in each