    Ethan Le
    @quanlt
    i'm quite new with rxjava too :(
    ViTORossonero
    @ViTORossonero

    @quanlt Taking into account that you are working with bitmaps, I'd say that Schedulers#io has nothing to do with the OOM in this case.

    The code you provided shouldn't process several bitmaps in parallel, so you'd be better off checking your ImageUtils#convertFile.

    @tabiul
    Observable#concatWith subscribes to the Observable passed as its parameter only after the parent Observable:

    Observable.<String>create(t -> t.onNext("1"))

    completes, but in our case it will never complete.

    Ethan Le
    @quanlt
    @ViTORossonero , i didn't process it in parallel,
    public static File copyFile(PhotoEntry entry, final Context context) throws IOException {
        File file = new File(entry.getPath());
        InputStream in = new FileInputStream(file);
        File outputDir = new File(context.getFilesDir() + IMAGE_TMP_FOLDER);
        if (!outputDir.exists()) {
            outputDir.mkdir();
        }
        File outputFile = new File(context.getFilesDir() + IMAGE_TMP_FOLDER + "/" + entry.getName() + JPEG_EXTENSION);
        try {
            Bitmap bitmap = BitmapFactory.decodeFile(entry.getPath());
            OutputStream out = new FileOutputStream(outputFile);
            bitmap.compress(Bitmap.CompressFormat.JPEG, 100, out);
            out.flush();
            out.close();
            ImageUtils.copyExif(entry.getPath(), outputFile.getPath());
            bitmap.recycle();
        } finally {
            in.close();
        }
        return outputFile;
    }
    I've just tried with AsyncTask and it didn't help, which means you were right about Schedulers.io
    Tabiul
    @tabiul
    @ViTORossonero , looks like you are right, so it works once I complete the first observable. Thanks
    Tabiul
    @tabiul
    I would like some advice on how to approach this. I am trying to wrap a HystrixCommand Observable into a pagination Observable. Each HystrixCommand responds with the page and with information about whether there is a next page. Currently I do it like this: HystrixCommand.toObservable.concatMap{ // read the page information and, if there is a further page, return a new HystrixCommand Observable }. This is done recursively. The main problem is that if there are a lot of pages, memory fills up because there are a lot of observables waiting to be executed, and in reality the user might not need all of them at the same time. Is there a way to do this lazily, so that when a subscriber consumes a page, concatMap prepares maybe the next 2 observables? I know there is the doOnNext API, but that does not allow modifying the Observable.
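One way to picture the lazy behaviour asked about here, independent of Hystrix and RxJava, is a plain `Iterator` that fetches page n only when the consumer actually asks for it. This is only a minimal sketch under assumptions: `LazyPages`, `Page`, `fetchPage` and the zero-based page numbering are hypothetical names, and the fetch function stands in for whatever executes the real command. In RxJava the same idea is usually expressed by creating the next page's Observable inside `Observable.defer`, so nothing is built until it is subscribed to.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

final class LazyPages {
    /** Hypothetical page: its items plus a flag saying whether another page follows. */
    static final class Page {
        final List<String> items;
        final boolean hasNext;

        Page(List<String> items, boolean hasNext) {
            this.items = items;
            this.hasNext = hasNext;
        }
    }

    /** Returns an Iterable that calls fetchPage(n) only when page n is requested. */
    static Iterable<Page> pages(IntFunction<Page> fetchPage) {
        return () -> new Iterator<Page>() {
            private int nextIndex = 0;
            private boolean more = true; // assume at least one page exists

            @Override
            public boolean hasNext() {
                return more;
            }

            @Override
            public Page next() {
                if (!more) {
                    throw new NoSuchElementException();
                }
                Page page = fetchPage.apply(nextIndex++);
                more = page.hasNext;
                return page;
            }
        };
    }

    /** Counts the fetches made when the consumer stops after `wanted` pages out of `total`. */
    static int fetchesFor(int wanted, int total) {
        int[] fetches = {0};
        IntFunction<Page> fetchPage = n -> {
            fetches[0]++;
            return new Page(Collections.singletonList("item-" + n), n < total - 1);
        };
        int seen = 0;
        for (Page page : pages(fetchPage)) {
            if (++seen == wanted) {
                break; // the remaining pages are never fetched
            }
        }
        return fetches[0];
    }

    public static void main(String[] args) {
        System.out.println(fetchesFor(2, 1000));
    }
}
```

Stopping after two pages out of a thousand triggers only two fetches, which is the property the recursive concatMap version would also need.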
    Dakotah North
    @dakotahNorth
    Not sure if this addresses your hystrix question ... but the Netflix guys point to hystrix as part of the motivation for reactive sockets. https://github.com/rsocket/rsocket/blob/master/Motivations.md
    Costin Grigore
    @raisercostin
    Hi, I would like some hints on which operator is needed to return the latest events from an observable. I have an Observable that watches folder changes. When a change in the folder happens, I run a heavy operation that might take several minutes, like rendering a PDF file. After it finishes, I would like to get the full list of all events/file changes that happened up to the current moment and start processing them again. I tried sample, throttleLast, reduce, sliding and tumbling, but none works as expected. Maybe I don't get the parameters right, or I don't get the semantics.
    Costin Grigore
    @raisercostin
    Actually I managed to solve it:
    val src = org.raisercostin.jedi.Locations.file("""c:\data\work2\docs-proces\images1""")

    val srcObs: Observable[Seq[FileAlterated]] =
      src.watchFileCreated(1000)
        .doOnEach(x => println(s"got $x"))
        .tumblingBuffer(Duration(1, TimeUnit.SECONDS))
        .filter(_.nonEmpty)
        .doOnEach(event => {
          println(s"$event => ignore")
        })
    srcObs.toBlocking.last
    Haris Kljajić
    @devharis
    Trying to use retryWhen on a merge, but it doesn't get a Throwable back from merge.
    It looks like this:
    private Observable requestData(String token) {
        return Observable.merge(new Observable[]{
            mAssignmentRepository.sync(token),
            mFacilityRepository.sync(token),
            ...
    }
    requestData(token)
        .retryWhen(errors -> errors) // <-- only an Object?
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.immediate())
        .subscribe;
    In the retryWhen I'm planning to catch a 401 Unauthorized and retry the call.
    Haris Kljajić
    @devharis
    Maybe I must use a mergeDelayError? Any suggestions?
    Shashank Tomar
    @shashanktomar
    Hey Guys, wrote down this blog post, some of you might like it.
    https://proandroiddev.com/reactive-mythology-interrupt-patterns-e244979b865a
    Hristo Hristov
    @pentarex
    Hey guys, I have one question. I am receiving data from a Bluetooth InputStream. I have two observables: listenReceivedData, which listens all the time, and sendAndReceiveData, which sends, waits until a response is received, and calls onNext or onComplete based on some logic. The problem is that I have only one InputStream, and once I read from it the data is gone. How can I read in one place and emit to both observables?
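A common shape for this kind of problem is to let exactly one reader own the stream and fan each chunk out to every interested listener. In RxJava that is roughly what `publish()` or `share()` do on a single source Observable. The sketch below shows the idea in plain Java so it runs without dependencies. All names (`StreamFanOut`, `addListener`, `pump`) are hypothetical, and a `ByteArrayInputStream` stands in for the Bluetooth stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class StreamFanOut {
    private final List<Consumer<byte[]>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<byte[]> listener) {
        listeners.add(listener);
    }

    /** Reads until EOF; each chunk is read once and handed to every listener. */
    void pump(InputStream in, int chunkSize) throws IOException {
        byte[] buffer = new byte[chunkSize];
        int read;
        while ((read = in.read(buffer)) != -1) {
            // Copy so listeners never see the reused read buffer.
            byte[] chunk = Arrays.copyOf(buffer, read);
            for (Consumer<byte[]> listener : listeners) {
                listener.accept(chunk);
            }
        }
    }

    /** Two listeners stand in for listenReceivedData and sendAndReceiveData. */
    static List<String> demo() {
        StreamFanOut fanOut = new StreamFanOut();
        List<String> log = new ArrayList<>();
        fanOut.addListener(c -> log.add("listen:" + new String(c, StandardCharsets.US_ASCII)));
        fanOut.addListener(c -> log.add("reply:" + new String(c, StandardCharsets.US_ASCII)));
        byte[] data = "abcd".getBytes(StandardCharsets.US_ASCII);
        try {
            fanOut.pump(new ByteArrayInputStream(data), 2);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both listeners receive the same chunk object here, so a listener that mutates the array should copy it first.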
    Min Hiew
    @mhiew

    Hi everyone. How does one convert an Observable of one type to another type:
    Observable<T> to Observable<R>

    Are we supposed to use the to operator or perhaps flatMap?

    When I try using flatmap it complains about observable cannot be applied to anonymous function<Observable<T>, Observable<R>>

    Mark Elston
    @melston
    @mhiew, I may have misunderstood your question, but what is the issue with using map? If you provide a function T -> R, doesn't this do what you want?
    David Stocking
    @dmstocking
    Are you asking how to take an Observable<T> and make it an Observable<R>? Just call map. If each T has to be turned into its own Observable<R>, and you want those merged into a single Observable<R>, then you need flatMap.
    map(T -> R) -> Observable<R>
    flatMap(T -> Observable<R>) -> Observable<R> (I hope I wrote that right)
    Mark Elston
    @melston
    @dmstocking, that looks right.
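The two function shapes discussed above can be tried out without RxJava on the classpath, because `java.util.stream.Stream` has `map` and `flatMap` with analogous signatures. The sketch below uses streams purely as a stand-in; it is not RxJava code, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MapVsFlatMap {
    /** map: the function is T -> R, so each input yields exactly one output. */
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    /** flatMap: the function is T -> Stream<R>; the inner streams are flattened into one. */
    static List<String> letters(List<String> words) {
        return words.stream()
                .flatMap(w -> w.chars().mapToObj(c -> String.valueOf((char) c)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "c");
        System.out.println(lengths(words));
        System.out.println(letters(words));
    }
}
```

In both cases the lambda receives a single element T, never the whole container, which is the point the thread converges on.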
    Min Hiew
    @mhiew

    Hi, I believe the issue was due to my misunderstanding of flatMap. I thought it was going to pass in Observable<T>:

    (Observable<T> -> Observable<R>)

    But as @dmstocking mentioned it should be:
    (T -> Observable<R>)

    Changing that fixed things for me.
    Thank you

    sankarpatw
    @sankarpatw
    Hi, I am a Java developer and I would like to contribute to Rx. I am familiar with the observable pattern. Could someone help me get started?
    Hristo Hristov
    @pentarex
    @sankarpatw start using it or start contributing?
    sankarpatw
    @sankarpatw
    Start contributing
    Hristo Hristov
    @pentarex
    I guess you have a GitHub account. Open https://github.com/ReactiveX/RxJava for the repo, make your changes, and then you should be able to open a pull request. After the Rx team checks your changes, they will decide whether to approve or decline them.
    sankarpatw
    @sankarpatw
    ok
    David Karnok
    @akarnokd
    @sankarpatw Do you have something particular in mind you want to contribute? I suggest you open an issue first about what you'd like to contribute (unless it's a documentation fix or enhancement) so you don't waste your time on something that will most likely be declined. For example: trivial and/or convenience operators, changes that break the protocols or the architecture, and rare or esoteric operators.
    Oleh Dokuka
    @OlegDokuka
    @akarnokd Are you planning to add ForkJoinPool support? I saw that there is an operator for that in reactive-streams-commons; however, there is still no production implementation in either RxJava or Reactor.
    David Karnok
    @akarnokd
    RxJava targets Java 6, so it can't support it directly. Schedulers.from is largely good enough to wrap a regular ForkJoinPool, as it's an ExecutorService.
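The point that a ForkJoinPool is just an `ExecutorService` can be checked with the JDK alone (Java 7 or later). The sketch below only exercises the pool through the `ExecutorService` interface; the RxJava wrapping is shown as a comment because it needs the library on the classpath. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ForkJoinAsExecutor {
    /** Runs one task on a ForkJoinPool, using only the ExecutorService interface. */
    static int runOnForkJoin() {
        ExecutorService executor = new ForkJoinPool(2);
        try {
            // With RxJava available, the same object could be wrapped as a Scheduler:
            //     Scheduler scheduler = Schedulers.from(executor);
            Future<Integer> result = executor.submit(() -> 6 * 7);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnForkJoin());
    }
}
```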
    Oleh Dokuka
    @OlegDokuka
    D'oh, forgot about that :)
    Exerosis
    @Exerosis

    I'm guessing there is no preexisting way to accomplish something like this, right?

    Observable<CustomEvent> events = listen().toCustomEvent();
    events
        .map(Event::cause)
        .map(Cause::getStarter)
        .filter(User::isModerator)
        .subscribe(result -> {
              //Here result is a user(Cause::getStarter), but what I would really 
             //like is to then do something like.
              CustomEvent event = result;
              event.fiddleAroundABit();
              event.cancel();
         });

    I run into this kind of issue all the time and end up with very stringy filters.
    What I would like is something sorta like this:

    BiObservable<String, Integer> test;
    test
        .first().map(stringToDoubleMap::get)
        //At this point we have BiObservable<Double, Integer>
        .second().map(intToStringMap::get)
        //Now we have BiObservable<Double, String>
        .subscribe((first, second) -> {
              //Will emit when one from both sides is available.
         });

    This would make something like this relatively simple:

    Observable<CustomEvent> events;
        events
             .splitToSecond(Event::cause)
              //BiObservable<CustomEvent, Cause>
             .second().map(Cause::getStarter)
              //BiObservable<CustomEvent, User>
             .second().filter(User::isModerator)
             //If one side of the 'pair' gets removed by filter the other side does too.
            .first().subscribe(event -> {
                //Only care about the first side.
            });

    However I don't want to figure out how to make something like this possible until I know there is no 'standard' solution.

    Ronen
    @ronenhamias
    Hello guys, I wanted to ask what you think about using reactive streams as part of a microservices architecture, so that services can communicate over remote streams and RPC-like patterns.
    Here is a QuoteService example:
    https://github.com/scalecube/scalecube/tree/master/examples/src/main/java/io/scalecube/examples/services/stocks
    Sourabh Verma
    @sourabhv
    What's the UndeliverableException in RxJava for? I got this exception reported in my app a few times in observable streams where onError was implemented.
    And how can I catch it so my app doesn't crash?
    David Karnok
    @akarnokd
    @ronenhamias Spring 5 has tools for that and they support Reactive-Streams.
    Oleh Dokuka
    @OlegDokuka
    @ronenhamias take a look at Google's gRPC or RSocket
    FartCompany
    @FartCompany
    can somebody give me an idea of what to program?
    Oleh Dokuka
    @OlegDokuka
    @FartCompany Try to create a simple application that consumes messages from Gitter using https://developer.gitter.im/docs/streaming-api (you can use RxNetty, https://github.com/ReactiveX/RxNetty, for that), stores the consumed messages in MongoDB, calculates some statistics such as the most active user, and returns them over a WebSocket channel to a web page.
    It would be quite an interesting exercise for learning reactive programming and RxJava.
    FartCompany
    @FartCompany
    @OlegDokuka too complicated, I'm still a Java beginner
    James Turner
    @jamesturner
    Hi, what is the best way to repeat an observable when you need the code in the Observable.create to run again?
    Dorus
    @Dorus
    .repeat()?
    James Turner
    @jamesturner
    From what I can tell, that will simply repeat the same items from the Observable right?
    Rather than re-run the code used to create the observable
    Actually, just found that I think I can do this using Observable.interval(X, TimeUnit.SECONDS).map(re-run the code in this lambda).subscribe()...
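For what it's worth, `repeat()` in RxJava resubscribes to its upstream once that upstream completes, so for a cold Observable built with `Observable.create` the creation code runs again on every repetition, provided the emitter signals completion. The sketch below models that behaviour in plain Java so it runs without the library. `ColdSource` and `repeat` here are hypothetical stand-ins, not RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ColdRepeatSketch {
    /** Hypothetical stand-in for a cold Observable: the body runs on every subscribe call. */
    interface ColdSource<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Subscribes to the source `times` times in a row, modelling repeat(n). */
    static <T> ColdSource<T> repeat(ColdSource<T> source, int times) {
        return onNext -> {
            for (int i = 0; i < times; i++) {
                source.subscribe(onNext); // each pass re-runs the source's body
            }
        };
    }

    /** The source emits how many times its body has run so far. */
    static List<Integer> demo() {
        int[] runs = {0};
        ColdSource<Integer> source = onNext -> onNext.accept(++runs[0]);
        List<Integer> seen = new ArrayList<>();
        repeat(source, 3).subscribe(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The emitted values differ on each pass, which shows the body being executed again rather than cached items being replayed.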