These are chat archives for ReactiveX/RxJava

21st
Feb 2016
Derek Perez
@perezd
Feb 21 2016 18:06
Newb here, I have a merged collection of observables that are all doing async work and ultimately emit objects that have a consistent tag (like: thingA, thingB) but have unique values associated with those tags. Ultimately, I'd like to have a Map that has thingA->[data,data], thingB->[data,data]
I should also mention I am using buffers
so I have mergedIo.buffer(100).somethingThatMovesThroughBufferThatIsMapFriendly()
need help with that last part :)
buffer is semi-reducing here.
which I don't really want.
window might be better, but I can't really tell.
Any thoughts?
I basically want buffers of maps
that I subscribe to
maps will have n keys that point to m values per buffer from the thing I'm subscribed to.
the root issue is that the buffer is mixed: it contains objects with thingA and thingB, and I want to split them out into their respective map keys
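To pin down the target shape before any Rx operators come in: for one batch, each item's tag becomes a map key, and the items that share a tag end up in that key's list. Below is a minimal plain-JDK sketch of that per-batch step using Collectors.groupingBy. The Tagged class and the sample values are hypothetical and do not come from the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical item type: a tag such as "thingA" plus an arbitrary value.
final class Tagged {
    final String tag;
    final String value;

    Tagged(String tag, String value) {
        this.tag = tag;
        this.value = value;
    }

    @Override
    public String toString() {
        return value;
    }
}

class BatchToMap {
    // Splits one mixed batch into tag -> [items with that tag], keeping arrival order per tag.
    // TreeMap only makes the key order predictable for printing.
    static Map<String, List<Tagged>> group(List<Tagged> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(t -> t.tag, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Tagged> batch = Arrays.asList(
                new Tagged("thingA", "a1"), new Tagged("thingB", "b1"),
                new Tagged("thingB", "b2"), new Tagged("thingA", "a2"));
        System.out.println(group(batch)); // {thingA=[a1, a2], thingB=[b1, b2]}
    }
}
```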
Dorus
@Dorus
Feb 21 2016 18:26
Perhaps you can split thingA and thingB before you call buffer? groupBy might help.
Then buffer the grouped observables, apply the reduce function to the result and merge it back together.
Derek Perez
@perezd
Feb 21 2016 18:26
so I can buffer a groupBy? I am having a hard time understanding what groupBy does in practice... it feels right, but this GroupedObservable thing is messing me up
so, if I buffer after a groupBy does it make a buffer per group?
Dorus
@Dorus
Feb 21 2016 18:27
Also, the difference between buffer and window is that window lets you process values one at a time, so you won't need to store 100 values in memory before you start processing.
However, if you need random access to all 100 values, window won't work.
So then you need Buffer.
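A small sketch of that difference, assuming RxJava 1.x and Java 8 lambdas, with Observable.range standing in for the real source: buffer(3) hands you each batch as a List, while window(3) hands you each batch as an Observable whose items can be reduced one by one without building a List.

```java
import java.util.List;
import rx.Observable;

class BufferVsWindow {
    // buffer(3): each emission is a List<Integer> holding up to 3 items.
    static List<List<Integer>> buffers() {
        return Observable.range(1, 7)
                .buffer(3)
                .toList()
                .toBlocking()
                .single();
    }

    // window(3): each emission is an Observable<Integer>; items flow through one at a time,
    // so each window is summed here without ever materializing a List for it.
    static List<Integer> windowSums() {
        return Observable.range(1, 7)
                .window(3)
                .flatMap(w -> w.reduce(0, (acc, x) -> acc + x))
                .toList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        System.out.println(buffers());    // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(windowSums()); // [6, 15, 7]
    }
}
```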
Derek Perez
@perezd
Feb 21 2016 18:27
well it's the diff between collection and observable
I don't need random access really.
Dorus
@Dorus
Feb 21 2016 18:28
Then Window is superior
Derek Perez
@perezd
Feb 21 2016 18:28
so groupBy -> window -> forEach
Dorus
@Dorus
Feb 21 2016 18:28
Well, groupBy goes from Observable<T> to Observable<GroupedObservable<K, T>>
Derek Perez
@perezd
Feb 21 2016 18:28
what does forEach receive at this point?
Dorus
@Dorus
Feb 21 2016 18:28
So you call source.groupBy(...).flatMap(e -> e.buffer(...))
Derek Perez
@perezd
Feb 21 2016 18:28
yeah, what's confusing to me is it seems like it wants me to subscribe in my subscription
GroupedObservable has a buffer method?
it's a freaking observable
ok yes
mind blown.
OK let me try this.
Dorus
@Dorus
Feb 21 2016 18:29
GroupedObservable is the same as Observable, only it has a key value.
Same with window, only slightly simpler: Observable<T> to Observable<Observable<T>>
Buffer goes from Observable<T> to Observable<List<T>>
See the similarities?
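Dorus's suggestion (groupBy first, then buffer each group) as a sketch, assuming RxJava 1.x. The items are hypothetical "tag:value" strings, so the key can be taken from the text before the colon.

```java
import java.util.List;
import rx.Observable;
import rx.observables.GroupedObservable;

class GroupThenBuffer {
    // Hypothetical item format "tag:value"; the tag is the grouping key.
    static String tagOf(String item) {
        return item.substring(0, item.indexOf(':'));
    }

    static List<List<String>> run() {
        Observable<String> source = Observable.just(
                "thingA:1", "thingB:1", "thingB:2", "thingA:2", "thingA:3");

        // groupBy: Observable<String> -> Observable<GroupedObservable<String, String>>
        Observable<GroupedObservable<String, String>> groups = source.groupBy(GroupThenBuffer::tagOf);

        // Each group is an ordinary Observable, so it can be buffered on its own;
        // flatMap merges the per-group List<String> emissions back into one stream.
        return groups
                .flatMap(grp -> grp.buffer(2))
                .toList()
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        // Each printed List holds items of a single tag only.
        System.out.println(run());
    }
}
```

Because each group is buffered separately, no List ever mixes thingA and thingB. The leftover partial thingA buffer is flushed when the source completes.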
Derek Perez
@perezd
Feb 21 2016 18:32
yeah
this is exactly what I want
Dorus
@Dorus
Feb 21 2016 18:32
^_^
Derek Perez
@perezd
Feb 21 2016 18:32
except eventually I want to map it
but that should be doable.
oh wait
I don't need to anymore
because groupBy first
that's the key here I think
then flatMap to buffer.
Dorus
@Dorus
Feb 21 2016 18:33
flatMap does that. flatMap can both map and flatten the observable
Derek Perez
@perezd
Feb 21 2016 18:33
when I do that flatMap, how do I retain key information?
because I still care about the tags like thingA
or I don't care, they are sorted at this point
Dorus
@Dorus
Feb 21 2016 18:34
source.groupBy(...).map(e -> e.getKey())
Derek Perez
@perezd
Feb 21 2016 18:34
sorry, I mean, I kinda want a Map with a key and a list of the buffer
like thingA->buffer
via window
so window will get like [thingA,thingB,thingB,thingA]
and I want thingA->(2 items), thingB->(2 items) to be what I receive from a window
Dorus
@Dorus
Feb 21 2016 18:35
After you use groupBy you get [thingA, thingA], [thingB, thingB]
Derek Perez
@perezd
Feb 21 2016 18:35
right, that's good. but how do I know I have a thingA
Dorus
@Dorus
Feb 21 2016 18:36
groupedObservable.key
getKey() actually
Do you want to take 100 items and count the number of thingA and thingB? Or do you want to emit every 100 times you get thingA and every 100 times you get thingB?
Because in the first case, you need to use window before groupBy
Derek Perez
@perezd
Feb 21 2016 18:39
I want 100 grouped things to do
Dorus
@Dorus
Feb 21 2016 18:39
In that case source.window(100).flatMap(win -> win.groupBy(...).flatMap(grp -> grp.count().map(count -> Tuple.create(grp.getKey(), count))))
Derek Perez
@perezd
Feb 21 2016 18:40
lol I'm doing this in Java 7
so no lambdas for me right now
yet
so it felt wrong to receive an Observable<> in a Func1
Dorus
@Dorus
Feb 21 2016 18:40
Yeah, convert that :)
Derek Perez
@perezd
Feb 21 2016 18:40
thats what I was grappling with
but clearly its fine
Dorus
@Dorus
Feb 21 2016 18:40
But it's 10x longer if I write it in Java 7 style
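Roughly what the window/groupBy/count pipeline looks like in Java 7 style, with an anonymous Func1 in place of each lambda. This is a sketch assuming RxJava 1.x (where count() emits an Integer); AbstractMap.SimpleEntry stands in for the hypothetical Tuple, and items are assumed to be "tag:value" strings.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

class CountPerWindowJava7 {
    // Java 7 style: every lambda becomes an anonymous Func1. Items are "tag:value" strings (assumption).
    static Observable<Map.Entry<String, Integer>> countPerWindow(Observable<String> source, int size) {
        return source
                .window(size)
                .flatMap(new Func1<Observable<String>, Observable<Map.Entry<String, Integer>>>() {
                    @Override
                    public Observable<Map.Entry<String, Integer>> call(Observable<String> win) {
                        return win
                                .groupBy(new Func1<String, String>() {
                                    @Override
                                    public String call(String item) {
                                        return item.substring(0, item.indexOf(':'));
                                    }
                                })
                                .flatMap(new Func1<GroupedObservable<String, String>, Observable<Map.Entry<String, Integer>>>() {
                                    @Override
                                    public Observable<Map.Entry<String, Integer>> call(final GroupedObservable<String, String> grp) {
                                        return grp.count().map(new Func1<Integer, Map.Entry<String, Integer>>() {
                                            @Override
                                            public Map.Entry<String, Integer> call(Integer count) {
                                                // Stand-in for Tuple.create(grp.getKey(), count)
                                                return new AbstractMap.SimpleEntry<String, Integer>(grp.getKey(), count);
                                            }
                                        });
                                    }
                                });
                    }
                });
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = countPerWindow(
                Observable.just("thingA:1", "thingB:1", "thingB:2", "thingA:2", "thingA:3"), 4)
                .toList()
                .toBlocking()
                .single();
        System.out.println(counts);
    }
}
```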
Derek Perez
@perezd
Feb 21 2016 18:40
oh yeah I get it
I'm doing android
and I'm trying to integrate retrolambda, but my build tool is fighting me
Dorus
@Dorus
Feb 21 2016 18:41
There was something to get Lambdas on Android
Derek Perez
@perezd
Feb 21 2016 18:41
yeah retrolambda
but it requires build system hacks
Dorus
@Dorus
Feb 21 2016 18:41
Ah yeah, retrolambda, there are people on this channel that know how that works. Not me
Derek Perez
@perezd
Feb 21 2016 18:41
yeah, I know how it works I am just blocked by my build tool
so flatMap Func1<Observable<T>, GroupedObservable<String, T>> ?
looks right ?
Dorus
@Dorus
Feb 21 2016 18:44
We got 3 lambdas in there, not sure which one you mean.
Derek Perez
@perezd
Feb 21 2016 18:44
the first one
win -> groupBy
Dorus
@Dorus
Feb 21 2016 18:45
win -> win.groupBy? That one ends in Tuple<String, Integer>
Derek Perez
@perezd
Feb 21 2016 18:45
meant the type sig of that lambda
Dorus
@Dorus
Feb 21 2016 18:45
win -> win.groupBy(...).flatMap(grp -> grp.count().map(count -> Tuple.create(grp.getKey(), count)))
Derek Perez
@perezd
Feb 21 2016 18:45
win -> win.groupBy(...) that part
lambda sig would be Func1<Observable<T>, GroupedObservable<String, T>>
yes?
assuming string keys
Dorus
@Dorus
Feb 21 2016 18:46
Func1<Observable<T>, Observable<Tuple<String, Integer>>>
Derek Perez
@perezd
Feb 21 2016 18:46
that can't be right
wait you are doing ALL of that
oh
in one lambda
my bad
lost track
so you are flatMapping in a flatMap ?
Dorus
@Dorus
Feb 21 2016 18:47
And grp -> grp.count().map(count -> Tuple.create(grp.getKey(), count)) is Func1<GroupedObservable<String, T>, Observable<Tuple<String, Integer>>>
There's really only one flatMap
Derek Perez
@perezd
Feb 21 2016 18:48
at the top level
Dorus
@Dorus
Feb 21 2016 18:49
oh right, yes
I even lost count of one
Derek Perez
@perezd
Feb 21 2016 18:49
:)
I think I actually want a multiMap here
because key->[value]
jeez this is hard to follow
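For the key -> [value] shape, RxJava 1.x also has a toMultimap operator, which collects a finite Observable into a single Map<K, Collection<T>>. The sketch below applies it per window, so each downstream emission is one map per batch. It is an alternative to the groupBy/collect route discussed in this thread, not what the thread ended up using; items are hypothetical "tag:value" strings.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import rx.Observable;

class MultimapPerWindow {
    // Hypothetical item format "tag:value"; the tag is the map key.
    static String tagOf(String item) {
        return item.substring(0, item.indexOf(':'));
    }

    // window(size) cuts the stream into batches; toMultimap turns each finished batch into one
    // Map<tag, Collection<item>>, so every emission downstream is a single "map per batch".
    static Observable<Map<String, Collection<String>>> mapsPerBatch(Observable<String> source, int size) {
        return source
                .window(size)
                .flatMap(win -> win.toMultimap(MultimapPerWindow::tagOf));
    }

    public static void main(String[] args) {
        List<Map<String, Collection<String>>> batches = mapsPerBatch(
                Observable.just("thingA:1", "thingB:1", "thingB:2", "thingA:2", "thingA:3"), 4)
                .toList()
                .toBlocking()
                .single();
        System.out.println(batches);
    }
}
```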
Dorus
@Dorus
Feb 21 2016 19:01
Well, let's take a few steps.
We got source as Observable<T>
Next we call .window(100), which gives Observable<Observable<T>>
.flatMap(win   // win is of type Observable<T>
-> win.groupBy(...)   // now we have an Observable<GroupedObservable<String, T>>
.flatMap(grp   // grp is a GroupedObservable<String, T>
-> grp.count()   // is Observable<Integer>
.map(count   // count is an Integer
-> Tuple.create(grp.getKey(), count)   // result is Tuple<String, Integer>
)   // map here gives us Observable<Tuple<String, Integer>>
)   // the inner flatMap goes from Observable<GroupedObservable<String, T>> to Observable<Tuple<String, Integer>>, flattening one Observable<Tuple<String, Integer>> per group
And finally the outer flatMap goes from Observable<Observable<Tuple<String, Integer>>> to Observable<Tuple<String, Integer>>
Derek Perez
@perezd
Feb 21 2016 19:02
Gotcha, yes. wow.
I think I am able to make this make sense.
will post my solution here in a sec
this is open source so :)
Dorus
@Dorus
Feb 21 2016 19:02
Aaaand i missed a bracket in the end
Derek Perez
@perezd
Feb 21 2016 19:02
I think for my needs collect() simplifies things.
GroupedObservable.collect() should be able to get me an Observable<Map<>>
Dorus
@Dorus
Feb 21 2016 19:05
Why collect? You only wanted to count right?
source
    .window(100)
    .flatMap(win -> win.groupBy(...)
        .flatMap(grp -> grp.count()
            .map(count -> Tuple.create(grp.getKey(), count))
        )
    )
That should be more readable
Derek Perez
@perezd
Feb 21 2016 19:05
no I actually want the real objects
sorry, I was shorthanding
ultimately I want a map of thingA->[obj, obj]
like a hashmap
Dorus
@Dorus
Feb 21 2016 19:06
ah in that case grp.count() is slightly more complicated ;)
But you knew that already.
Derek Perez
@perezd
Feb 21 2016 19:06
yes :)
but this has been helpful nonetheless :)
Dorus
@Dorus
Feb 21 2016 19:06
Anyway, the important part is that reduce only emits 1 value.
Derek Perez
@perezd
Feb 21 2016 19:08
which is OK because it's in a flatMap
Dorus
@Dorus
Feb 21 2016 19:08
Oh yeah, you could use scan too if you wanted, but that's a bit weird in combination with window before
Derek Perez
@perezd
Feb 21 2016 19:08
the only problem I have is how to think about a GroupedObservable in a collect
I assume it just calls onNext n times
but then, how do I adapt that
Dorus
@Dorus
Feb 21 2016 19:09
???
Derek Perez
@perezd
Feb 21 2016 19:09
so I have groupBy().collect()
window().flatMap(w -> w.groupBy().collect() )
Dorus
@Dorus
Feb 21 2016 19:09
more like groupBy().flatMap(grp -> grp.collect())
Derek Perez
@perezd
Feb 21 2016 19:09
oh fuck
OK
yes makes more sense
Dorus
@Dorus
Feb 21 2016 19:10
just replace grp.count() in my previous example with grp.collect(), the rest stays the same.
Derek Perez
@perezd
Feb 21 2016 19:10
so I collect() and then map it again?
collect() gets a list right?
or just say toList() here
Dorus
@Dorus
Feb 21 2016 19:12
source
    .window(100)
    .flatMap(win -> win.groupBy(...)
        .flatMap(grp -> grp.collect(() -> new hashMap(), (map, e) -> map.add(e))
            .map(map -> Tuple.create(grp.getKey(), map))
        )
    )
Derek Perez
@perezd
Feb 21 2016 19:13
I think you mean list on collect right?
Dorus
@Dorus
Feb 21 2016 19:14
hashmap, list, whatever you prefer :)
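One caveat on the collect snippet above: HashMap has no add(e), so as written it would not compile, and a List is the natural container. Here is a sketch of the same window/groupBy/collect pipeline, assuming RxJava 1.x, with AbstractMap.SimpleEntry standing in for the hypothetical Tuple and hypothetical "tag:value" string items.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import rx.Observable;

class CollectPerGroup {
    // Hypothetical item format "tag:value"; the tag is the grouping key.
    static String tagOf(String item) {
        return item.substring(0, item.indexOf(':'));
    }

    // Pairs a key with its items; stands in for the hypothetical Tuple.create.
    static Map.Entry<String, List<String>> entry(String key, List<String> items) {
        return new AbstractMap.SimpleEntry<>(key, items);
    }

    static Observable<Map.Entry<String, List<String>>> groupedBatches(Observable<String> source, int size) {
        return source
                .window(size)
                .flatMap(win -> win.groupBy(CollectPerGroup::tagOf)
                        // collect(stateFactory, collector): a fresh list per group, items appended in order
                        .flatMap(grp -> grp.<List<String>>collect(() -> new ArrayList<String>(), (list, e) -> list.add(e))
                                // grp is still in scope inside this lambda, so its key can be attached
                                .map(list -> entry(grp.getKey(), list))));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, List<String>>> out = groupedBatches(
                Observable.just("thingA:1", "thingB:1", "thingB:2", "thingA:2", "thingA:3"), 4)
                .toList()
                .toBlocking()
                .single();
        System.out.println(out);
    }
}
```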
Derek Perez
@perezd
Feb 21 2016 19:15
how do you have grp.getKey() in scope here?
is map a sibling to flatMap here?
or nested in flatMap
Dorus
@Dorus
Feb 21 2016 19:15
Because it's part of the same lambda
Derek Perez
@perezd
Feb 21 2016 19:15
OK it's just formatted confusingly
Dorus
@Dorus
Feb 21 2016 19:16
ah sorry
Derek Perez
@perezd
Feb 21 2016 19:16
you mean call map on the collect output
Dorus
@Dorus
Feb 21 2016 19:16
Count the brackets, i'm pretty sure they're correct now.
Derek Perez
@perezd
Feb 21 2016 19:17
yeah I think that makes sense
this is some of the ugliest code I've ever seen lol
lambdas are essential
Dorus
@Dorus
Feb 21 2016 19:18
Only because you're not using lambdas :(
Derek Perez
@perezd
Feb 21 2016 19:18
yeah
not blaming you!
your stuff looks great
mine looks like trash :)
I'll show you in a moment
Dorus
@Dorus
Feb 21 2016 19:19
Once you've used lambdas you can't understand how you ever wrote code without :P
Derek Perez
@perezd
Feb 21 2016 19:19
yeah, I've been using them in another non-android project
they are great
Dorus
@Dorus
Feb 21 2016 19:19
Then get retrolambda working ^_^
Derek Perez
@perezd
Feb 21 2016 19:19
yeah I'm working on it :P
need bazel.io to support a minor change
they don't let me compile jars in JDK8 if it's for Android
which is dumb
its a trivial fix
Dorus
@Dorus
Feb 21 2016 19:20
I like lambdas in JS and C# even more; in Java I'm fighting the type system half the time.
Derek Perez
@perezd
Feb 21 2016 19:20
but politics
so map() creates an observable for me right?
I just give it the real value
Dorus
@Dorus
Feb 21 2016 19:21
map doesn't create observables; its type is (T -> R, Observable<T>) -> Observable<R>
grp is a GroupedObservable<String, T>
collect changes that to Observable<List<T>>
map changes that to Observable<Tuple<String, List<T>>>
Derek Perez
@perezd
Feb 21 2016 19:23
right so I don't need to return an observable in my map right?
Dorus
@Dorus
Feb 21 2016 19:23
No, you write the T -> R function
map takes care of the Observable.
Derek Perez
@perezd
Feb 21 2016 19:24
right
Dorus
@Dorus
Feb 21 2016 19:25
Likewise, collect starts off with an Observable<T>: you write Unit -> List<T> and (List<T>, T) -> Unit, and collect hands you Observable<List<T>>.
Unit here is an empty parameter like void ^_^
I mean, either no parameters, or void return type.
Remember when we wrote source.map(e -> e + 1)? map actually takes 2 parameters: the observable (source) and the function int -> int.
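The two signatures spelled out with explicit RxJava 1.x functional types (Func1 for map; Func0 plus Action2 for collect), as a small sketch. You only write the plain functions; the operators take care of the Observable around them.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;

class MapAndCollect {
    // map: you supply only the T -> R function; map wraps the results in a new Observable<R>.
    static List<Integer> plusOne(Observable<Integer> source) {
        Func1<Integer, Integer> addOne = e -> e + 1;
        return source.map(addOne).toList().toBlocking().single();
    }

    // collect: you supply a factory (no arguments -> List<T>) and an accumulator ((List<T>, T) -> void);
    // collect returns an Observable<List<T>> that emits the filled list once the source completes.
    static List<Integer> collectAll(Observable<Integer> source) {
        Func0<List<Integer>> newList = () -> new ArrayList<Integer>();
        Action2<List<Integer>, Integer> add = (list, e) -> list.add(e);
        return source.collect(newList, add).toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(plusOne(Observable.just(1, 2, 3)));    // [2, 3, 4]
        System.out.println(collectAll(Observable.just(1, 2, 3))); // [1, 2, 3]
    }
}
```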
Derek Perez
@perezd
Feb 21 2016 19:28
holy shit it compiles finally
Dorus
@Dorus
Feb 21 2016 19:28
yay!
Derek Perez
@perezd
Feb 21 2016 19:29
I don't think it's right, but let me show you anyhow
WDYT
Dorus
@Dorus
Feb 21 2016 19:30
wtf.java -> Striking name :)
Derek Perez
@perezd
Feb 21 2016 19:30
(background: I am getting a bz2 CSV file and transforming the file into an emitter per line in the CSV file; that's the first part)
the idea is, we grab a batch of 100 lines (from either CSV file that we are processing) and batch them into work.
the files are unrelated in any way
it's just parallel I/O
Dorus
@Dorus
Feb 21 2016 19:32
Why group them per 100 lines?
Derek Perez
@perezd
Feb 21 2016 19:32
arbitrary
Dorus
@Dorus
Feb 21 2016 19:32
yeah but, why group them per N lines?
Derek Perez
@perezd
Feb 21 2016 19:32
RxJava was complaining about backpressure
well one actual example....I am going to turn a batch into a query
where I write up to 100 things to a DB at a given time.
progressive transactions
because the files are massive.
so I am doing this to batch bulk upload
to a db
Dorus
@Dorus
Feb 21 2016 19:35
Wait, your code ends with forEach instead of subscribe
Derek Perez
@perezd
Feb 21 2016 19:36
is that bad?
I thought forEach was just a shorthand for subscribing
Dorus
@Dorus
Feb 21 2016 19:37
Oh right, looks like it is.
I always prefer to have a subscription at hand
Derek Perez
@perezd
Feb 21 2016 19:37
the idea is that forEach gets a batch of Map<String, List<Row>>
and it can do some DB I/O
Dorus
@Dorus
Feb 21 2016 19:37
never really use blocking observables either.
Derek Perez
@perezd
Feb 21 2016 19:37
thats what its all for
what is blocking here
I may have accidentally blocked here
is it the bz2 fetch part?
Dorus
@Dorus
Feb 21 2016 19:38
Not sure if we got blocking observables here. Don't think we do, even. So just ignore me :P
I was looking up what forEach did.
And I found BlockingObservable.forEach first; that's a different function, apparently.
Derek Perez
@perezd
Feb 21 2016 19:39
interesting
I probably do need subscribe actually
my process just exits
which means something needs to hang
Dorus
@Dorus
Feb 21 2016 19:39
Oooh
That's the nature of Observables
Derek Perez
@perezd
Feb 21 2016 19:40
when using a forEach...
Dorus
@Dorus
Feb 21 2016 19:40
You're using an async programming library
Derek Perez
@perezd
Feb 21 2016 19:40
it should block right?
Dorus
@Dorus
Feb 21 2016 19:40
it tends to run things async ;)
Derek Perez
@perezd
Feb 21 2016 19:40
no I mean forEach
right, it should be.
but it appears not to be.
Dorus
@Dorus
Feb 21 2016 19:41
Nope, there are two versions of forEach.
You need to use toBlocking() to get the blocking one.
Derek Perez
@perezd
Feb 21 2016 19:41
call that after forEach()?
Dorus
@Dorus
Feb 21 2016 19:41
no, right before.
forEach doesn't return anything.
Anyway the preferable way would be to just let the Observable run in the background
And keep your thread alive in other ways.
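A sketch of the two forEach flavors in RxJava 1.x, with Observable.range on Schedulers.io() standing in for the real work. The blocking one is reached through toBlocking() and returns only after the source completes; the plain Observable.forEach just subscribes and returns.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable;
import rx.schedulers.Schedulers;

class ForEachFlavors {
    // BlockingObservable.forEach (reached via toBlocking()) returns only after the source completes.
    static List<Integer> blockingForEach() {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        Observable.range(1, 3)
                .subscribeOn(Schedulers.io())
                .toBlocking()
                .forEach(seen::add);
        return seen; // the source has already completed at this point
    }

    public static void main(String[] args) {
        System.out.println(blockingForEach()); // [1, 2, 3]

        // Observable.forEach only subscribes and returns immediately. Schedulers.io() threads are
        // daemon threads in RxJava 1.x, so the JVM may exit before any "async" line is printed.
        Observable.range(1, 3)
                .subscribeOn(Schedulers.io())
                .forEach(i -> System.out.println("async " + i));
    }
}
```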
Derek Perez
@perezd
Feb 21 2016 19:42
I'm already in a background thread, so its OK.
Dorus
@Dorus
Feb 21 2016 19:42
You can subscribe and use the onCompleted() function on subscribe to indicate your observable is done.
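A sketch of that approach, assuming RxJava 1.x: subscribe with onNext/onError/onCompleted callbacks and let either terminal callback release a CountDownLatch. The latch is a plain-JDK way to keep a thread waiting for this demo. Inside an Android service, the service lifecycle would normally be what keeps things alive instead.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscription;
import rx.schedulers.Schedulers;

class SubscribeWithCompletion {
    static List<Integer> runAndWait() {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // subscribe(onNext, onError, onCompleted) returns a Subscription immediately;
        // the work itself runs on an io thread.
        Subscription sub = Observable.range(1, 5)
                .subscribeOn(Schedulers.io())
                .subscribe(
                        seen::add,
                        err -> done.countDown(), // onError is terminal too (a real app would log err)
                        done::countDown);        // onCompleted: all work is done

        try {
            // Keep this thread alive until a terminal event arrives, giving up after 5 seconds.
            if (!done.await(5, TimeUnit.SECONDS)) {
                sub.unsubscribe();
            }
        } catch (InterruptedException e) {
            sub.unsubscribe();
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runAndWait()); // [1, 2, 3, 4, 5]
    }
}
```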
Derek Perez
@perezd
Feb 21 2016 19:42
yeah, interesting subscribe appeared to block.
that was the change I made most recently that made this behavior happen.
Dorus
@Dorus
Feb 21 2016 19:43
subscribe shouldn't be blocking, but it depends on how your observable is implemented.
Derek Perez
@perezd
Feb 21 2016 19:43
but yeah, gross right? JDK8 would clean this up
Dorus
@Dorus
Feb 21 2016 19:43
FetchBz2Observable.create in this case.
Derek Perez
@perezd
Feb 21 2016 19:44
hmm thats not it.
it still exits.
Dorus
@Dorus
Feb 21 2016 19:44
You mean it exits because your program terminates?
Derek Perez
@perezd
Feb 21 2016 19:44
I think so
where should an observeOn be?
Dorus
@Dorus
Feb 21 2016 19:45
Yeah that's normal. Background threads do not keep your application alive.
Derek Perez
@perezd
Feb 21 2016 19:45
well, I am using Android to ping an intent to my background service
so it should stay alive
Dorus
@Dorus
Feb 21 2016 19:45
observeOn? Normally you use it as little and as late as possible.
Derek Perez
@perezd
Feb 21 2016 19:45
or I should say, it was staying alive.
oh I wanted to schedule off to my I/O threads
does the placement of that in my observable chain mean anything?
Dorus
@Dorus
Feb 21 2016 19:46
have you tried subscribeOn(Schedulers.io())?
Derek Perez
@perezd
Feb 21 2016 19:46
yeah thats what I have
check the code
its near the top
my question is, does it matter when that gets called
Dorus
@Dorus
Feb 21 2016 19:47
I can only find .observeOn(Schedulers.io())
Derek Perez
@perezd
Feb 21 2016 19:47
wait do I have this backwards?
I want my observable work to happen in the io pool
like the fetching and stuff.
ironically tho this is all async.
Dorus
@Dorus
Feb 21 2016 19:47
subscribeOn only takes effect once (with several, the one closest to the source wins), and it decides which thread the subscription starts on.
Derek Perez
@perezd
Feb 21 2016 19:47
all the code to the observables you see is there
if you are curious
Dorus
@Dorus
Feb 21 2016 19:47
observeOn is used to schedule the work that comes after it onto another scheduler.
Best to read that page; it's difficult to explain well, the picture says it all
Derek Perez
@perezd
Feb 21 2016 19:48
I got rid of that observeOn and it didn't change anything
I'm gonna leave it out for now
Dorus
@Dorus
Feb 21 2016 19:49
If anything you want subscribeOn(Schedulers.io()), but if you're already in a background thread and subscribe() returns instantly, I don't think you need it. Probably FetchBz2Observable.create is also async, right?
Derek Perez
@perezd
Feb 21 2016 19:49
yeah
it uses async
everything is async so its not really worth worrying about.
and I'm not on a UI thread anyhow
Dorus
@Dorus
Feb 21 2016 19:50
observeOn is, for example, used right before subscribe to switch the final step to, say, the UI thread.
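A sketch that records which thread each stage runs on, assuming RxJava 1.x. The doOnNext above subscribeOn runs on an io thread. The doOnNext below observeOn runs on a computation thread, which is where an Android app would put its UI-thread scheduler instead.

```java
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.schedulers.Schedulers;

class SubscribeOnVsObserveOn {
    // Returns {thread name where the item was produced, thread name where it was observed}.
    static String[] threads() {
        AtomicReference<String> producer = new AtomicReference<>();
        AtomicReference<String> consumer = new AtomicReference<>();

        Observable.just("row")
                // Above observeOn: runs on the thread chosen by subscribeOn.
                .doOnNext(x -> producer.set(Thread.currentThread().getName()))
                .subscribeOn(Schedulers.io())
                // Everything below observeOn is re-scheduled onto the computation scheduler.
                .observeOn(Schedulers.computation())
                .doOnNext(x -> consumer.set(Thread.currentThread().getName()))
                .toBlocking()
                .last();

        return new String[] {producer.get(), consumer.get()};
    }

    public static void main(String[] args) {
        String[] t = threads();
        System.out.println("produced on " + t[0] + ", observed on " + t[1]);
    }
}
```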
Derek Perez
@perezd
Feb 21 2016 19:50
fucking A, it works!
just verified
        sub.onError(ex);
        sub.onCompleted();
That's not valid
You need to call one or the other. Not both.
Derek Perez
@perezd
Feb 21 2016 19:51
oh good to know
thanks!
Dorus
@Dorus
Feb 21 2016 19:52
onCompleted is probably swallowed if you use the right factory methods.
But best to not call it
Actually it might be a good idea to use Observable.create there.
instead of implementing OnSubscribe directly.
Oh wait, you do that elsewhere. Nevermind :)
Derek Perez
@perezd
Feb 21 2016 20:04
so wait, should I never call onCompleted ever?
Dorus
@Dorus
Feb 21 2016 20:05
Not after onError
An observable makes zero or more onNext calls, followed by a single onError or onCompleted. That's the contract.
Also onError and onCompleted are optional
they both indicate all work is done and resources can be (should be) released.
Derek Perez
@perezd
Feb 21 2016 20:07
yeah I like that at least
ok will fix
any other issues you may have spotted while you're there ? :D
Dorus
@Dorus
Feb 21 2016 20:08

Anyway, the factory method Observable.create ensures your new observable follows the contract, so if you call onCompleted or onNext after you already called onError, they will be silently swallowed.
Derek Perez
@perezd
Feb 21 2016 20:08
oh thats why thats happening then
Dorus
@Dorus
Feb 21 2016 20:08
.serialize does the same.
I also noticed you call onCompleted inside onResponse. That pretty much makes it a single-item observable.
(I wasn't sure if that was intended)
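A sketch of that single-item pattern, written Java 7 style with anonymous classes and assuming RxJava 1.x: one onNext followed by onCompleted on success, and only onError on failure. The Callback and Fetcher interfaces are hypothetical stand-ins for the HTTP client used in the real code.

```java
import rx.Observable;
import rx.Subscriber;

class SingleShotCallback {
    // Hypothetical one-shot async callback API (stand-in for the real HTTP client).
    interface Callback {
        void onResponse(byte[] body);

        void onFailure(Exception e);
    }

    // Hypothetical client that answers each request exactly once through a Callback.
    interface Fetcher {
        void fetch(String url, Callback cb);
    }

    // Emits exactly one item followed by onCompleted, or only onError; never both terminal events.
    static Observable<byte[]> fetchOnce(final Fetcher fetcher, final String url) {
        return Observable.create(new Observable.OnSubscribe<byte[]>() {
            @Override
            public void call(final Subscriber<? super byte[]> sub) {
                fetcher.fetch(url, new Callback() {
                    @Override
                    public void onResponse(byte[] body) {
                        if (!sub.isUnsubscribed()) {
                            sub.onNext(body);
                            sub.onCompleted(); // single-item Observable: complete right after the one value
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        if (!sub.isUnsubscribed()) {
                            sub.onError(e); // no onCompleted after this
                        }
                    }
                });
            }
        });
    }

    public static void main(String[] args) {
        // Fake fetcher for the demo: responds synchronously with three bytes.
        Fetcher fake = new Fetcher() {
            @Override
            public void fetch(String url, Callback cb) {
                cb.onResponse(new byte[] {1, 2, 3});
            }
        };
        byte[] body = fetchOnce(fake, "data.csv.bz2").toBlocking().single();
        System.out.println(body.length + " bytes"); // 3 bytes
    }
}
```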
Derek Perez
@perezd
Feb 21 2016 20:13
yeah it is
it only does one thing
Dorus
@Dorus
Feb 21 2016 20:13
Found another one: https://gitlab.com/perezd/ffruit/blob/master/java/org/fallingfruit/ffruit/util/observables/CsvRowEmitterObservable.java
} catch (IOException ex) {
  sub.onError(ex);
} finally {
  sub.onCompleted();
}
Still, in this case I'm not sure if I would change it, as onCompleted is ignored anyway.
Else you need to add some guard that checks if onError has been called.
There is another thing i would add here anyway
namely, check if sub is still subscribed inside the while loop.
while (!sub.isUnsubscribed()
       && (row = reader.readNext()) != null) {
Actually if you replace catch (IOException ex) with catch (Exception ex), you can move onCompleted inside the try block.
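Dorus's suggestions for the CSV emitter combined into one sketch, assuming RxJava 1.x: check isUnsubscribed() inside the loop, catch Exception, and never let onCompleted follow onError. BufferedReader.readLine() stands in for the CSV reader's readNext(). One deliberate variation: onCompleted is called after the try block (with a return in the catch) rather than inside it, so a failing close() from try-with-resources cannot produce onCompleted followed by onError.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import rx.Observable;
import rx.Subscriber;

class LineEmitter {
    // Emits one item per line of text; BufferedReader.readLine() stands in for the CSV reader's readNext().
    static Observable<String> lines(final String text) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
                    String row;
                    // Stop reading as soon as downstream unsubscribes (e.g. after take(n)).
                    while (!sub.isUnsubscribed() && (row = reader.readLine()) != null) {
                        sub.onNext(row);
                    }
                } catch (Exception ex) {
                    // Covers I/O errors, including ones thrown by close(): exactly one terminal event.
                    sub.onError(ex);
                    return; // never fall through to onCompleted after onError
                }
                if (!sub.isUnsubscribed()) {
                    sub.onCompleted();
                }
            }
        });
    }

    public static void main(String[] args) {
        List<String> rows = lines("id,name\n1,apple\n2,fig").toList().toBlocking().single();
        System.out.println(rows); // [id,name, 1,apple, 2,fig]
    }
}
```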
Derek Perez
@perezd
Feb 21 2016 20:59
appreciate all this!!
Dorus
@Dorus
Feb 21 2016 21:01

here's an example of how you can make an IO reader:

https://github.com/LeeCampbell/RxCookbook/tree/master/IO/Disk

C#, but it explains the concepts pretty nicely.

Derek Perez
@perezd
Feb 21 2016 21:07
This one is a little weird mainly because I'm adapting some really incompatible concepts
bzip -> csv
bzip I can't do iteratively
it has to be fully decompressed
and then I can stream it
so that's why it's a little clunky