These are chat archives for ReactiveX/RxJava

26th
Aug 2018
Ignacio Baca Moreno-Torres
@ibaca
Aug 26 2018 07:08
You can use combineLatest, for example, but the buffer will grow until the request completes. Alternatively, you can make the request and, when it completes, flatMap it with the stream, so you don't need the buffer.
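As a rough illustration of the second suggestion (wait for the request, then attach its result to each stream item), here is a minimal sketch in plain Java. It is not RxJava code: `CompletableFuture.thenApply` stands in for the flatMap step, and `java.util.stream.Stream` stands in for the observable stream. The names `RequestThenStream`, `combine` and `demo` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of RxJava.
final class RequestThenStream {

    // The stream is only created after the request has produced its value,
    // so no stream items have to be buffered while the request is in flight.
    static CompletableFuture<List<String>> combine(
            CompletableFuture<String> request,
            Supplier<Stream<String>> stream) {
        return request.thenApply(result ->
                stream.get()
                      .map(item -> item + result)
                      .collect(Collectors.toList()));
    }

    static List<String> demo() {
        // Assumed stand-in for the asynchronous request.
        CompletableFuture<String> request = CompletableFuture.supplyAsync(() -> "1");
        return combine(request, () -> Stream.of("a", "b", "c")).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, b1, c1]
    }
}
```

The trade-off is that nothing from the stream is seen until the request has finished, which is what removes the need for a buffer.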
James Fleming
@Flemingjp
Aug 26 2018 08:12
What would be the best way to start the async task once the stream has started? I'm cautious of interrupting the flow of the stream
Igor Bozin
@igorbzin_gitlab
Aug 26 2018 09:30

@akarnokd already tried it.

        viewModel.getPlace().map(this::getPicturePaths)
                .subscribeOn(Schedulers.io())
                .toObservable()
                .concatMap(new Function<ArrayList<Uri>, Observable<RxObservableList<Uri>>>() {
                    @Override
                    public Observable<RxObservableList<Uri>> apply(ArrayList<Uri> uriArrayList) throws Exception {
                        mAdapter = new PictureRvAdapter(MarkerActivity.this, uriArrayList, MarkerActivity.this);
                        LinearLayoutManager layoutManager = new LinearLayoutManager(MarkerActivity.this, LinearLayoutManager.VERTICAL, false);
                        DividerItemDecoration decoration = new DividerItemDecoration(getApplicationContext(), VERTICAL);
                        mPicturesRV.addItemDecoration(decoration);
                        mPicturesRV.setLayoutManager(layoutManager);
                        mPicturesRV.setAdapter(mAdapter);
                        beginTransition(constraintSetEnd, 850);
                        Log.d(TAG, "Adapter set, transition started");

                        uriObservableList.setList(uriArrayList);
                        return uriObservableList.getObservable();
                        //return Observable.just(uriObservableList);
                    }
                })
                .doOnNext(new Consumer<RxObservableList<Uri>>() {
                    @Override
                    public void accept(RxObservableList<Uri> uris) throws Exception {
                        String pathString = makePathString(uris.getList());
                        viewModel.updatePicturePaths(pathString);
                        Log.d(TAG, "Updated pictures in database");
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<RxObservableList<Uri>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "Onsubscribe called");
                    }

                    @Override
                    public void onNext(RxObservableList<Uri> uris) {
                        mAdapter.updatePictures(uris.getList());
                        Log.d(TAG, "Adapter pictures updated");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "OnError : " + e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "OnComplete called");
                    }
                });

It still gives me this error: MarkerActivity: OnError : java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.

Ignacio Baca Moreno-Torres
@ibaca
Aug 26 2018 09:48
@Flemingjp
Observable<String> stream = Observable.fromArray("a", "b", "c");
Observable<String> request = Single.just("1").toObservable();
stream.withLatestFrom(request, (streamItem, requestItem) -> streamItem + requestItem)
        .forEach(System.out::println);
// output (one item per line): a1, b1, c1
Nicholas Bransby-Williams
@nbransby
Aug 26 2018 14:41
What combination of operators do I need to achieve the following?
Nicholas Bransby-Williams
@nbransby
Aug 26 2018 14:52
Combine two streams into one which only emits an item (or completes) as soon as both streams have emitted the same number of items (at least 1).
I've been studying the operators but can’t see an obvious way to do it.
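The behaviour described here resembles RxJava's zip operator, which pairs the n-th item of one source with the n-th item of the other. Whether zip fits the completion requirement is not settled in this part of the chat. Below is a minimal plain-Java sketch of just the pairing bookkeeping, so it runs without the RxJava dependency. It is not RxJava's implementation, and the names `PairingBuffer`, `left`, `right` and `demo` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiFunction;
import java.util.function.Consumer;

// Hypothetical helper: pairs the n-th "left" item with the n-th "right" item.
final class PairingBuffer<A, B, R> {
    private final Queue<A> lefts = new ArrayDeque<>();
    private final Queue<B> rights = new ArrayDeque<>();
    private final BiFunction<A, B, R> combiner;
    private final Consumer<R> downstream;

    PairingBuffer(BiFunction<A, B, R> combiner, Consumer<R> downstream) {
        this.combiner = combiner;
        this.downstream = downstream;
    }

    synchronized void left(A item) {
        lefts.add(item);
        drain();
    }

    synchronized void right(B item) {
        rights.add(item);
        drain();
    }

    // Emit one combined value for every complete pair that is available.
    private void drain() {
        while (!lefts.isEmpty() && !rights.isEmpty()) {
            downstream.accept(combiner.apply(lefts.remove(), rights.remove()));
        }
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        PairingBuffer<String, Integer, String> pairs =
                new PairingBuffer<String, Integer, String>((s, n) -> s + n, out::add);
        pairs.left("a");  // no output yet: the right side has emitted nothing
        pairs.left("b");
        pairs.right(1);   // counts match for the first pair: emits "a1"
        pairs.right(2);   // emits "b2"
        pairs.right(3);   // waits for a third left item
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a1, b2]
    }
}
```

Note that the faster side is queued until the slower side catches up, so the queue can grow if one stream is consistently ahead of the other.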