Where communities thrive

  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
Repo info
    David Karnok
    You can debug RxActivityResult and see if there is a subscriber to PublishSubject at that point or not.
    Viktor Sinelnikov
    @akarnokd I see 1 subscriber when subject calls onNext
    Also I tried to do a hack:
    .flatMap(object: Function<RequiresCaptchaResponse, Observable<String>> {
                        override fun apply(it: RequiresCaptchaResponse): Observable<String> {
                            val subject = PublishSubject.create<String>()
                            if (it.requiresCaptcha) {
                                        .startIntent(Intent(activity, Recaptcha2Activity::class.java))
                                        .map { it.data().getStringExtra(Recaptcha2Activity.KEY_CAPTCHA_HASH)?:"" }
                                        .subscribe({ subject.onNext(it) })
                                return subject
                            } else {
                                return Observable.just("")
    .flatMap { api.publishSendIt(id!!, it) }
    I am getting a callback in "subsribe", so it 100% calls subject.onNext
    But I don't get a call into next flatMap
    David Karnok
    Try ReplaySubject in your hack.
    Viktor Sinelnikov
    Tried, no success(
    David Karnok
    In that case, please provide a runnable application that demonstrates this problem.
    Amr Elmasry

    Hello, I have a question about the operators implementation,

            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;

    I understand that this line DisposableHelper.validate(this.s, s) is to assert that the value from upstream isn't null and it wasn't set before, but why in this line actual.onSubscribe(this); we passed this ? why not just passing s ? what is the need for implementing Disposable ?

    Is there a version of RxJava 2 that uses java functional interfaces?
    David Karnok
    @AmrElmasry Many operators have to intercept dispose, but otherwise it is required by RxJava so operator fusion doesn't skip your operator by talking to the upstream's Disposable directly.
    @Exerosis No. RxJava has to run on Java 6 where those interfaces are not available. If there is going to be a RxJava 3 with Java 9, we may still use custom interfaces as the Java 8 functional interfaces don't allow throwing checked exceptions - it has become a convenience with RxJava 2 to be able to throw at will from lambdas.
    Amr Elmasry
    @akarnokd Thanks
    Haris Kljajić
    Is it possible to get current index/position when using .flatMapIterable?
    Or is there any other option to get a index on each iteration of a list
    On each iteration I want to call a method to notify current progress which in this case is the index.
    David Stocking
    I don't know if its idiomatic, but have you tried just zipping your data together with an Observable that just counts from 0 to size-1.
    Amir Mahmoud
    Hello Guys, does any one know a good course for learning RxJava and RxAndroid?
    Dmitriy Manzhosov
    hi guys! :)
    I tried many hours now :worried: , maybe someone can help me. I have to get x files from a server, but only want to download 4 at the same time. These downloads are observables. And when i downloaded all of them (no difference if they fail or not) i get the message, that the download is complete. i found zip and flatMapWithMaxConcurrent but i dont know how to use them with each other. Maybe other methods are even better. I hope someone can help me :)
    @Gundelino85 flatMap has overloads with the maxConcurrency parameter. Just set that to 4 and use the flatMap as normal.
    you again :D
    ok, let me explain what i have. I use retrofit and get a list with UUIDs in the first request. In that subscription i want to do what i wrote above. But i can not get this "complete state" after everything is finished
    *in the first request i get the list of IDs for the PDFs i want to download...
    @Gundelino85 yeah, but my RxJava knowledge is a bit more limited. However the answer i gave in the other chat should still apply. You could do something like:
      .flatMap(arr -> Observable.from(arr)
        .flatMap(id -> otherRequest(id), bool, 4)
    but then i subscribe on each request and not on the package, or am i wrong?
    Ethan Le

    Hello guys! i have a problem with rxjava.
    I have a list of image url and i need to convert it to jpeg, so i tried


    in ImageUtils.convertFile, i decode bitmap then convert it to jpeg.
    It worked well on my Galaxy S7 Edge :tada:, but it throw out of memory exception in BitmapFactory.decodeFile (ImageUtils) :( on low-end devices.
    At first, i though it's because BitmapFactory.decodeFile use too much memory, but then i realize it can be error of Schedulers.io().
    Do you have any advice to solve this problem? Thank you so much:D

    @quanlt why do you think it have to do with Schedulers.io ?
    I am new to Rx and RxJava, I am trying a very simple example Observable<String> ob = Observable.<String>create(t -> t.onNext("1")). concatWith(Observable.create(t -> t.onNext("2"))); ob.subscribe(System.out::println);. For some reason it only prints 1. Any idea why ?
    Ethan Le
    @tabiul i think it's because Schedulers.io have a limited memory, and decode bitmap consumes many memory
    i'm quite new with rxjava too :(

    @quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with OOM in this case

    code that you provided shouldn't process few bitmaps in parallel so you'd rather check your ImageUtils#convertFile

    Observable#concatWith will subscribe to Observable that passed as its param only after parent Observable:

    Observable.<String>create(t -> t.onNext("1"))

    completes, but in our case it won't complete ever

    Ethan Le
    @ViTORossonero , i didn't process it in parallel,
    public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
            File file = new File(entry.getPath());
            InputStream in = new FileInputStream(file);
            File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
            if (!outputDir.exists()) {
            File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
            try {
                Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
                OutputStream out = new FileOutputStream(outputFile);
                bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
                ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
            } finally {
            return outputFile;
    i've just tried with asynctask, and it didn't help, that mean you were right about Schedulers.io
    @ViTORossonero , looks like you are right, so it works once I complete the first observable. Thanks
    I would like some advice on how could I approach this. I am trying to wrap HystrixCommand Observable into a pagination observable. So each Hystrix Command will respond with the response and about whether there is a next page. Currently the way I did it is like HystrixCommand.toObservable.concatMap{ // read the page information and if there is further page then return new HystrixCommand Observable}. This is done recursively. The main problem with this if there are lot of pages then the memory will get filled up as there would be lot of observables to be executed and in reality the user might not need all of them at the same time. Is there a way to do this lazily like when a subscriber consumes then concatMap maybe +2 observables ? I know there is an api doOnNext but that does not allow modifying the Observable
    Dakotah North
    Not sure if this addresses your hystrix question ... but the Netflix guys point to hystrix as part of the motivation for reactive sockets. https://github.com/rsocket/rsocket/blob/master/Motivations.md
    Costin Grigore
    Hi, I would like some hints on what operation is needed to return the latest events from an observable. I have an Observable to watch folder changes. When a change in the folder happens I have a heavy operation that might take several minutes like rendering a pdf file. After it finishes I would like to get the new full list of all events/file changes till the current moment and again start to process them. I tried to use sample, throttleLast, reduce, sliding and thumbling but none works as expected. Maybe I don't get the parameters well or I don't get the semantics.
    Costin Grigore
    Actually I managed to solve it:
         val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")
        val srcObs: Observable[Seq[FileAlterated]] = 
          src.watchFileCreated(1000).doOnEach(x=>println(s"got $x")).tumblingBuffer(Duration(1,TimeUnit.SECONDS)).filter(_.nonEmpty).doOnEach(event=>{
          println(s"$event => ignore")
    Haris Kljajić
    Trying to do a retryWhen on a merge but it doesn't get a throwable back from merge.
    Looking like this:
    private Observable requestData(String token) {
            return Observable.merge(new Observable[]{
                    .retryWhen(errors -> errors) <-- Only a object?
    In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
    Haris Kljajić
    Maybe I must use a mergeDelayError? Any suggestions?
    Shashank Tomar
    Hey Guys, wrote down this blog post, some of you might like it.
    Hristo Hristov
    hey guys, I have one question. I am receiving data from bluetooth inputstream, I have two observables listenReceivedData Observable which listens all the time and I have another one which is called sendAndReceiveData Observable, which sends, waits till its received and calls next or onComplete based on some logic. My question is because the problem here is -> I have only one inputstream and when I read it the data is gone, how I can somehow read from one place and emit to those two observables
    Min Hiew

    Hi everyone. How does one convert an Observable of one type to another type:
    Observable<T> to Observable<R>

    Are we supposed to use the to operator or perhaps flatMap?

    When I try using flatmap it complains about observable cannot be applied to anonymous function<Observable<T>, Observable<R>>

    Mark Elston
    @mhiew , I may have misunderstood your question but what is the issue using Map? If you provide a function T -> R doesn't this do what you want?