These are chat archives for ReactiveX/RxJava

19th
Oct 2017
Dakotah North
@dakotahNorth
Oct 19 2017 10:43
Not sure if this addresses your hystrix question ... but the Netflix guys point to hystrix as part of the motivation for reactive sockets. https://github.com/rsocket/rsocket/blob/master/Motivations.md
Costin Grigore
@raisercostin
Oct 19 2017 11:03
Hi, I would like some hints on what operation is needed to return the latest events from an observable. I have an Observable to watch folder changes. When a change in the folder happens I have a heavy operation that might take several minutes like rendering a pdf file. After it finishes I would like to get the new full list of all events/file changes till the current moment and again start to process them. I tried to use sample, throttleLast, reduce, sliding and thumbling but none works as expected. Maybe I don't get the parameters well or I don't get the semantics.
Costin Grigore
@raisercostin
Oct 19 2017 12:17
Actually I managed to solve it:
     val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")

    val srcObs: Observable[Seq[FileAlterated]] = 
      src.watchFileCreated(1000).doOnEach(x=>println(s"got $x")).tumblingBuffer(Duration(1,TimeUnit.SECONDS)).filter(_.nonEmpty).doOnEach(event=>{
      println(s"$event => ignore")
      })
     srcObs.toBlocking.last
Haris Kljajić
@devharis
Oct 19 2017 18:42
Trying to do a retryWhen on a merge but it doesn't get a throwable back from merge.
Looking like this:
private Observable requestData(String token) {
        return Observable.merge(new Observable[]{
             mAssignmentRepository.sync(token),
             mFacilityRepository.sync(token),
             ...
}
requestData(token)
                .retryWhen(errors -> errors) <-- Only a object?
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.immediate())
        .subscribe;
In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
Haris Kljajić
@devharis
Oct 19 2017 19:48
Maybe I must use a mergeDelayError? Any suggestions?