These are chat archives for ReactiveX/RxJava

19th
Apr 2017
Yannick Lecaillez
@ylecaillez
Apr 19 2017 10:19
Hi ! I've a question regarding GroupBy. I actually want a GroupByUntil so i'm using a GroupBy() + takeWhile() on the GroupedFlowable.
Problem is that GroupBy() is cancelling the upstream source when the last GroupedFlowable has been cancelled. (https://github.com/ReactiveX/RxJava/blob/a00ea07a4d2ce409e8dbea66ddbca9c0a77ddab6/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java#L204) This seems wrong to me: The upstream source should be cancelled only if the Flowable<GroupedFlowable<K,T>> is cancelled. WDYT ?
My use case is a stream of multiplexed response packet which has to be grouped to their corresponding request. Basically a Flowable<GroupedFlowable<Request, Response>> responses = streamOfMultiplexedResponse.groupBy(responseMmessage.getRequestId());
David Karnok
@akarnokd
Apr 19 2017 11:10
The souce is cancelled if both the main sequence of groups and each individual group is cancelled. With takeWhile on the group, you cancel the current group but the group may be reopened if the key appears later from the upstream - which is not the case with Request I presume. Unless you cancel the outer Flowable, cancelling the inner GroupedFlowables should not stop the creation and emission to other groups.
Yannick Lecaillez
@ylecaillez
Apr 19 2017 11:45
I made a unit test and indeed, you're right: cancelling the inner GroupedFlowables does not stop the creation and emission to other groups. I have misunderstood the code. Thanks a lot ! and sorry for the noise :)
Felix
@fx42
Apr 19 2017 14:28
short question: Observable.interval( 10, TimeUnit.MILLISECONDS ).subscribeOn( Schedulers.computation() )
.map( i -> methodGeneratingDoubleArray())
I now want to flatMap(array -> Observable.from(array) ) but i see this wont work with rxjava2
what is the way to go here?
DavidMihola
@DavidMihola
Apr 19 2017 14:31
fromArray?
Felix
@fx42
Apr 19 2017 14:31
this wont work
when i then subscribe i still have to work with the array
Erik Buttram
@erito
Apr 19 2017 15:41
what about something like flatMapIterable? If the array is primitive you may be stuck unwrapping it yourself with fromCallable or create
DavidMihola
@DavidMihola
Apr 19 2017 15:46
I still think that fromArray should work:
        Observable.just(new Integer[]{1,2,3})
                .flatMap(new Function<Integer[], Observable<Integer>>() {
                    @Override
                    public Observable<Integer> apply(@NonNull Integer[] ints) throws Exception {
                        return Observable.fromArray(ints);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {

                    }
                });
Vladimir Baryshnikov
@vovkab
Apr 19 2017 18:33

Hello.

Any idea how can I achieve next:
I have stream of items, for specific type I need to wait for 1 sec, and if there is no more items of this type, let it keep moving down the stream, if there is more items of this type - wait total 5 seconds and collect everything, for all other item types just let them go as is.

Vladimir Baryshnikov
@vovkab
Apr 19 2017 18:42

I thought maybe I can do something like:

events()
    .groupBy(event -> event.type() == type ? 0 : 1)
    .flatMap(grouped -> 
        if (grouped.getKey() == 0) {
            Observable closingSelector = grouped.debounce(1, TimeUnit.SECONDS);
            grouped
                .buffer(closingSelector)
                .map(agregate());
        } else {
            return grouped;
        }
    )

But looks like I'm missing something about how groupBy is working, because I'm getting: java.lang.IllegalStateException: Only one Subscriber allowed!
note: using rxjava-1

VolodymyrBaisa
@VolodymyrBaisa
Apr 19 2017 22:37
Привет народ как написать правильно Observable while
Надо зделать loop который будет обновлять поле
Denis Stoyanov
@xgrommx
Apr 19 2017 22:49
@VolodymyrBaisa repeat
VolodymyrBaisa
@VolodymyrBaisa
Apr 19 2017 22:51
@xgrommx Привет, у меня есть строка скажем рамдомная и мне надо каждый раз вызывать примерно раз в секунду и присваивать значение во вторую строку
как ето сделать?
Denis Stoyanov
@xgrommx
Apr 19 2017 22:51
@VolodymyrBaisa ты пишешь в англ чате
@VolodymyrBaisa private chat if u want
VolodymyrBaisa
@VolodymyrBaisa
Apr 19 2017 22:52
ok