    Dorus
    @Dorus
    @cavemansspa I'm not sure where to look. Observables are normally never blocking anyway.
    You mean using the Thread.start stuff? Well, i would prefer to have cancellation implemented and also use the Schedulers class (passing a scheduler in to those functions). But overall it's not against the Rx contract to do it like that.
    Dorus
    @Dorus
    Also, it totally IS possible to unsubscribe from customObservableBlocking. Just do customObservableBlocking.take(2). Unsubscribes after 2 items :-)
    (just that it will then ignore all subsequent calls to onNext)
    cavemansspa
    @cavemansspa
    @Dorus - yes, i was referring to the Thread.start approach. i thought that maybe the schedulers functionality came after those examples were created.
    Dorus
    @Dorus
    I have no idea who made those examples or when :)
    cavemansspa
    @cavemansspa
    looks like ben christensen
    two years ago
    Dorus
    @Dorus
    Looks like it. Like i said, they're not wrong.
    cavemansspa
    @cavemansspa
    okay thanks for input -- wasn't sure if there was a better way since a lot of stuff has been added since those were put together.
    Dorus
    @Dorus
    The most important part is to stay within the contract: Serialize calls to onNext and finish with either onCompleted or onError.
    Other than that you can call code from any thread.
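
    As a rough illustration of that contract, here is a plain-Java sketch with no RxJava dependency. `SimpleObserver`, `SerializedEmitter` and `ContractSketch` are hypothetical names made up for this example, not RxJava types. It assumes a simple lock is an acceptable way to serialize calls: any thread may call onNext, the calls never overlap, and only the first terminal event is delivered.

```java
import java.util.ArrayList;
import java.util.List;

final class ContractSketch {
    /** Hypothetical stand-in for an Rx-style observer; not an RxJava type. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Hypothetical wrapper: serializes calls and drops everything after a terminal event. */
    static final class SerializedEmitter<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated; // guarded by the lock on "this"

        SerializedEmitter(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override public synchronized void onNext(T value) {
            if (!terminated) downstream.onNext(value);
        }

        @Override public synchronized void onCompleted() {
            if (!terminated) { terminated = true; downstream.onCompleted(); }
        }

        @Override public synchronized void onError(Throwable error) {
            if (!terminated) { terminated = true; downstream.onError(error); }
        }
    }

    /** Emits from several plain threads, then completes; returns the recorded events. */
    static List<String> run(int producers, int itemsPerProducer) {
        // Only mutated inside the emitter's synchronized methods.
        List<String> events = new ArrayList<>();
        SimpleObserver<Integer> recorder = new SimpleObserver<Integer>() {
            @Override public void onNext(Integer v) { events.add("next"); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable e) { events.add("error"); }
        };
        SerializedEmitter<Integer> emitter = new SerializedEmitter<>(recorder);

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++) emitter.onNext(i);
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        emitter.onCompleted();                               // the single terminal event
        emitter.onNext(-1);                                  // ignored: already terminated
        emitter.onError(new IllegalStateException("late")); // ignored as well
        return events;
    }

    public static void main(String[] args) {
        List<String> events = run(4, 100);
        System.out.println(events.size() + " events, last = " + events.get(events.size() - 1));
    }
}
```

    The producer threads here are started by hand, much like the Thread.start examples discussed above. Which thread emits does not matter for the contract, only that the calls are serialized and that nothing follows the terminal event.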
    Dmitriy Zaitsev
    @DmitriyZaitsev

    Hi guys!
    I’ve faced one strange problem.

    private static Observable<List<Repository>> repositories(GitHubApi api, String user) {
      return api.getRepositories(new UserQuery(user), Sort.UPDATED, Order.ASC)
          .map(Result::response)
          .map(Response::body)
          .map(RepositoriesResponse::getItems);
    }

    @Override public Observable<List<RepositoryDto>> getUsersRepositories(String user) {
      return repositories(mApi, user)
          .doOnNext(DataRepositoryImpl::saveToCache) // WTF???
          .flatMap(Observable::from)
          .map(DomainDataMapper::toRepositoryDto)
          .toList();
    }

    This code works well. I can fetch the list of repositories from GitHub and show it in the UI, but I'm wondering why the saveToCache method is not invoked.

    Observable2 does emit one item, but somehow the action passed to doOnNext does nothing.
    Dmitriy Zaitsev
    @DmitriyZaitsev
    Hm.. the most interesting part is that after inlining that method everything works OK. I.e. when I use doOnNext(r -> {…}) instead of doOnNext(MyClass::myMethod), the problem disappears.
    cavemansspa
    @cavemansspa
    println Thread.currentThread()
    Observable.from([1, 2, 3, 4])
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap({
            println 'in map thread == ' + Thread.currentThread()
            Thread.sleep(500)
            return Observable.just(it + ' mapped')
        })
        .toBlocking()
        .subscribe(
            { it -> println "onNext() ${it} ${Thread.currentThread()}" },
            { error -> println "onError() ${error}" },
            { println 'onCompleted' }
        )
    println '...'
    Thread[Thread-122,6,main]
    in map thread == Thread[RxCachedThreadScheduler-1,6,main]
    in map thread == Thread[RxCachedThreadScheduler-1,6,main]
    onNext() 1 mapped Thread[Thread-122,6,main]
    in map thread == Thread[RxCachedThreadScheduler-1,6,main]
    onNext() 2 mapped Thread[Thread-122,6,main]
    in map thread == Thread[RxCachedThreadScheduler-1,6,main]
    onNext() 3 mapped Thread[Thread-122,6,main]
    onNext() 4 mapped Thread[Thread-122,6,main]
    onCompleted
    ...
    in this example, i'm expecting each flatMap call to get its own thread from Schedulers.io()
    the output shows that the same thread is used, i.e. [RxCachedThreadScheduler-1,6,main]
    how do you get the flatMap requests to use multiple threads from Schedulers.io()?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @cavemansspa apply subscribeOn() to the Observable that you return in the flatMap, like flatMap(value -> Observable.something().subscribeOn())
    Dorus
    @Dorus
    @cavemansspa use observeOn inside flatMap.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @Dorus then the computation of the Observable returned in flatMap() will happen on the initial thread (the io thread in this case), and only the emissions down the chain after flatMap will happen on the observeOn scheduler
    Dorus
    @Dorus
    yeah, it depends on the structure of your inner observable. Sometimes you might need subscribeOn like you mentioned.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yup :)
    cavemansspa
    @cavemansspa
    ok -- giving that a go.
    Dorus
    @Dorus

    You can also do

    .flatMap({
        Schedulers.io().createWorker().schedule(() -> {
        println 'in map thread == ' + Thread.currentThread()
        Thread.sleep(500)
        return Observable.just(it + ' mapped')
        })})

    (Not 100% sure if i use the worker correctly here, it might still need to be disposed in the end somehow.)

    actually nvm, that won't even return anything :P
    cavemansspa
    @cavemansspa
    println Thread.currentThread()
    Observable.from([1, 2, 3, 4])
        .flatMap({
            println 'in map thread == ' + Thread.currentThread()
            def val = it
            return Observable.create({ obs ->
                Thread.sleep(500)
                println 'in Observable within flatMap thread == ' + Thread.currentThread()
                obs.onNext(val + ' mapped')
                obs.onCompleted()
            }).subscribeOn(Schedulers.io())
        })
        .toBlocking()
        .subscribe(
            { it -> println "onNext() ${it} ${Thread.currentThread()}" },
            { error -> println "onError() ${error}" },
            { println 'onCompleted' }
        )
    println '...'
    Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in map thread == Thread[Thread-133,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-1,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-2,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-3,6,main]
    in Observable within flatMap thread == Thread[RxCachedThreadScheduler-4,6,main]
    onNext() 1 mapped Thread[Thread-133,6,main]
    onNext() 2 mapped Thread[Thread-133,6,main]
    onNext() 4 mapped Thread[Thread-133,6,main]
    onNext() 3 mapped Thread[Thread-133,6,main]
    onCompleted
    ...
    that does the trick
    Dorus
    @Dorus
    Or
    .flatMap({
        println 'in map thread == ' + Thread.currentThread()
        def val = it
        Observable.just(0)
            .observeOn(Schedulers.io())
            .map({
                Thread.sleep(500)
                println 'in Observable within flatMap thread == ' + Thread.currentThread()
                val + ' mapped'
            })
    })
    This is scala right?
    cavemansspa
    @cavemansspa
    groovy
    Dorus
    @Dorus
    ah okay, not familiar with groovy :P
    In Scala you can leave out the return keyword if it is the last statement in a function.
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    @cavemansspa btw, it's better to use Observable.fromCallable() instead of create() so you don't have to deal with the Observable contract (in case you only need to emit one value)
    cavemansspa
    @cavemansspa
    same with groovy. the groovyConsole makes testing these scenarios interactively very easy.
    @Dorus -- okay, will have a closer look at Observable.fromCallable
    Dorus
    @Dorus
    fromCallable is a smart one too yes.
    cavemansspa
    @cavemansspa
    @Dorus -- looks good, I should be able to pass a groovy closure to that.
    Dorus
    @Dorus
    I believe those functions should be able to take a Scheduler directly
    Or not, can't find it now :P
    Anyway i'm sure you'll figure it out from here.
    cavemansspa
    @cavemansspa
    @Dorus / @artem-zinnatullin -- thank you for the help.
    chijikpijik
    @chijikpijik
    what's the difference between fromCallable and defer with just?
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    no difference in your case
    but defer is more flexible because it allows you to return any observable (but requires more code)
    chijikpijik
    @chijikpijik
    @artem-zinnatullin oh, i just bumped RxJava to 1.0.15 and fromCallable has appeared
    Dorus
    @Dorus
    oooh that's probably what i need to do too
    Artem Zinnatullin :slowpoke:
    @artem-zinnatullin
    yup, I've added it in 1.0.15