    Costin Grigore
    @raisercostin
    Hi, I would like some hints on which operator to use to get the latest events from an observable. I have an Observable that watches folder changes. When a change happens in the folder, I run a heavy operation that might take several minutes, like rendering a PDF file. After it finishes I would like to get the full list of all events/file changes accumulated up to that moment and start processing them again. I tried sample, throttleLast, reduce, sliding and tumbling, but none works as expected. Maybe I'm getting the parameters wrong, or I don't understand the semantics.
    Costin Grigore
    @raisercostin
    Actually I managed to solve it:
        val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")

        val srcObs: Observable[Seq[FileAlterated]] =
          src.watchFileCreated(1000)
            .doOnEach(x => println(s"got $x"))
            .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
            .filter(_.nonEmpty)
            .doOnEach(event => println(s"$event => ignore"))
        srcObs.toBlocking.last
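The underlying idea (collect everything that arrives while a slow step is running, then hand it over as one batch) can also be sketched in plain Java without Rx. This is only an illustration under assumptions: `PendingEvents`, `publish` and `takeAll` are hypothetical names and not part of RxJava, RxScala or the jedi library used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: buffers events and hands them out in batches.
final class PendingEvents<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Called by the producer (e.g. a folder watcher) for every change.
    void publish(T event) {
        queue.add(event);
    }

    // Blocks until at least one event exists, then returns everything queued so far.
    List<T> takeAll() {
        List<T> batch = new ArrayList<>();
        try {
            batch.add(queue.take());   // wait for the first event
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for events", e);
        }
        queue.drainTo(batch);          // then grab whatever else piled up
        return batch;
    }
}
```

A worker loop would call `takeAll()`, run the slow rendering step, and call `takeAll()` again to pick up whatever accumulated in the meantime.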
    Haris Kljajić
    @devharis
    Trying to do a retryWhen on a merge, but it doesn't get a Throwable back from the merge.
    It looks like this:
    private Observable requestData(String token) {
        return Observable.merge(new Observable[]{
                mAssignmentRepository.sync(token),
                mFacilityRepository.sync(token),
                ...
        });
    }

    requestData(token)
            .retryWhen(errors -> errors) // <-- Only an Object?
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.immediate())
            .subscribe();
    In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
    Haris Kljajić
    @devharis
    Maybe I must use a mergeDelayError? Any suggestions?
    Shashank Tomar
    @shashanktomar
    Hey Guys, wrote down this blog post, some of you might like it.
    https://proandroiddev.com/reactive-mythology-interrupt-patterns-e244979b865a
    Hristo Hristov
    @pentarex
    Hey guys, I have a question. I am receiving data from a Bluetooth InputStream. I have two observables: listenReceivedData, which listens all the time, and sendAndReceiveData, which sends, waits until a response is received, and then calls onNext or onComplete based on some logic. The problem is that I have only one InputStream, and once I read from it the data is gone. How can I read in one place and emit to both observables?
    Min Hiew
    @mhiew

    Hi everyone. How does one convert an Observable of one type to another type:
    Observable<T> to Observable<R>

    Are we supposed to use the to operator or perhaps flatMap?

    When I try using flatmap it complains about observable cannot be applied to anonymous function<Observable<T>, Observable<R>>

    Mark Elston
    @melston
    @mhiew, I may have misunderstood your question, but what is the issue with using map? If you provide a function T -> R, doesn't this do what you want?
    David Stocking
    @dmstocking
    Are you asking how to take an Observable<T> and make it an Observable<R>? Just call map. If you want to map from an Observable<T> to Observable<R> via multiple Observable<R> then you need flatmap.
    map (T -> R) -> R
    flatmap (T -> Observable<R>) -> R (I hope I wrote that right)
    Mark Elston
    @melston
    @dmstocking, that looks right.
    Min Hiew
    @mhiew

    Hi, I believe the issue was due to my misunderstanding of flatMap. I thought it was going to pass in Observable<T>:

    (Observable<T> -> Observable<R>)

    But as @dmstocking mentioned it should be:
    (T -> Observable<R>)

    Changing that fixed things for me.
    Thank you
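The same difference in function shape can be tried out with `java.util.stream`, used here only as a standard-library stand-in: `Stream.map` takes `T -> R` and `Stream.flatMap` takes `T -> Stream<R>`, mirroring the `T -> R` and `T -> Observable<R>` shapes discussed above. The class and method names are made up for the illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

final class MapVsFlatMap {
    // map: the function is T -> R (here String -> Integer).
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // flatMap: the function is T -> Stream<R> (here String -> Stream<Character>);
    // the inner streams are flattened into a single Stream<R>.
    static List<Character> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                .collect(Collectors.toList());
    }
}
```

In both cases the function receives a single element `T`, never the whole source, which is the misunderstanding described above.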

    sankarpatw
    @sankarpatw
    Hi, I am a Java developer and I would like to contribute to Rx. I am familiar with the observable pattern. Could someone help me get started?
    Hristo Hristov
    @pentarex
    @sankarpatw start using it or start contributing?
    sankarpatw
    @sankarpatw
    Start contributing
    Hristo Hristov
    @pentarex
    I guess you have a GitHub account. Open https://github.com/ReactiveX/RxJava for the repo and make your changes; then you should be able to open a pull request. After the Rx team checks your changes, they will decide whether to approve or decline them.
    sankarpatw
    @sankarpatw
    ok
    David Karnok
    @akarnokd
    @sankarpatw Do you have something particular in mind you want to contribute? I suggest you open an issue first about what you'd like to contribute (unless it's a documentation fix or enhancement) so you don't waste your time on something that will most likely be declined. Examples of such things: trivial and/or convenience operators, changes that break the protocols or the architecture, and rare or esoteric operators.
    Oleh Dokuka
    @OlegDokuka
    @akarnokd Are you planning to add ForkJoinPool support? I saw that there is an operator for that in reactive-streams-commons; however, there is still no production implementation in either RxJava or Reactor.
    David Karnok
    @akarnokd
    RxJava targets Java 6 so it can't support it directly. The Schedulers.from is largely good enough to wrap a regular FJ pool as it's an ExecutorService.
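The point that a fork/join pool is just an `ExecutorService` can be checked with the JDK alone. The sketch below assumes nothing from RxJava; `ForkJoinAsExecutor` and `ranOnForkJoinWorker` are illustrative names. It submits a task through the plain `ExecutorService` interface and reports whether it ran on a fork/join worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

final class ForkJoinAsExecutor {
    // Submits one task via the ExecutorService interface and returns true
    // if the task was executed by a ForkJoinWorkerThread.
    static boolean ranOnForkJoinWorker() {
        ExecutorService pool = new ForkJoinPool(2); // ForkJoinPool implements ExecutorService
        try {
            Callable<Boolean> task =
                    () -> Thread.currentThread() instanceof ForkJoinWorkerThread;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With RxJava on the classpath, the same `pool` instance would be the argument handed to `Schedulers.from`, as suggested in the message above.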
    Oleh Dokuka
    @OlegDokuka
    Douh, forgot about that)
    Exerosis
    @Exerosis

    I'm guessing there is no preexisting way to accomplish something like this, right?

    Observable<CustomEvent> events = listen().toCustomEvent();
    events
        .map(Event::cause)
        .map(Cause::getStarter)
        .filter(User::isModerator)
        .subscribe(result -> {
            // Here result is a User (Cause::getStarter), but what I would really
            // like is to then do something like:
            CustomEvent event = result;
            event.fiddleAroundABit();
            event.cancel();
        });

    I run into this kind of issue all the time and end up with very stringy filters.
    What I would like is something sorta like this:

    BiObservable<String, Integer> test;
    test
        .first().map(stringToDoubleMap::get)
        // At this point we have BiObservable<Double, Integer>
        .second().map(intToStringMap::get)
        // Now we have BiObservable<Double, String>
        .subscribe((first, second) -> {
            // Will emit when one from both sides is available.
        });

    This would make something like this relatively simple:

    Observable<CustomEvent> events;
    events
        .splitToSecond(Event::cause)
        // BiObservable<CustomEvent, Cause>
        .second().map(Cause::getStarter)
        // BiObservable<CustomEvent, User>
        .second().filter(User::isModerator)
        // If one side of the 'pair' gets removed by filter, the other side does too.
        .first().subscribe(event -> {
            // Only care about the first side.
        });

    However I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.
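One common way to get this effect without a dedicated two-sided type is to carry the original element along in a pair, filter on the derived side, and project back to the original. The sketch below shows that shape with `java.util.stream` as a standard-library stand-in; the same map / filter / map chain should translate to an `Observable`. `User` and `CustomEvent` here are simplified, hypothetical versions of the types in the question.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.stream.Collectors;

final class KeepTheEvent {
    // Hypothetical stand-ins for the types in the question.
    static final class User {
        final String name;
        final boolean moderator;
        User(String name, boolean moderator) { this.name = name; this.moderator = moderator; }
    }

    static final class CustomEvent {
        final String id;
        final User starter;
        CustomEvent(String id, User starter) { this.id = id; this.starter = starter; }
    }

    // Pair each event with the derived value, filter on the derived value,
    // then project back to the event itself.
    static List<CustomEvent> startedByModerators(List<CustomEvent> events) {
        return events.stream()
                .map(e -> new SimpleImmutableEntry<>(e, e.starter)) // (event, user)
                .filter(pair -> pair.getValue().moderator)          // test the user side
                .map(pair -> pair.getKey())                         // keep the event side
                .collect(Collectors.toList());
    }
}
```

When the derived value is only needed for the test, a single filter that navigates from the event to the user does the same job with no pair at all.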

    Ronen
    @ronenhamias
    Hello guys, I wanted to ask what you think about using reactive streams as part of a microservices architecture, so that services can communicate over remote streams and RPC-like patterns.
    Here is a QuoteService example:
    https://github.com/scalecube/scalecube/tree/master/examples/src/main/java/io/scalecube/examples/services/stocks
    Sourabh Verma
    @sourabhv
    What's the UndeliverableException in RxJava for? I got this exception reported in my app a few times in observable streams where onError was implemented.
    And how can I catch it so my app doesn't crash?
    David Karnok
    @akarnokd
    @ronenhamias Spring 5 has tools for that and they support Reactive-Streams.
    Oleh Dokuka
    @OlegDokuka
    @ronenhamias take a look at Google gRPC or RSocket
    FartCompany
    @FartCompany
    can somebody give me an idea of what to program?
    Oleh Dokuka
    @OlegDokuka
    @FartCompany Try to create a simple application that consumes messages from Gitter using https://developer.gitter.im/docs/streaming-api (you can use RxNetty, https://github.com/ReactiveX/RxNetty, for that), stores the consumed messages in MongoDB, calculates some statistics such as the most active user, and returns them over a WebSocket channel to a web page.
    It would be quite an interesting exercise for learning reactive programming and RxJava.
    FartCompany
    @FartCompany
    @OlegDokuka too complicated, I'm still a Java beginner
    James Turner
    @jamesturner
    Hi, what is the best way to repeat an observable when you need the code in the Observable.create to run again?
    Dorus
    @Dorus
    .repeat()?
    James Turner
    @jamesturner
    From what I can tell, that will simply repeat the same items from the Observable right?
    Rather than re-run the code used to create the observable
    Actually, just found that I think I can do this using Observable.interval(X, TimeUnit.SECONDS).map(re-run the code in this lambda).subscribe()...
    Dorus
    @Dorus
    yeah that's a cleaner way
    how do you do this TimeUnit.SECONDS stuff?
    Sudhir Nimavat
    @snimavat
    New to RxJava, trying to figure out how to distribute elements amongst subscribers.
    e.g. Observable.from([1,2,3,4]) with four subscribers, where each subscriber should get only one of the numbers.
    Using filter is probably not an option, as in my case filter can't decide what to filter on to give each subscriber unique items.
    What I have is a huge collection that I want to be able to process in parallel with a few different subscribers.
    Oleh Dokuka
    @OlegDokuka
    try looking at the parallel operator
    Sudhir Nimavat
    @snimavat
    @OlegDokuka that works. How would I go about blocking the main thread until the parallel Flowable finishes?
    Sudhir Nimavat
    @snimavat
    sequential()
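For comparison, the same two requirements (each item is handled by exactly one worker, and the caller blocks until everything is done) can be sketched with the JDK's `ExecutorService` alone. This is not the RxJava `parallel()` / `sequential()` API, just a plain-Java analogue; `ParallelWork` and `squareAll` are illustrative names, and squaring stands in for the real per-item work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelWork {
    // Processes every item exactly once on a pool of `workers` threads and
    // blocks the calling thread until all results are available.
    static List<Integer> squareAll(List<Integer> items, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer item : items) {
                tasks.add(() -> item * item);   // one task per item
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task has completed and keeps task order.
            for (Future<Integer> done : pool.invokeAll(tasks)) {
                results.add(done.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```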