These are chat archives for ReactiveX/RxJava

26th
Apr 2017
jonefee wang
@jonefeewang
Apr 26 2017 15:11

hi,folks, i have a question, it is simple business logic flow:

check whether an employee in multiple departments, department and employee relationship is in cache, first check whether the relationships exists in cache, if exist, check whether
employee belong it, in case of not in cache, get it from database, and check the relationship against the employee and then save the department info to cache.

this is the code:


public Observable  isEmployeeInDepartment(List<Long> departmentIds, long employeeId){

         Observable  departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share();   //this observable will resolve twice, and cause unnecessary cache access

         Observable  departInfoNotInCache = departmentInfoExsitInCache.filter(...);

         Observable  departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share(); //this observable will resolve twice, and cause unnecessary database access

         Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache());

         Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...);

         return departInfoInCache.check(userId).merge( departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe());
}

the problem is that departmentInfoExsitInCache and saveResult will be resolved twice by once client method subscribe.

jonefee wang
@jonefeewang
Apr 26 2017 15:17
I found that once remove the save subscription code .doOnCompleted(saveResult.subscribe()), it will become normal and only resolve once . Is there anything wrong with this code ?
Mark Elston
@melston
Apr 26 2017 18:26

@jonefeewang,Two things I noticed.

  1. departInfoNotInCache is not needed. You can go directly to departmentInfoFromDb using something like this:

         Observable  departmentInfoFromDb = 
             departmentInfoExsitInCache
                  .filter(...)
                  .flatMap(departmentIds->checkFromDb())
                  .share(); 
  2. You are not passing a function to the subscribe call in your doOnCompleted call. The only effect that would have is to force a second subscription to departmentInfoExsitInCache and all subsequent observables. If these are cold observables then you will get repeated calls to the operations you provided for them. I would have thought that the share used for departmentInfoExsitInCacheand departmentInfoFromDb would have prevented this for these observables, though not for those following them in the processing chain.

Zak Taccardi
@ZakTaccardi
Apr 26 2017 21:06
I'm using .startWith() in a .share() stream. I need .startWith() to return the value from a function any time this stream is restarted (subscriber count goes from 0 to 1). However, this value is cached from the very first subscription and re-used, even after the subscriber count has gone from 1-0 and back to 0-1 again
nikslushkin
@nikslushkin
Apr 26 2017 22:20

Hi all!
Please help me with one problem.

I’m using RxRelay for RxBus implementation. So if I subscribe to relay and use async schedulers then right on next line of code hasObservables return false.

There is test that fails

public class RelayTest {

    @NonNull private final Relay<Object> relay = PublishRelay.create().toSerialized();

    @Test
    public void testSubscribe() {
        relay
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.trampoline())
                .subscribe();

        assertTrue(relay.hasObservers()); // fail
    }
}
David Karnok
@akarnokd
Apr 26 2017 22:25
That's because of subscribeOn changes the thread where the subscribe() is called on the Relay. The actual subscription thus will happen at a non-deterministic later time and relay still doesn't know about it. Add Thread.sleep(100) which is usually enough to detect the observers.
nikslushkin
@nikslushkin
Apr 26 2017 22:26
@akarnokd is there some stable solution? ((
David Karnok
@akarnokd
Apr 26 2017 22:28
Why do you use subscribeOn in the first place? I don't know PublishRelay but judging from the name, it has no subscription side effect thus subscribing to it on any thread is the same.
Plus observeOn(Schedulers.trampoline()) has no practical effect because you already serialized the relay, observeOn trampolines internally and otherwise you can't get back to the test thread with a trampoline scheduler.
nikslushkin
@nikslushkin
Apr 26 2017 22:34
ok I just want to handle some event and than flatMap it to another Observable that should be processed in async thread. What do I wrong?
David Karnok
@akarnokd
Apr 26 2017 22:37
You shouldn't be testing PublishRelays basic functionality I guess.
What is your source, what is the result of the Function of flatMap?
nikslushkin
@nikslushkin
Apr 26 2017 22:45
This test just to check its behavior. My problem is if I subscribe to relay and send event on next line it will not be processed. I'm using this implementation of RxBus https://gist.github.com/igoticecream/47b0946996fe9a88407dd564600fe3be
And sample code of usage:
commonRxBus.receive(SomeEvent.class)
        .flatMap(ignored -> longProcessWorker.doWork())
        .subscribeOn(workScheduler)
        .observeOn(mainScheduler)
        .subscribe();

commonRxBus.post(new SomeEvent()); // will not be processed
David Karnok
@akarnokd
Apr 26 2017 22:57
Apart from using a bus, you should probably move the subscribeOn onto the doWork: .flatMap(ignored -> longProcessWorker.doWork().subscribeOn(workScheduler)).observeOn(mainScheduler).subscribe()
nikslushkin
@nikslushkin
Apr 26 2017 23:07
@akarnokd it works, thank you so much