These are chat archives for ReactiveX/RxJava

26th
May 2016
Vadym
@dragantl
May 26 2016 00:27

@Dorus I think I've got it but I'd still love to hear your thoughts. I've created a SensorHolder object that holds the found sensor types that depend upon location to be triggered. I do the following:

Observer<SensorHolder> sensorObserver = Observer.combineLatest(
   sensor1, sensor2, sensor3, sensor4, SensorHolder::new);

Observer<SensorPoint> pointerObserver = location.combineWithLatest(sensorObserver, (location, holder) -> new SensorPoint(
   location, holder.sensor1, holder.sensor2, holder.sensor3, holder.sensor4));

That seems to work pretty well. Shame for the useless holder though but at least it works. Do you think there is a way to optimize it further?

Dorus
@Dorus
May 26 2016 10:08
@dragantl If you need multiple withlatestfrom, do something like source.zip(source.withLatestFrom(b, (a, b) => b), source.withLatestFrom(c, (a, c) => c))), zip does support a variable number of arguments right? (sorry, getting creative here)\
Or, source.withLatestFrom(b.withLatestFrom(c.withLatestFrom(d)))?
Vadym
@dragantl
May 26 2016 12:54
@Dorus nice, I'll see how that applies. One more thing, if I can ask, I had a big headache with trying to resolve a case when the window comes in empty. So when sensor1 emits something in the first window and doesn't emit anything in the second, withlatestfrom will take care of propagating the last value. However, what was actually happening was that an empty window would come in and my code would fail because I would try to flatmap the observables within the window (it would throw sequence is empty or something like that).
I tried doing something like sensor1.window(location).filter(window -> window.isEmpty().toBlocking().first).<doProcessingOfObservablesInNonEmptyWindow> but the thread handling the filter would get stuck... which is weird because that meant that isEmpty().toBlocking() was the one blocking... so that means window is not completed? And then I had to do a stupid thing because I got frustrated sensor1.window(location).flatMap(window -> window.toList()).filter(list -> list.size() > 0).map(list -> Observable.from(list)).<doProcessingOfObservables>
Dorus
@Dorus
May 26 2016 12:59
i'de start off with avoiding toBlocking
What was the original failing code?
If you use window().flatMap(window -> window....) and window is empty, it should emit nothing. So no problems....
@dragantl
Vadym
@dragantl
May 26 2016 13:09
@Dorus so failing code would happen within flatmap like so `...window(location).flatMap(window -> { Observable<Float> x = window.map(event -> event.getSensorEvent().values[0]); ... });
Dorus
@Dorus
May 26 2016 13:09
But map wont even run if window is empty. So i dont see the problem.
You're doing something else with x?
Vadym
@dragantl
May 26 2016 13:12
It would then calculate min, max, and avg from x using rxmath like so Observable<Float> xMin = MathObservable.min(x); xMax = ..., xAvg = ...; then it would zip xMin, xMax, xAvg
the equivalents of x would be done for y and z (zMin, zMax, zAvg), and those zips would be zipped again
Dorus
@Dorus
May 26 2016 13:13
Still shouldnt go wrong. The min of an empty stream is an empty stream right??
Or does it give an error?
Vadym
@dragantl
May 26 2016 13:13
so it probably fails on calculating min, max, and average
I'm not sure abou that
Dorus
@Dorus
May 26 2016 13:14
If so, try .min().catch(e => Rx.Observable.empty()) or so
(problem checking e if it's an emptyStreamException or something like that).
Vadym
@dragantl
May 26 2016 13:14
I'll test it out if math functions tolerate empty streams
Dorus
@Dorus
May 26 2016 13:14
not sure what it would throw. I dont know those functinos.
Vadym
@dragantl
May 26 2016 13:15
maybe I could do MathObservable.min(x.defaultIfEmpty(0))?
Dorus
@Dorus
May 26 2016 13:15
I'de prefer to return empty steams.
That way you can ignore the window
Vadym
@dragantl
May 26 2016 13:16
yes, you're right
Dorus
@Dorus
May 26 2016 13:16
In fact, it shouldnt even run the min/max functions...
Vadym
@dragantl
May 26 2016 13:19
I'll mess with it some more, now that i know where it fails. However, I'm just glad to have at least a working version, even if not optimized
Dorus
@Dorus
May 26 2016 13:19
window(location).flatMap(window -> window
  .scan((o, n) => new {count = o.count+1, total = o.total+n, min = min(o.min, n), max = max(o.max, n)})
  .map(e => new {avg = o.total/o.count, min = o.min, max = o.max})
I'de do something like that (speudo code and all)
Vadym
@dragantl
May 26 2016 13:22
oh hey, that's pretty good
definitely simpler logic to follow
Dorus
@Dorus
May 26 2016 13:26
Should also be possible to do window(location).flatMap(window -> Rx.obserbable.zip(window.min(), window.max(), window.average()))
Perhaps replace .min with .scan(int.max, (o, n) -> min(o, n))
I kinda forgot if scan emit something (the seed??) if it is used on an empty stream.
@dragantl
Vadym
@dragantl
May 26 2016 14:19
@Dorus I def like the scan idea but right now i'm debugging to find where exactly the issue happens. I think it will be easier to resolve it. Thanks for all the help, its much appreciated!
Dorus
@Dorus
May 26 2016 23:33
@dragantl :point_up: 25 mei 2016 22:55 is still on my list (shamelessly putting a reminder for myself here), but if you want help quicker, stackoverflow is also a good source.
dont forget to ping back here