    Haris Kljajić
    @devharis
    Is it possible to get the current index/position when using .flatMapIterable?
    Or is there any other option to get an index on each iteration of a list?
    On each iteration I want to call a method to notify the current progress, which in this case is the index.
    David Stocking
    @dmstocking
    I don't know if it's idiomatic, but have you tried just zipping your data together with an Observable that just counts from 0 to size-1?
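The zip suggestion above pairs every element with a counter running from 0 to size-1. In RxJava that would presumably be a zip against something like `Observable.range(0, size)`; the sketch below only models the pairing with `java.util.stream` so that it runs without the library. `IndexedProgress` and `forEachIndexed` are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

final class IndexedProgress {
    // Calls onProgress with (index, item) for every item, in list order.
    // The index range 0..size-1 plays the role of the counting Observable.
    static <T> void forEachIndexed(List<T> items, BiConsumer<Integer, T> onProgress) {
        IntStream.range(0, items.size())
            .forEach(i -> onProgress.accept(i, items.get(i)));
    }

    public static void main(String[] args) {
        forEachIndexed(Arrays.asList("a", "b", "c"),
            (index, item) -> System.out.println("progress " + index + ": " + item));
    }
}
```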
    Amir Mahmoud
    @amebrahimi
    Hello guys, does anyone know a good course for learning RxJava and RxAndroid?
    Dmitriy Manzhosov
    @manzhda
    +1
    Gundelino85
    @Gundelino85
    hi guys! :)
    I've been trying for many hours now :worried: , maybe someone can help me. I have to get x files from a server, but only want to download 4 at the same time. These downloads are observables. And when I have downloaded all of them (no matter whether they fail or not), I want to get the message that the download is complete. I found zip and flatMapWithMaxConcurrent but I don't know how to use them with each other. Maybe other methods are even better. I hope someone can help me :)
    Dorus
    @Dorus
    @Gundelino85 flatMap has overloads with the maxConcurrency parameter. Just set that to 4 and use the flatMap as normal.
    Gundelino85
    @Gundelino85
    you again :D
    ok, let me explain what I have. I use Retrofit and get a list of UUIDs in the first request. In that subscription I want to do what I wrote above. But I cannot get this "complete state" after everything is finished
    *in the first request I get the list of IDs for the PDFs I want to download...
    Dorus
    @Dorus
    @Gundelino85 yeah, but my RxJava knowledge is a bit more limited. However, the answer I gave in the other chat should still apply. You could do something like:
    initialRequest()
      .flatMap(arr -> Observable.from(arr)
        // bool is the delayErrors flag, 4 is maxConcurrency
        .flatMap(id -> otherRequest(id), bool, 4)
      )
    Gundelino85
    @Gundelino85
    but then i subscribe on each request and not on the package, or am i wrong?
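As far as the thread goes, the requirement is: run at most 4 downloads at a time, and get one "complete" signal after all of them have finished, whether they failed or not. The sketch below models that behaviour with plain `java.util.concurrent` rather than RxJava, so it runs without the library; it is a comparison, not the flatMap solution itself. `BoundedDownloads`, `downloadAll` and the `download` function are hypothetical names, not part of any API mentioned above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

final class BoundedDownloads {
    // Runs download.apply(id) for every id with at most maxConcurrent running at once.
    // Returns only after every download has finished; failures are swallowed and
    // simply not counted, so the caller gets exactly one completion point.
    static int downloadAll(List<String> ids, Function<String, byte[]> download, int maxConcurrent) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> tasks = ids.stream()
                .map(id -> CompletableFuture
                    .supplyAsync(() -> download.apply(id), pool)
                    .handle((bytes, error) -> {
                        if (error == null) {
                            succeeded.incrementAndGet();
                        }
                        return (Void) null; // a failure must not fail the whole batch
                    }))
                .collect(Collectors.toList());
            // Single "everything is finished" point for the whole package of requests.
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }
}
```

The caller waits on the batch as a whole rather than on each request, which is the same shape as subscribing once to the outer stream.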
    Ethan Le
    @quanlt

    Hello guys! I have a problem with RxJava.
    I have a list of image URLs and I need to convert the images to JPEG, so I tried

    Observable.fromIterable(entries)
           .concatMap(entry->Observable.just(ImageUtils.convertFile(entry.getPath())))
           .subscribeOn(Schedulers.io())

    In ImageUtils.convertFile, I decode the bitmap and then convert it to JPEG.
    It worked well on my Galaxy S7 Edge :tada:, but it throws an out of memory exception in BitmapFactory.decodeFile (ImageUtils) :( on low-end devices.
    At first, I thought it was because BitmapFactory.decodeFile uses too much memory, but then I realized it could be a problem with Schedulers.io().
    Do you have any advice to solve this problem? Thank you so much :D

    Tabiul
    @tabiul
    @quanlt why do you think it has to do with Schedulers.io?
    I am new to Rx and RxJava. I am trying a very simple example:
    Observable<String> ob = Observable.<String>create(t -> t.onNext("1"))
        .concatWith(Observable.create(t -> t.onNext("2")));
    ob.subscribe(System.out::println);
    For some reason it only prints 1. Any idea why?
    Ethan Le
    @quanlt
    @tabiul I think it's because Schedulers.io has limited memory, and decoding a bitmap consumes a lot of memory
    i'm quite new with rxjava too :(
    ViTORossonero
    @ViTORossonero

    @quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with the OOM in this case

    The code that you provided shouldn't process several bitmaps in parallel, so you should rather check your ImageUtils#convertFile

    @tabiul
    Observable#concatWith will subscribe to the Observable passed as its parameter only after the parent Observable:

    Observable.<String>create(t -> t.onNext("1"))

    completes, but in our case it never completes
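The point above can be reproduced with a toy model: a concat that subscribes to the second source only from the first source's completion callback. This is a deliberately simplified stand-in, not RxJava's implementation; `ConcatModel`, `Source`, `Emitter`, `concat` and `collect` are hypothetical names made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class ConcatModel {
    interface Emitter<T> {
        void onNext(T item);
        void onComplete();
    }

    interface Source<T> {
        void subscribe(Emitter<T> emitter);
    }

    // Forwards items of 'first'; subscribes to 'second' only when 'first' completes.
    static <T> Source<T> concat(Source<T> first, Source<T> second) {
        return downstream -> first.subscribe(new Emitter<T>() {
            @Override public void onNext(T item) { downstream.onNext(item); }
            @Override public void onComplete() { second.subscribe(downstream); }
        });
    }

    // Subscribes and records everything that was emitted.
    static <T> List<T> collect(Source<T> source) {
        List<T> seen = new ArrayList<>();
        source.subscribe(new Emitter<T>() {
            @Override public void onNext(T item) { seen.add(item); }
            @Override public void onComplete() { }
        });
        return seen;
    }

    public static void main(String[] args) {
        Source<String> neverCompletes = e -> e.onNext("1");
        Source<String> completes = e -> { e.onNext("1"); e.onComplete(); };
        Source<String> two = e -> { e.onNext("2"); e.onComplete(); };
        System.out.println(collect(concat(neverCompletes, two))); // second source never subscribed
        System.out.println(collect(concat(completes, two)));      // both sources emit
    }
}
```

In the original snippet the equivalent change would be to signal completion from the first create callback after emitting "1".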

    Ethan Le
    @quanlt
    @ViTORossonero, I didn't process it in parallel:
    public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
        File file = new File(entry.getPath());
        InputStream in = new FileInputStream(file);
        File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
        if (!outputDir.exists()) {
            outputDir.mkdir();
        }
        File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
        try {
            Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
            OutputStream out = new FileOutputStream(outputFile);
            bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
            out.flush();
            out.close();
            ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
            bitmap.recycle();
        } finally {
            in.close();
        }
        return outputFile;
    }
    I've just tried with AsyncTask, and it didn't help, which means you were right about Schedulers.io
    Tabiul
    @tabiul
    @ViTORossonero, looks like you are right, so it works once I complete the first observable. Thanks
    Tabiul
    @tabiul
    I would like some advice on how I could approach this. I am trying to wrap a HystrixCommand Observable into a pagination observable, so each HystrixCommand will respond with the response and with whether there is a next page. Currently the way I did it is like HystrixCommand.toObservable.concatMap{ // read the page information and if there is a further page then return a new HystrixCommand Observable}. This is done recursively. The main problem with this is that if there are a lot of pages then the memory will get filled up, as there would be a lot of observables to be executed, and in reality the user might not need all of them at the same time. Is there a way to do this lazily, like when a subscriber consumes then concatMap maybe +2 observables? I know there is an API doOnNext but that does not allow modifying the Observable
    Dakotah North
    @dakotahNorth
    Not sure if this addresses your hystrix question ... but the Netflix guys point to hystrix as part of the motivation for reactive sockets. https://github.com/rsocket/rsocket/blob/master/Motivations.md
    Costin Grigore
    @raisercostin
    Hi, I would like some hints on what operation is needed to return the latest events from an observable. I have an Observable to watch folder changes. When a change in the folder happens I have a heavy operation that might take several minutes, like rendering a PDF file. After it finishes I would like to get the new full list of all events/file changes up to the current moment and start processing them again. I tried to use sample, throttleLast, reduce, sliding and tumbling but none works as expected. Maybe I don't get the parameters right or I don't get the semantics.
    Costin Grigore
    @raisercostin
    Actually I managed to solve it:
    val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")

    val srcObs: Observable[Seq[FileAlterated]] =
      src.watchFileCreated(1000)
        .doOnEach(x => println(s"got $x"))
        .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
        .filter(_.nonEmpty)
        .doOnEach(event => println(s"$event => ignore"))
    srcObs.toBlocking.last
    Haris Kljajić
    @devharis
    Trying to do a retryWhen on a merge but it doesn't get a throwable back from merge.
    Looking like this:
    private Observable requestData(String token) {
        return Observable.merge(new Observable[]{
            mAssignmentRepository.sync(token),
            mFacilityRepository.sync(token),
            ...
        });
    }

    requestData(token)
        .retryWhen(errors -> errors) // <-- only an Object?
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.immediate())
        .subscribe();
    In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
    Haris Kljajić
    @devharis
    Maybe I must use a mergeDelayError? Any suggestions?
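The `errors` parameter probably shows up as a plain Object because requestData returns the raw type Observable, which erases the generic signature of retryWhen; declaring a type parameter on the return type should restore it. Separately, the retry decision itself (retry only on 401, give up on anything else) can be sketched in plain Java, without RxJava. `RetrySketch`, `HttpError`, `callWithRetry` and `isUnauthorized` are hypothetical names, and the attempt limit is an assumption that the chat does not mention.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetrySketch {
    // Hypothetical stand-in for an HTTP failure that carries a status code.
    static final class HttpError extends RuntimeException {
        final int status;

        HttpError(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    // Invokes call until it succeeds. Retries only while shouldRetry accepts the error
    // and fewer than maxAttempts calls have been made; otherwise rethrows the error.
    static <T> T callWithRetry(Supplier<T> call, Predicate<Throwable> shouldRetry, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts || !shouldRetry.test(e)) {
                    throw e;
                }
            }
        }
    }

    static boolean isUnauthorized(Throwable t) {
        return t instanceof HttpError && ((HttpError) t).status == 401;
    }
}
```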
    Shashank Tomar
    @shashanktomar
    Hey Guys, wrote down this blog post, some of you might like it.
    https://proandroiddev.com/reactive-mythology-interrupt-patterns-e244979b865a
    Hristo Hristov
    @pentarex
    hey guys, I have one question. I am receiving data from a Bluetooth input stream. I have two observables: a listenReceivedData Observable, which listens all the time, and another one called sendAndReceiveData Observable, which sends, waits until the reply is received, and calls onNext or onComplete based on some logic. The problem is that I have only one input stream, and once I read from it the data is gone. How can I read from one place and emit to those two observables?
    Min Hiew
    @mhiew

    Hi everyone. How does one convert an Observable of one type to another type:
    Observable<T> to Observable<R>

    Are we supposed to use the to operator or perhaps flatMap?

    When I try using flatMap it complains that Observable cannot be applied to an anonymous function<Observable<T>, Observable<R>>

    Mark Elston
    @melston
    @mhiew, I may have misunderstood your question, but what is the issue with using map? If you provide a function T -> R, doesn't this do what you want?
    David Stocking
    @dmstocking
    Are you asking how to take an Observable<T> and make it an Observable<R>? Just call map. If you want to map from an Observable<T> to Observable<R> via multiple Observable<R> then you need flatmap.
    map (T -> R) -> R
    flatmap (T -> Observable<R>) -> R (I hope I wrote that right)
    Mark Elston
    @melston
    @dmstocking, that looks right.
    Min Hiew
    @mhiew

    Hi, I believe the issue was due to my misunderstanding of flatMap. I thought it was going to pass in Observable<T>:

    (Observable<T> -> Observable<R>)

    But as @dmstocking mentioned it should be:
    (T -> Observable<R>)

    Changing that fixed things for me.
    Thank you
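The two function shapes discussed above, T -> R for map and T -> container-of-R for flatMap, also exist in `java.util.stream`. The sketch below uses Stream purely as a stand-in for Observable so that it runs without RxJava; `MapVsFlatMap`, `lengths` and `letters` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MapVsFlatMap {
    // map: the function has the shape T -> R (here String -> Integer).
    static List<Integer> lengths(List<String> words) {
        return words.stream()
            .map(String::length)
            .collect(Collectors.toList());
    }

    // flatMap: the function has the shape T -> Stream<R> (here String -> Stream<String>);
    // the inner streams are flattened into one stream of R.
    static List<String> letters(List<String> words) {
        return words.stream()
            .flatMap(word -> word.chars().mapToObj(c -> String.valueOf((char) c)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("rx", "java");
        System.out.println(lengths(words));
        System.out.println(letters(words));
    }
}
```

In neither case does the function receive the whole container, which was the misunderstanding described above.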

    sankarpatw
    @sankarpatw
    Hi, I am a Java developer and I would like to contribute to Rx. I am familiar with the observer pattern. Could someone help me get started?
    Hristo Hristov
    @pentarex
    @sankarpatw start using it or start contributing?
    sankarpatw
    @sankarpatw
    Start contributing
    @pentarex
    @pentarex
    Hristo Hristov
    @pentarex
    I guess you have a GitHub account. Open this page for the repo: https://github.com/ReactiveX/RxJava, make your changes, and then you should be able to make a pull request. After the Rx team checks your changes, they will decide whether to approve or decline them
    sankarpatw
    @sankarpatw
    ok
    David Karnok
    @akarnokd
    @sankarpatw Do you have something particular in mind you want to contribute? I suggest you open an issue first about what you'd like to contribute (unless it's a documentation fix or enhancement) so you don't waste your time on something that will most likely be declined. Examples of that: trivial and/or convenience operators, changes that break the protocols or the architecture, and rare or esoteric operators.
    Oleh Dokuka
    @OlegDokuka
    @akarnokd Are you planning to add ForkJoinPool support? I saw that there is an operator for that in reactive-streams-commons; however, there is still no production implementation in either RxJava or Reactor.