    Dorus
    @Dorus
    @Gundelino85 flatMap has overloads with the maxConcurrency parameter. Just set that to 4 and use the flatMap as normal.
    Gundelino85
    @Gundelino85
    you again :D
    OK, let me explain what I have. I use Retrofit and get a list of UUIDs from the first request. In that subscription I want to do what I wrote above, but I can't get a "complete" state after everything has finished.
    *in the first request i get the list of IDs for the PDFs i want to download...
    Dorus
    @Dorus
    @Gundelino85 yeah, but my RxJava knowledge is a bit more limited. However the answer i gave in the other chat should still apply. You could do something like:
    initialRequest()
      .flatMap(arr -> Observable.from(arr)
        .flatMap(id -> otherRequest(id), bool, 4)
      )
    Gundelino85
    @Gundelino85
    but then i subscribe on each request and not on the package, or am i wrong?
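On the question of where the completion signal comes from: with the nested flatMap above, a single subscription to the outer chain receives every result and one completion once all inner requests have finished. The same shape can be sketched in plain Java so that it runs without RxJava. This is only an analogue under stated assumptions: `download` is a hypothetical stand-in for the Retrofit call, and a fixed pool of 4 threads plays the role of the maxConcurrency argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-Java analogue of "at most N requests in flight, one completion at the end". */
final class BoundedDownloads {

    /** Hypothetical stand-in for one PDF request; a real app would call Retrofit here. */
    static String download(String id) {
        return "pdf-" + id;
    }

    /** Runs at most maxConcurrency downloads at a time and returns once all are done. */
    static List<String> downloadAll(List<String> ids, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String id : ids) {
                futures.add(CompletableFuture.supplyAsync(() -> download(id), pool));
            }
            // Single completion point for the whole batch.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join()); // already complete, so this does not block
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(Arrays.asList("a", "b", "c", "d", "e"), 4));
    }
}
```

The caller waits on the batch as a whole, not on each request, which mirrors subscribing once to the outer Observable.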
    Ethan Le
    @quanlt

    Hello guys! I have a problem with RxJava.
    I have a list of image URLs and I need to convert the images to JPEG, so I tried:

    Observable.fromIterable(entries)
           .concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
           .subscribeOn(Schedulers.io())

    In ImageUtils.convertFile, I decode the bitmap and then convert it to JPEG.
    It worked well on my Galaxy S7 Edge :tada:, but it throws an out-of-memory exception in BitmapFactory.decodeFile (ImageUtils) :( on low-end devices.
    At first I thought it was because BitmapFactory.decodeFile uses too much memory, but then I realized it could be caused by Schedulers.io().
    Do you have any advice on how to solve this problem? Thank you so much :D

    Tabiul
    @tabiul
    @quanlt why do you think it has to do with Schedulers.io?
    I am new to Rx and RxJava, and I am trying a very simple example:

    Observable<String> ob = Observable.<String>create(t -> t.onNext("1"))
            .concatWith(Observable.create(t -> t.onNext("2")));
    ob.subscribe(System.out::println);

    For some reason it only prints 1. Any idea why?
    Ethan Le
    @quanlt
    @tabiul I think it's because Schedulers.io has limited memory, and decoding a bitmap consumes a lot of memory.
    I'm quite new to RxJava too :(
    ViTORossonero
    @ViTORossonero

    @quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with OOM in this case

    The code you provided shouldn't process several bitmaps in parallel, so you'd better check your ImageUtils#convertFile.

    @tabiul
    Observable#concatWith subscribes to the Observable passed as its parameter only after the parent Observable:

    Observable.<String>create(t -> t.onNext("1"))

    completes, but in this case it never completes because it never signals completion.
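The behaviour described above can be reproduced with a toy model in plain Java. This is not RxJava: `Source`, `concat` and `collect` are hypothetical names invented for the sketch, and the model is synchronous with no error handling. It only shows that the second source is subscribed when the first one signals completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of concat semantics; all names are hypothetical, this is not RxJava. */
final class ConcatSketch {

    /** Minimal push-style source: emits items, then optionally signals completion. */
    interface Source<T> {
        void subscribe(Consumer<T> onNext, Runnable onComplete);
    }

    /** Subscribes to second only once first has signalled completion. */
    static <T> Source<T> concat(Source<T> first, Source<T> second) {
        return (onNext, onComplete) ->
                first.subscribe(onNext, () -> second.subscribe(onNext, onComplete));
    }

    /** Collects everything the concatenated sources emit into a list. */
    static List<String> collect(Source<String> first, Source<String> second) {
        List<String> out = new ArrayList<>();
        concat(first, second).subscribe(out::add, () -> { });
        return out;
    }

    public static void main(String[] args) {
        // Emits "1" but never completes, like create(t -> t.onNext("1")) in the chat.
        Source<String> oneNoComplete = (onNext, onComplete) -> onNext.accept("1");
        // Emits "1" and then completes.
        Source<String> oneThenComplete = (onNext, onComplete) -> {
            onNext.accept("1");
            onComplete.run();
        };
        Source<String> two = (onNext, onComplete) -> {
            onNext.accept("2");
            onComplete.run();
        };

        System.out.println(collect(oneNoComplete, two));   // [1]
        System.out.println(collect(oneThenComplete, two)); // [1, 2]
    }
}
```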

    Ethan Le
    @quanlt
    @ViTORossonero, I didn't process them in parallel. Here is the code:
    // Assumes the usual imports: android.content.Context, android.graphics.Bitmap,
    // android.graphics.BitmapFactory and java.io.*
    public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
        File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
        if (!outputDir.exists()) {
            outputDir.mkdir();
        }
        File outputFile = new File(outputDir, entry.getName() + JPEG_EXTENSION);
        // decodeFile loads the full-resolution bitmap into memory and returns null on failure
        Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
        if (bitmap == null) {
            throw new IOException("Could not decode " + entry.getPath());
        }
        // try-with-resources closes the stream even if compress() throws
        try (OutputStream out = new FileOutputStream(outputFile)) {
            bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
            out.flush();
        } finally {
            // Release the bitmap memory whether or not compression succeeded
            bitmap.recycle();
        }
        ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
        return outputFile;
    }
    I've just tried with AsyncTask and it didn't help, which means you were right about Schedulers.io.
    Tabiul
    @tabiul
    @ViTORossonero , looks like you are right, so it works once I complete the first observable. Thanks
    Tabiul
    @tabiul
    I would like some advice on how I could approach this. I am trying to wrap a HystrixCommand Observable into a pagination Observable. Each Hystrix command responds with the page content and a flag saying whether there is a next page. Currently I do it like this: HystrixCommand.toObservable.concatMap{ // read the page information and, if there is a further page, return a new HystrixCommand Observable }. This is done recursively. The main problem is that if there are a lot of pages, memory fills up, because many observables are waiting to be executed even though the user might not need all of them at the same time. Is there a way to do this lazily, so that when a subscriber consumes a page, concatMap prepares maybe 2 more observables? I know there is a doOnNext API, but that does not allow modifying the Observable.
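The lazy, consumer-driven paging asked about here can be illustrated in plain Java with an Iterator that requests the next page only when the previous one has been used up. This is a minimal sketch and not a Hystrix or RxJava solution: `Page`, `items` and the `fetchPage` function are hypothetical names, and how to wire the same idea into an Observable is left open.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Pull-driven paging sketch; all names are hypothetical. */
final class LazyPages {

    /** One page of results plus a flag saying whether another page exists. */
    static final class Page {
        final List<String> items;
        final boolean hasNext;

        Page(List<String> items, boolean hasNext) {
            this.items = items;
            this.hasNext = hasNext;
        }
    }

    /** Returns an iterator that fetches page n+1 only after page n was consumed. */
    static Iterator<String> items(IntFunction<Page> fetchPage) {
        return new Iterator<String>() {
            private int nextPage = 0;
            private boolean more = true;
            private Iterator<String> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Fetch lazily: only when the current page is exhausted.
                while (!current.hasNext() && more) {
                    Page page = fetchPage.apply(nextPage++);
                    current = page.items.iterator();
                    more = page.hasNext;
                }
                return current.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        // Three fake pages (0, 1, 2) with two items each.
        Iterator<String> it = items(n -> new Page(Arrays.asList("p" + n + "-a", "p" + n + "-b"), n < 2));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Because nothing is fetched until the consumer asks for it, at most one page is held in memory at a time in this model.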
    Dakotah North
    @dakotahNorth
    Not sure if this addresses your hystrix question ... but the Netflix guys point to hystrix as part of the motivation for reactive sockets. https://github.com/rsocket/rsocket/blob/master/Motivations.md
    Costin Grigore
    @raisercostin
    Hi, I would like some hints on which operator is needed to return the latest events from an observable. I have an Observable that watches folder changes. When a change in the folder happens, I run a heavy operation that might take several minutes, like rendering a PDF file. After it finishes, I would like to get the full list of all events/file changes up to the current moment and start processing them again. I tried sample, throttleLast, reduce, sliding and tumbling, but none works as expected. Maybe I don't get the parameters right, or I don't get the semantics.
    Costin Grigore
    @raisercostin
    Actually I managed to solve it:

        val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")

        val srcObs: Observable[Seq[FileAlterated]] =
          src.watchFileCreated(1000)
            .doOnEach(x => println(s"got $x"))
            .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
            .filter(_.nonEmpty)
            .doOnEach(event => println(s"$event => ignore"))
        srcObs.toBlocking.last
    Haris Kljajić
    @devharis
    Trying to do a retryWhen on a merge but it doesn't get a throwable back from merge.
    Looking like this:
    private Observable requestData(String token) {
        return Observable.merge(new Observable[]{
                mAssignmentRepository.sync(token),
                mFacilityRepository.sync(token),
                ...
        });
    }

    requestData(token)
            .retryWhen(errors -> errors) // <-- only an Object?
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.immediate())
            .subscribe();
    In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
    Haris Kljajić
    @devharis
    Maybe I must use a mergeDelayError? Any suggestions?
    Shashank Tomar
    @shashanktomar
    Hey Guys, wrote down this blog post, some of you might like it.
    https://proandroiddev.com/reactive-mythology-interrupt-patterns-e244979b865a
    Hristo Hristov
    @pentarex
    Hey guys, I have one question. I am receiving data from a Bluetooth InputStream and I have two observables: listenReceivedData, which listens all the time, and sendAndReceiveData, which sends, waits until the response is received, and calls onNext or onComplete based on some logic. The problem is that I have only one InputStream, and once I read from it the data is gone. How can I read in one place and emit to both observables?
    Min Hiew
    @mhiew

    Hi everyone. How does one convert an Observable of one type to another type:
    Observable<T> to Observable<R>

    Are we supposed to use the to operator or perhaps flatMap?

    When I try using flatmap it complains about observable cannot be applied to anonymous function<Observable<T>, Observable<R>>

    Mark Elston
    @melston
    @mhiew , I may have misunderstood your question but what is the issue using Map? If you provide a function T -> R doesn't this do what you want?
    David Stocking
    @dmstocking
    Are you asking how to take an Observable<T> and make it an Observable<R>? Just call map. If you want to map from an Observable<T> to Observable<R> via multiple Observable<R> then you need flatmap.
    map (T -> R) -> R
    flatmap (T -> Observable<R>) -> R (I hope I wrote that right)
    Mark Elston
    @melston
    @dmstocking, that looks right.
    Min Hiew
    @mhiew

    Hi, I believe the issue was due to my misunderstanding of flatMap. I thought it was going to pass in an `Observable<T>`:

    (Observable<T> -> Observable<R>)

    But as @dmstocking mentioned it should be:
    (T -> Observable<R>)

    Changing that fixed things for me.
    Thank you
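The two function shapes discussed above, `T -> R` for map and `T -> Observable<R>` for flatMap, have a direct counterpart in `java.util.stream`, which is used here only as a stand-in so that the sketch runs without RxJava. The class and method names are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** map vs flatMap shown with java.util.stream as an analogue of Observable. */
final class MapVsFlatMap {

    /** map: the function has the shape T -> R (here String -> Integer). */
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    /** flatMap: the function has the shape T -> Stream<R>; inner streams are flattened. */
    static List<Character> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> (char) c))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "c");
        System.out.println(lengths(words)); // [2, 1]
        System.out.println(letters(words)); // [a, b, c]
    }
}
```

In both cases the function receives a single element `T`, never the whole stream, which is the misunderstanding resolved in the messages above.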

    sankarpatw
    @sankarpatw
    Hi, I am a Java developer and I would like to contribute to Rx. I am familiar with the observer pattern. Could someone help me get started?
    Hristo Hristov
    @pentarex
    @sankarpatw start using it or start contributing?
    sankarpatw
    @sankarpatw
    Start contributing
    @pentarex
    Hristo Hristov
    @pentarex
    I guess you have an account on GitHub. Open https://github.com/ReactiveX/RxJava for the repo, make your changes, and then you should be able to open a pull request. After the Rx team checks your changes, they will decide whether to approve or decline them.
    sankarpatw
    @sankarpatw
    ok
    David Karnok
    @akarnokd
    @sankarpatw Do you have something particular in mind you want to contribute? I suggest you first open an issue about what you'd like to contribute (unless it's a documentation fix or enhancement) so you don't waste your time on something that will most likely be declined. Examples of things likely to be declined: trivial and/or convenience operators, changes that break the protocols or the architecture, and rare or esoteric operators.
    Oleh Dokuka
    @OlegDokuka
    @akarnokd Are you planning to add ForkJoinPool support? I saw that there is an operator for that in reactive-streams-commons, but there is still no production implementation in either RxJava or Reactor.
    David Karnok
    @akarnokd
    RxJava targets Java 6 so it can't support it directly. The Schedulers.from is largely good enough to wrap a regular FJ pool as it's an ExecutorService.
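The point that a ForkJoinPool can be used anywhere an ExecutorService is expected can be checked in plain Java. The sketch below only demonstrates that substitution: the class and method names are hypothetical, and it uses lambdas, so it assumes Java 8 or later. With RxJava on the classpath, the same pool instance is what one would hand to `Schedulers.from`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/** Shows a ForkJoinPool being driven purely through the ExecutorService interface. */
final class ForkJoinAsExecutor {

    /** Submits n small tasks to any ExecutorService and sums their results. */
    static int sumOfSquares(ExecutorService executor, int n) {
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int x = i;
            Callable<Integer> task = () -> x * x;
            futures.add(executor.submit(task));
        }
        int sum = 0;
        try {
            for (Future<Integer> f : futures) {
                sum += f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        // ForkJoinPool implements ExecutorService, so no adapter is needed here.
        ExecutorService pool = new ForkJoinPool(4);
        try {
            System.out.println(sumOfSquares(pool, 4)); // 30
        } finally {
            pool.shutdown();
        }
    }
}
```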
    Oleh Dokuka
    @OlegDokuka
    Douh, forgot about that)
    Exerosis
    @Exerosis

    I'm guessing there is no preexisting way to accomplish something like this, right?

    Observable<CustomEvent> events = listen().toCustomEvent();
    events
        .map(Event::cause)
        .map(Cause::getStarter)
        .filter(User::isModerator)
        .subscribe(result -> {
              //Here result is a user(Cause::getStarter), but what I would really 
             //like is to then do something like.
              CustomEvent event = result;
              event.fiddleAroundABit();
              event.cancel();
         });

    I run into this kind of issue all the time and end up with very stringy filters.
    What I would like is something sorta like this:

    BiObservable<String, Integer> test;
    test
        .first().map(stringToDoubleMap::get)
        //At this point we have BiObservable<Double, Integer>
        .second().map(intToStringMap::get)
        //Now we have BiObservable<Double, String>
        .subscribe((first, second) -> {
              //Will emit when one from both sides is available.
         });

    This would make something like this relatively simple:

    Observable<CustomEvent> events;
        events
             .splitToSecond(Event::cause)
              //BiObservable<CustomEvent, Cause>
             .second().map(Cause::getStarter)
              //BiObservable<CustomEvent, User>
             .second().filter(User::isModerator)
             //If one side of the 'pair' gets removed by filter the other side does too.
            .first().subscribe(event -> {
                //Only care about the first side.
            });

    However I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.
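Two common ways to keep the original event while deciding on a derived value are to filter with a predicate that reaches into the event, or to carry an (event, derived) pair through the pipeline. The sketch below shows both with `java.util.stream` so that it runs without RxJava; `map` and `filter` on Observable take functions of the same shape. The `User` and `CustomEvent` classes here are simplified, hypothetical stand-ins for the types in the message above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Keeping the source element while filtering on a derived value; names are hypothetical. */
final class KeepSourceSketch {

    static final class User {
        final String name;
        final boolean moderator;

        User(String name, boolean moderator) {
            this.name = name;
            this.moderator = moderator;
        }
    }

    static final class CustomEvent {
        final String id;
        final User starter;

        CustomEvent(String id, User starter) {
            this.id = id;
            this.starter = starter;
        }
    }

    /** Option 1: filter on the derived value without mapping the event away. */
    static List<String> viaFilter(List<CustomEvent> events) {
        return events.stream()
                .filter(e -> e.starter.moderator)
                .map(e -> e.id)
                .collect(Collectors.toList());
    }

    /** Option 2: carry (event, derived) pairs so later steps can see both sides. */
    static List<String> viaPairs(List<CustomEvent> events) {
        return events.stream()
                .map(e -> new SimpleImmutableEntry<CustomEvent, User>(e, e.starter))
                .filter(pair -> pair.getValue().moderator)
                .map(pair -> pair.getKey().id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CustomEvent> events = Arrays.asList(
                new CustomEvent("e1", new User("ann", true)),
                new CustomEvent("e2", new User("bob", false)),
                new CustomEvent("e3", new User("cid", true)));
        System.out.println(viaFilter(events)); // [e1, e3]
        System.out.println(viaPairs(events));  // [e1, e3]
    }
}
```

The pair approach is closest to the BiObservable idea: dropping a pair in the filter removes both sides at once.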

    Ronen
    @ronenhamias
    Hello guys, I wanted to ask what you think about using reactive streams as part of a microservices architecture, so that services can communicate over remote streams and RPC-like patterns.
    here is QuoteService example:
    https://github.com/scalecube/scalecube/tree/master/examples/src/main/java/io/scalecube/examples/services/stocks
    Sourabh Verma
    @sourabhv
    What's the UndeliverableException in RxJava for? I got this exception reported in my app a few times in observable streams where onError was implemented.
    And how can I catch it so my app doesn't crash?
    David Karnok
    @akarnokd
    @ronenhamias Spring 5 has tools for that and they support Reactive-Streams.