These are chat archives for ReactiveX/RxJava

3rd
Dec 2015
Dorus
@Dorus
Dec 03 2015 10:48
@joshdurbin One more thing i was thinking about: If you use do like that you do lose the reference to the subscriber. The advantage of flatMap would be that it ties the lifetime of the inner subscription to that of the entire sequence.
I m not sure if there is a shorter way to write this, but you can also do: .flatMap(product -> otherService.doSomething(product).ignoreElements().map(__ -> product).startWith(product))
xorgate
@xorgate
Dec 03 2015 13:11
i have an android app where in debug builds i want to have .onErrorReturn(), but not in releasebuilds. What's a good way to achieve something like this?
Josh Durbin
@joshdurbin
Dec 03 2015 16:18
Yeah I ended up with this, which seems to work:
protected Observable<Product> construct() {

  Observable.concat(redisProductsCache.get(requestParameters.productNDC).bindExec(),
    productService.getByCode(requestParameters.productNDC)
      .doOnNext { Product product -> redisProductsCache.set(requestParameters.productNDC, product, PRODUCT_CACHE_TTL).subscribe() }
  ).first()
}
Dorus
@Dorus
Dec 03 2015 19:22
@joshdurbin Looks good. I understand what that function does, bit clumsy on the naming perhapts. Only thing that i'm wondering about: Why does redisProductsCache.set return an observable?
Josh Durbin
@joshdurbin
Dec 03 2015 19:40
Yeah, I’m still tinkering… part of the RX implementation of Lettuce 4.0. I believe it returns a String “OK” or ...

One other question…

I have a similar function, that I actually want cached, the previous example was more just to wrap my mind around how to achieve this. The query returns distinct values across the mongo collection, strings, but isn’t cached. This is a more reasonable candidate for some type of caching than just caching random products.

In this example getDistinctList returns Observable<String> and the cache takes string keys and values, smembers returns an Observable<String> and sadd returns an Observable<Long> (number of elements added to set).

The following works, however, only one item is placed in the cache, presumably due to the doOnNext. It is, as you would expect, the first item in the stream of strings that come from the call to productService.getDistinctList(…).

    Observable.concat(cache.smembers(CACHE_KEY).bindExec(),
      productService.getDistinctList(DISTINCT_PROPERTY_KEY)
        .doOnNext { String categoryName -> cache.sadd(CACHE_KEY, categoryName).subscribe() }).first()
Josh Durbin
@joshdurbin
Dec 03 2015 19:47
So in this case, if the stream is not available from the cache and we engage the product service, we then need to “wire tap” that stream and place each emitted String into the cache.
Via, perhaps, doOnEach?
Dorus
@Dorus
Dec 03 2015 20:08
I'm not sure if i understood all of that. But i think you are on the correct path yes.
Josh Durbin
@joshdurbin
Dec 03 2015 20:09
Yeah, so that code works, but it only inserts one String in the redis set.
But if you call productService.getDistinctList(DISTINCT_PROPERTY_KEY) and observe that stream till complete, there are about 20 or so strings in the set (if you collect to a set)
Dorus
@Dorus
Dec 03 2015 20:10
But doOnNext fires for every element in the stream
The difference with doOnEach is that it also includes onCompleted and onError
Josh Durbin
@joshdurbin
Dec 03 2015 20:10
The cached entry from the above code is the first string, which makes sense given the description of doOnNext
Oh. Interesting.
Yeah, so in that example, if you observe productService.getDistinctList(DISTINCT_PROPERTY_KEY)toList() then you get [ ‘A’, ‘B’, ‘C’ ]
But subsequent calls through that block of code yield ‘A’ only, and I observe ‘A’ in the redis set via the command line tool. So the other entries ‘B’ and ‘C’ aren’t making it into the cache.
I’ll tinker.
Dorus
@Dorus
Dec 03 2015 20:15

Yeah I ended up with this, which seems to work:

protected Observable<Product> construct() {

  Observable.concat(redisProductsCache.get(requestParameters.productNDC).bindExec(),
    productService.getByCode(requestParameters.productNDC)
      .doOnNext { Product product -> redisProductsCache.set(requestParameters.productNDC, product, PRODUCT_CACHE_TTL).subscribe() }
  ).first()
}

This wont even compile . The .doOnNext { Product part looks plain wrong.

Josh Durbin
@joshdurbin
Dec 03 2015 20:15
Hm. In groovy?
Dorus
@Dorus
Dec 03 2015 20:16
ooh
Josh Durbin
@joshdurbin
Dec 03 2015 20:16
It’s working. :-/
Dorus
@Dorus
Dec 03 2015 20:16
i was trying to read it as java :)
Josh Durbin
@joshdurbin
Dec 03 2015 20:16
Ah. That’s helpful information, yeah?! :-)
Dorus
@Dorus
Dec 03 2015 20:17
i think i get it
you use .first()
so it only reads one value
Josh Durbin
@joshdurbin
Dec 03 2015 20:18
ahhh, but that goes hand in hand with the concat in this situation, yeah?
this (or) that
Dorus
@Dorus
Dec 03 2015 20:21
What i think you need is change it to a observable to observables. Then take first and subscribe to the first observable you get.
Josh Durbin
@joshdurbin
Dec 03 2015 20:22
How do you do that?
I’ve only done or seen collection in a web handler, collect or convert the observables to a list of the same type so that list can be rendered back to the HTTP caller, or whatever you’re doing.
Dorus
@Dorus
Dec 03 2015 20:25
weird i could swear i ran into the function for that earlier today, but i cant find it now
The idea with concat was to either pull 1 value from the cache, or go to the network when that value isn't there.
But you get a random number of values, you dont know how many
so you dont know when you reach the end of the cache
Josh Durbin
@joshdurbin
Dec 03 2015 20:26
yeah, precisely
in the case where you’re referring to an object by a unique identifier, it’s fine
Dorus
@Dorus
Dec 03 2015 20:27
I'm also wondering. If you run to the end of the cache, after 10 items, do you want to check online if there are 10 more?
Completely different usecase than what i had in mind when you where asking about it before :)
Josh Durbin
@joshdurbin
Dec 03 2015 20:28
Well, these are small enough sets, that I could expect all or nothing
Dorus
@Dorus
Dec 03 2015 20:29
ok, so when it's in the cache, it's completely there?
Josh Durbin
@joshdurbin
Dec 03 2015 20:30
yeah.
Yeah, and if i remove first(), as expected, the cache becomes populated with the stream of entries coming from ‘online'.
But the full observable stream contains the cache items and the online items.
which is expected.
Dorus
@Dorus
Dec 03 2015 20:31
So we should modify the cache to emit an observable only if it contains something.
mmm
or at least detect it
switchIfEmpty
i think that'll work
    protected Observable<Product> construct() {
        redisProductsCache.get(requestParameters.productNDC).bindExec()
           .switchIfEmpty(productService.getByCode(requestParameters.productNDC)
              .doOnNext { Product product -> redisProductsCache.set(requestParameters.productNDC, product, PRODUCT_CACHE_TTL).subscribe() }
           )
    }
The trick with concat was cute, but not the solution in this case :)
Dorus
@Dorus
Dec 03 2015 20:39
The only remaining thing that makes me tingle is that you use fire-and-forget on redisProductsCache.set. That could be okay if you know it will never emit errors and is short-running.
(Also i wonder who the hell wrote that interface. Sending a "Ok" string is insane. Would be much better if it emitted the just added item and then onCompleted. Or just onCompleted. That still leaves onError to be used for errors, and would allow us to write .flatMap(item -> redisProductsCache.set(item, ...)))
I hope i didn't lose you. I tend to derail a little bit on my rants :)
Josh Durbin
@joshdurbin
Dec 03 2015 21:59
nah had to step away for a meeting