Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Yannick Lecaillez
    @ylecaillez
    Isn't there a backpressure problem here ?
    Volkan Yazıcı
    @vy

    Using RxJava 1.1.9, I have a problem as follows:

    import rx.Observable;
    
    public enum ZipWithTest {;
    
        public static void main(String[] args) {
            Observable
                    .just(1, 2, 3)
                    .doOnNext(next -> System.out.format("next: %d%n", next))
                    .zipWith(
                            Observable.just(10, 100, 1000),
                            (next, multiplier) -> next * multiplier)
                    .doOnNext(multiplied -> System.out.format("multiplied: %d%n", multiplied))
                    .toBlocking()
                    .last();
        }
    
        /*
        next: 1
        next: 2
        next: 3
        multiplied: 10
        multiplied: 200
        multiplied: 3000
         */
    
    }

    I was expecting to get the following:

        next: 1
        multiplied: 10
        next: 2
        multiplied: 200
        next: 3
        multiplied: 3000

    What am I missing?

    骨来(PeterLi)
    @pli2014
    @vy doOnNext only go through every element in just(1, 2, 3) source.
    so that it invokes an action when it calls {@code onNext}
    Volkan Yazıcı
    @vy
    @pli2014 I got it. But why is it getting every element from just(1,2,3) and then zipping with the rest? Shouldn't they go in parallel? That is, one from source, one from multiplier, one from source, one from multiplier, and so on.
    骨来(PeterLi)
    @pli2014
    zipwith behavior is just that
    DoOnNext is called after ever onNext op from every element of source
    Ignacio Baca Moreno-Torres
    @ibaca
    @vy you should really upgrade to rxjava 2, but as rx1 observable has backpressure this might work as you said, but the problem is that zipWith has a buffer, so the 3 next call are juts filling the zip buffer, not sure if in rx1 exists the zipWith operator overload to indicate the buffer size, but you can do it in rx2, in that case you need to use Flowable instead of Observable to support backpressure
    Ignacio Baca Moreno-Torres
    @ibaca
    @ylecaillez you can use publish and merge the take(1) with a ignoreElements, so the source observable keep subscribe untils it completes (you can exgract the compose lamda to reuse the operator)
    Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
            .compose(o -> o.publish(p -> p.take(1)))
            .subscribe(n -> out.println("final " + n));
    // emmit 1 ⏎ final 1
    Observable.just(1, 2, 3).doOnNext(n -> out.println("emmit " + n))
            .compose(o -> o.publish(p -> p.take(1).mergeWith(p.ignoreElements())))
            .subscribe(n -> out.println("final " + n));
    // emmit 1 ⏎ final 1 ⏎ emmit 2 ⏎ emmit 3
    Mark Raynsford
    @io7m
    'ello. i'm wondering if there's a recommended way to achieve the following on 1.x: i have a hot observable (a publishsubject) but i'd like to buffer the last produced item so that new subscribers always see the most recently published event when they subscribe (and then see any new events when i publish them after that)
    i'm currently stuck on 1.x due to an incompatible third-party dependency in the project i'm working on, unfortunately
    sorry, that should read "buffer the most recently produced value"
    Mark Raynsford
    @io7m
    i assume that i want some combination of replay and defer, but i can't quite work it out
    Mark Raynsford
    @io7m
    ... turns out a BehaviorSubject does exactly this
    sorry for the noise!
    Moritz Bust
    @Busti
    How does ReactiveX compile to so many different languages? Is a transpiler used to achieve this, or is every code-base managed separately?
    Igor Bozin
    @igorbzin_gitlab
    I am trying to get a grasp of RxJava. How would i chain following situation in calls with RxJava: i retrieve a Single<User> from my database, loading a user by id for example, this should happen async. After loading the user object, i put some of his specific data in an arraylist. After that finished I want to listen on this array for changes, so that when an element gets added or removed I write to the database async to update the user. I have following code so far, also implemented the piece you have suggested
    David Karnok
    @akarnokd
    @Busti RxJava compiles to a JVM bytecode and any JVM language should be able to interact with it. The language adapters such as RxScala, RxKotlin, etc. are usually thin wrappers around RxJava so technically you still run Java bytecode beneath.
    @igorbzin_gitlab You mean something like this? https://stackoverflow.com/q/28816691/61158
    Moritz Bust
    @Busti
    @akarnokd Thank you for your answer. I was just wondering if non-related languages in the ReactiveX project somehow shared a common codebase since there are so many different languages supported. Something like haxe comes to mind, but I do realize that that would be a bad idea.
    Igor Bozin
    @igorbzin_gitlab

    @akarnokd yes, something exactly like this, thank you. I am still not sure how to chain those asynchronous calls though, since i operate on the database.

            Observable.just(uriObservableList)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<RxObservableList<Uri>>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "ONSUBSCRIBE ");
                        }
    
                        @Override
                        public void onNext(RxObservableList<Uri> uris) {
                            String s = makePathString(uris.getList());
                            AppExecutor.getInstance().diskIO().execute(() -> viewModel.updatePicturePaths(s)); // <-- THis async call should be performed on every onNext 
                            mAdapter.updatePictures(uris.getList());
                            Log.d(TAG, "ONNEXT");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "ONERROR  " + e);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "ONCOMPLETE  ");
                        }
                    });

    How do I construct my Subscription so that before every onNext call I can perform a write operation on the database? I have tried doOnNext which is supposed to be executed before onNext, but it is always called from the main thread

    David Stocking
    @dmstocking
    @ylecaillez woopsy I didn't notice your message. Yours appears to be really similar. The only difference appears to be that I take a action to run not just filter the first value. Yours kind of looks like it implements first().
    Viktor Vostrikov
    @wellbranding
    I have previously used MediatorLiveData to combine results from FCM, Room and Networking in my Repository. It worked well. However, now I want more complexity and some additional bonuses from RxJava. This is why I have decided to use RxJava this time. I can combine Room and Networking with Observable.concatArrayEager(my _observables) also I have used merge, but I want to get only latest changed value, not all observables, like it is done with MediatorLiveData However, I don't now how to do that after FCM pushes value, and how should I notify my main observable after new changes occur? No examples on this issue whatsoever. It is crucial part. I receive FCM in my BroadCastReceiver and then notified my repository's livedata, which notified my MediatorLiveData... How to do that with RxJava? Would really welcome any advice, because it is really important issue.
    David Karnok
    @akarnokd
    @igorbzin_gitlab Put the doOnNext between the subscribeOn and observeOn.
    James Fleming
    @Flemingjp
    Hello, I'm trying to get my head around trying to open a stream, buffering the items it emits until an async task has completed (this case a web request) and then process all the buffered items and the items following (if any).
    Ignacio Baca Moreno-Torres
    @ibaca
    You can use a combine latest for example, but the the buffer will grow until the request completes, alternatively you can make the request and we it completes flatMap it with the stream, so you don't need the buffer
    James Fleming
    @Flemingjp
    What would be the best way to start the async task once the stream has started? I'm cautious of interrupting the flow of the stream
    Igor Bozin
    @igorbzin_gitlab

    @akarnokd already tried it.

            viewModel.getPlace().map(this::getPicturePaths)
                    .subscribeOn(Schedulers.io())
                    .toObservable()
                    .concatMap(new Function<ArrayList<Uri>, Observable<RxObservableList<Uri>>>() {
                        @Override
                        public Observable<RxObservableList<Uri>> apply(ArrayList<Uri> uriArrayList) throws Exception {
                            mAdapter = new PictureRvAdapter(MarkerActivity.this, uriArrayList, MarkerActivity.this);
                            LinearLayoutManager layoutManager = new LinearLayoutManager(MarkerActivity.this, LinearLayoutManager.VERTICAL, false);
                            DividerItemDecoration decoration = new DividerItemDecoration(getApplicationContext(), VERTICAL);
                            mPicturesRV.addItemDecoration(decoration);
                            mPicturesRV.setLayoutManager(layoutManager);
                            mPicturesRV.setAdapter(mAdapter);
                            beginTransition(constraintSetEnd, 850);
                            Log.d(TAG, "Adapter set, transition started");
    
                            uriObservableList.setList(uriArrayList);
                            return uriObservableList.getObservable();
                            //return Observable.just(uriObservableList);
                        }
                    })
                    .doOnNext(new Consumer<RxObservableList<Uri>>() {
                        @Override
                        public void accept(RxObservableList<Uri> uris) throws Exception {
                            String pathString = makePathString(uris.getList());
                            viewModel.updatePicturePaths(pathString);
                            Log.d(TAG, "Updated pictures in database");
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<RxObservableList<Uri>>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "Onsubscribe called");
                        }
    
                        @Override
                        public void onNext(RxObservableList<Uri> uris) {
                            mAdapter.updatePictures(uris.getList());
                            Log.d(TAG, "Adapter pictures updated");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "OnError : " + e);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "OnComplete called");
                        }
                    });

    still gives me this error: MarkerActivity: OnError : java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.

    Ignacio Baca Moreno-Torres
    @ibaca
    @Flemingjp
    Observable<String> stream = Observable.fromArray("a", "b", "c");
    Observable<String> request = Single.just("1").toObservable();
    stream.withLatestFrom(request, (streamItem, requestItem) -> streamItem + requestItem)
            .forEach(System.out::println);
    // output: a1b1c1
    Nicholas Bransby-Williams
    @nbransby
    What combination of operators do I need to achieve the following
    Nicholas Bransby-Williams
    @nbransby
    Combine two streams into one which only emits an item( or completes ) as soon as both streams have emitted the same number of items (at least 1)
    Been studying the operators but can’t see an obvious way to do it
    Nicholas Bransby-Williams
    @nbransby
    think ive got it: first.buffer(second)
    .map { it.size - 1 }
    .scan(0) { acc, current -> acc + current }
    .filter { it == 0 }
    骨来(PeterLi)
    @pli2014
    Schedulers.io() is un-bound thread pool, so when there is a blocked process,the number of threads will be increased unsafely.
     @Test
        public void threadCounter() throws Exception {
            int j = 0;
            while (j++ < 1000) {
                Flowable
                    .fromCallable(() -> {
                        Thread.sleep(1000);
                        return RandomUtils.nextInt();
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .subscribe(c -> {
                        //System.out.println("c:" + c);
                    });
            }
    
            IoScheduler io = (IoScheduler) Schedulers.io();
    
            int i = 0;
            while (i++ < 10000) {
                System.out.println("io size:" + io.size());
                Thread.sleep(1000);
            }
        }
    David Karnok
    @akarnokd
    @igorbzin_gitlab It is important you provide code that demonstrates your problem. The implication of viewModel.getPlace() changes the setup quite a bit. I assume this fires from the main thread and thus subscribeOn has no effect. Replace it with observeOn.
    @nbransby How should that single emitted resulting item relate to the inputs?
    Nicholas Bransby-Williams
    @nbransby
    @akarnokd my use case is keeping track of open tabs and the result should signal that all tabs have been closed, I have this but havent actually tested it yet
                //close the window when all tabs are closed
                tabDidOpen.buffer(tabDidClose)
                        .map { it.size - 1 }
                        .scan(0) { acc, current -> acc + current }
                        .filter { it == 0 }
                        .subscribe { closeWindow() }
    Ignacio Baca Moreno-Torres
    @ibaca
    @nbransby if you have a stream of “plus” and “minus” events, you can do something like this…
    val lines = Observable.create<String> { while (true) it.onNext(readLine()!!) }
            .subscribeOn(Schedulers.newThread()).share()
    val plus = lines.filter { "+" == it }.map { { acc: Int -> acc + 1 } }
    val minus = lines.filter { "-" == it }.map { { acc: Int -> acc - 1 } }
    val calc = mergeArray<(Int) -> Int>(plus, minus).scan(0) { acc, fn -> fn(acc) }
    calc.doOnNext(::println).blockingLast()
    plus and minus are of type Observable<(Int) -> Int>
    Nicholas Bransby-Williams
    @nbransby
    thank you @ibaca for that although after removing the 0 parameter to the scan operator my solution appears to work well
    James Fleming
    @Flemingjp
    @ibaca I want to merge the request into the stream, but not merge the two items together - almost inject the single value into the stream. I've tried using mergeWith(...) but when the single terminates, the other stream also terminates. Im trying to find behaviour like this
    Ignacio Baca Moreno-Torres
    @ibaca
    Observable.merge and mergeWith should work
    blocklinkrob
    @blocklinkrob
    Hello folks, I'm currently trying to port an application I had built in Electron using node.js to JavaFX. I relied heavily on RX in general for the initial effort (rxjs in particular of course) and I am having some trouble getting the semantics right for RxJava. Should mention I am using Kotlin, but I am not (intentionally, anyways) using RxKotlin per se.
    I had written a lot of code to extend the Observable prototype in rxjs, and I am trying to translate these into extension methods in Kotlin.
    function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
      let sources:Rx.GroupedObservable = this
      return Rx.Observable.create(o => {
        return sources.subscribe(innerSource => {
          let key:TKey = innerSource.key
          let debouncedStream = innerSource.debounceWithSelector(dueTime, () => onDebounceItemFactory(key), scheduler)
          o.onNext(debouncedStream)
        },
        ex => o.onError(ex),
        () => o.onCompleted()
        )
      })
    }
    Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
    function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
      let source = this
      return Rx.Observable.create(o => {
        let disposables = new Rx.CompositeDisposable()
        let debounceDisposable = new Rx.SerialDisposable()
        disposables.add(debounceDisposable)
        let debounce = () => {
          debounceDisposable.setDisposable(
            scheduler.scheduleFuture(
              '',
              dueTime,
              () => {
                let debouncedItem = itemSelector()
                o.onNext(debouncedItem)
              }
            )
          )
        }
        disposables.add(
          source.subscribe(
            item => {
              debounce()
              o.onNext(item)
            },
            ex => {
              try {
                o.onError(ex)
              } catch (err1) {
              }
            },
            () => o.onCompleted()
          )
        )
        debounce()
        return disposables
      })
    }
    Rx.Observable.prototype.debounceWithSelector = debounceWithSelector
    blocklinkrob
    @blocklinkrob
    In particular, in the first piece of code, where I call o.onNext(debouncedStream), this does not seem to be valid in RxJava...but works fine in rxjs
    Here's my attempt in Kotlin btw:
    fun <TKey, Any> Observable<GroupedObservable<TKey, Any>>.debounceOnMissedHeartbeat(dueTime: Long, onDebounceItemFactory:(key: TKey) -> Any, scheduler: Scheduler) =
            Observable.create<Any> { o ->
                    this.subscribe({innerSource ->
                            val key: TKey? = innerSource.key
                            val debouncedStream = innerSource.debounceWithSelector(dueTime, onDebounceItemFactory(key), scheduler)
                            o.onNext(debouncedStream)
                    },
                    { ex: Throwable -> o.onError(ex) },
                    { o.onComplete() })
            }
    Yannick Lecaillez
    @ylecaillez
    Hi All !
    For information, contrary to what the javadoc says, reduce does not throws NoSuchElementException on empty source.
    The following tests fails:
    Flowable.empty().reduce((r1, r2) -> r1).test().assertError(NoSuchElementException.class);