Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    Nicholas Bransby-Williams
    @nbransby
    Been studying the operators but can’t see an obvious way to do it
    Nicholas Bransby-Williams
    @nbransby
    think ive got it: first.buffer(second)
    .map { it.size - 1 }
    .scan(0) { acc, current -> acc + current }
    .filter { it == 0 }
    骨来(PeterLi)
    @pli2014
    Schedulers.io() is un-bound thread pool, so when there is a blocked process,the number of threads will be increased unsafely.
     @Test
        public void threadCounter() throws Exception {
            int j = 0;
            while (j++ < 1000) {
                Flowable
                    .fromCallable(() -> {
                        Thread.sleep(1000);
                        return RandomUtils.nextInt();
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .subscribe(c -> {
                        //System.out.println("c:" + c);
                    });
            }
    
            IoScheduler io = (IoScheduler) Schedulers.io();
    
            int i = 0;
            while (i++ < 10000) {
                System.out.println("io size:" + io.size());
                Thread.sleep(1000);
            }
        }
    David Karnok
    @akarnokd
    @igorbzin_gitlab It is important you provide code that demonstrates your problem. The implication of viewModel.getPlace() changes the setup quite a bit. I assume this fires from the main thread and thus subscribeOn has no effect. Replace it with observeOn.
    @nbransby How should that single emitted resulting item relate to the inputs?
    Nicholas Bransby-Williams
    @nbransby
    @akarnokd my use case is keeping track of open tabs and the result should signal that all tabs have been closed, I have this but havent actually tested it yet
                //close the window when all tabs are closed
                tabDidOpen.buffer(tabDidClose)
                        .map { it.size - 1 }
                        .scan(0) { acc, current -> acc + current }
                        .filter { it == 0 }
                        .subscribe { closeWindow() }
    Ignacio Baca Moreno-Torres
    @ibaca
    @nbransby if you have a stream of “plus” and “minus” events, you can do something like this…
    val lines = Observable.create<String> { while (true) it.onNext(readLine()!!) }
            .subscribeOn(Schedulers.newThread()).share()
    val plus = lines.filter { "+" == it }.map { { acc: Int -> acc + 1 } }
    val minus = lines.filter { "-" == it }.map { { acc: Int -> acc - 1 } }
    val calc = mergeArray<(Int) -> Int>(plus, minus).scan(0) { acc, fn -> fn(acc) }
    calc.doOnNext(::println).blockingLast()
    plus and minus are of type Observable<(Int) -> Int>
    Nicholas Bransby-Williams
    @nbransby
    thank you @ibaca for that although after removing the 0 parameter to the scan operator my solution appears to work well
    James Fleming
    @Flemingjp
    @ibaca I want to merge the request into the stream, but not merge the two items together - almost inject the single value into the stream. I've tried using mergeWith(...) but when the single terminates, the other stream also terminates. Im trying to find behaviour like this
    Ignacio Baca Moreno-Torres
    @ibaca
    Observable.merge and mergeWith should work
    blocklinkrob
    @blocklinkrob
    Hello folks, I'm currently trying to port an application I had built in Electron using node.js to JavaFX. I relied heavily on RX in general for the initial effort (rxjs in particular of course) and I am having some trouble getting the semantics right for RxJava. Should mention I am using Kotlin, but I am not (intentionally, anyways) using RxKotlin per se.
    I had written a lot of code to extend the Observable prototype in rxjs, and I am trying to translate these into extension methods in Kotlin.
    function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
      let sources:Rx.GroupedObservable = this
      return Rx.Observable.create(o => {
        return sources.subscribe(innerSource => {
          let key:TKey = innerSource.key
          let debouncedStream = innerSource.debounceWithSelector(dueTime, () => onDebounceItemFactory(key), scheduler)
          o.onNext(debouncedStream)
        },
        ex => o.onError(ex),
        () => o.onCompleted()
        )
      })
    }
    Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
    function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
      let source = this
      return Rx.Observable.create(o => {
        let disposables = new Rx.CompositeDisposable()
        let debounceDisposable = new Rx.SerialDisposable()
        disposables.add(debounceDisposable)
        let debounce = () => {
          debounceDisposable.setDisposable(
            scheduler.scheduleFuture(
              '',
              dueTime,
              () => {
                let debouncedItem = itemSelector()
                o.onNext(debouncedItem)
              }
            )
          )
        }
        disposables.add(
          source.subscribe(
            item => {
              debounce()
              o.onNext(item)
            },
            ex => {
              try {
                o.onError(ex)
              } catch (err1) {
              }
            },
            () => o.onCompleted()
          )
        )
        debounce()
        return disposables
      })
    }
    Rx.Observable.prototype.debounceWithSelector = debounceWithSelector
    blocklinkrob
    @blocklinkrob
    In particular, in the first piece of code, where I call o.onNext(debouncedStream), this does not seem to be valid in RxJava...but works fine in rxjs
    Here's my attempt in Kotlin btw:
    fun <TKey, Any> Observable<GroupedObservable<TKey, Any>>.debounceOnMissedHeartbeat(dueTime: Long, onDebounceItemFactory:(key: TKey) -> Any, scheduler: Scheduler) =
            Observable.create<Any> { o ->
                    this.subscribe({innerSource ->
                            val key: TKey? = innerSource.key
                            val debouncedStream = innerSource.debounceWithSelector(dueTime, onDebounceItemFactory(key), scheduler)
                            o.onNext(debouncedStream)
                    },
                    { ex: Throwable -> o.onError(ex) },
                    { o.onComplete() })
            }
    Yannick Lecaillez
    @ylecaillez
    Hi All !
    For information, contrary to what the javadoc says, reduce does not throws NoSuchElementException on empty source.
    The following tests fails:
    Flowable.empty().reduce((r1, r2) -> r1).test().assertError(NoSuchElementException.class);
    Not sure which part is wrong ? The javadoc or the impl ?
    David Karnok
    @akarnokd
    @ylecaillez ReactiveX/RxJava#6195
    Yannick Lecaillez
    @ylecaillez
    Great thanks, i'm on it :-)
    Yannick Lecaillez
    @ylecaillez
    @akarnokd ReactiveX/RxJava#6197 hope that helps.
    James Fleming
    @Flemingjp
    How can I buffer a Flowable until a Single completes?
    Ignacio Baca Moreno-Torres
    @ibaca

    I think this might work, but it is pretty weird… maybe you can rethink whatever you are trying to do in a more… reactive… way

    Flowable<Long> buffer$ = Flowable.concatArrayEager(
            // maxConcurrency, prefetch,
            Completable.timer(1, SECONDS).toFlowable(),
            Flowable.interval(10, MILLISECONDS)/*.onBackpressureBuffer(capacity)*/);

    you can use the commented code to control the buffering, if you have a single, that looks that it is actually a completable as you are ignoring its unique element, you can call .ignoreElement() and the again .toFlowable()

    another, maybe more “natural” approach is to use a ConnectableFlowable and connect it manually, so it start buffering at that moment… but then you need to control the disconnect of this ConnectableFlowable, the example is longer so I use this weird previous example bc atleast the subscriptions are handled automatically
    James Fleming
    @Flemingjp
    I'm trying to implement an OrderBook system where there is a snapshot (the Single) and updates to the snapshot (which is a Flowable) Im wanting to make it as self managed as possible
    Nicholas Bransby-Williams
    @nbransby
    @Flemingjp perhaps something like flowable.buffer(single.materialize()) ?
    Sergey Serdyuk
    @neee
    Hi everyone, I have a question:
    ServiceReturnObservable1.getData()
    .flatMap(r -> serviceReturnObservable2.getSomeData(r.getValue()))
    .flatMap( here I want to work with ServiceReturnObservable1.getData() and serviceReturnObservable2.getSomeData(r.getValue()))
    Pulkit Kumar
    @pulkitkumar
    @neee serviceReturnObservable2.getSomeData(r.getValue()) should emit an object which has everything you need to chain it further. Since it is already accepting value emitted by ServiceReturnObservable1, you should be able to add those values to the object emitted by ServiceReturnObservable2 .
    Dan O'Reilly
    @dano
    @neee i usually .map to a tuple type inside of the first flatMap call, or nest the second flatMap call inside the first
    i have a bunch of helpers for automatically "chaining" the second emitted value into a tuple with the previous emitted value, and then unpacking the tuple in the next call
    So you'd end up with
    ServiceReturnObservable1.getData()
        .flatMap(chainObs(r -> serviceReturnObservable2.getSomeData(r.getValue())))
        .flatMap(unpack((r1, r2) -> ...))
    Ignacio Baca Moreno-Torres
    @ibaca
    you know this sorcery works since java8 and can be saved in var since java10 :scream:
    Observable.just(1,2,3,4)
            .flatMap(n -> Observable.just("a","b","c")
                    .map(s -> new Object() { Integer num = n; String str = s; }))
            .subscribe(obj -> System.out.println(obj.num + ":" + obj.str));
    Ricky Limka
    @rcklmk_twitter

    Hey everyone, today I encountered a strange behavior with Observable.create:

    ...
    private ObservableEmitter<Action> extEmitter;
    
        public Test() {
            Observable<Action> actions = Observable.create(emitter -> {
                extEmitter = emitter;
    
                emitter.onNext(Action.TEST);
                emitter.onNext(Action.REVERT);
                emitter.onNext(Action.TEST);
                emitter.onNext(Action.REVERT);
            });
    
    
            Observable<Action> reverts = actions.filter(Action.REVERT::equals);
            Observable<Action> tests = actions.filter(Action.TEST::equals);
    
            Observable.merge(tests, reverts).subscribe(System.out::println);
    
            extEmitter.onNext(Action.TEST);
            extEmitter.onNext(Action.REVERT);
            extEmitter.onNext(Action.TEST);
            extEmitter.onNext(Action.REVERT);
        }
    ...

    Does anyone know why using extEmitter (emitter outside of lambda) emits different results than emitter inside lambda?

    ↑ what is the common practice for exporting emitter outside for pushing custom values from other java classes?
    Ricky Limka
    @rcklmk_twitter
    I should have used Rx.Subject instead?
    Incubator
    @incube8r
    Hello
    Aadi Deshpande
    @cilquirm
    Hi, i'm having trouble mapping a flowable. I am getting an error that I kinda sorta understand but I can't understand why I 'm getting it:
    [Java] The parameterized method <Tuple2<Entity,SourceEntry>>map(Function<? super Entity,? extends Tuple2<Entity,SourceEntry>>) of type Flowable<Entity> is not applicable for the arguments (Function<Entity,Tuple2<Entity,SourceEntry>>)
    Incubator
    @incube8r

    Hello, in this code (below), is it possible get the FileObject (also), I mean to subscribe to get it?)

    public Single<Boolean> uploadFile(String entityType, String entityId, String blobName){
        return Single.create(e -> {
            uploadElement = DOM.createElement("input");
            uploadElement.setAttribute("type", "file");
            uploadElement.setAttribute("accept", "*/*");;
            Event.sinkEvents(uploadElement, Event.ONCHANGE);
            Event.setEventListener(uploadElement, event -> {
                final FileObject fileObject = (FileObject) getFile(event);
                readAsDataURL((FileObject) fileObject, new FileReaderCallback() {
                    @Override
                    public void onLoad(String data) {
                            Entity entity = new Entity(entityType);
                            entity.setEntityId(entityId);
                            entity.setBlobProperty(blobName, data).subscribe(isSuccess -> {
                                e.onSuccess(isSuccess);
                            }, error -> {
                                e.onError(error);
                            });
                    }
                });
            });
            click();
        });
    }

    Or I have to change the Single<Boolean> to something else that will be able to get the FileObject and pass it one.onSuccess?

    Ignacio Baca Moreno-Torres
    @ibaca
    Something eles? Hehe just use Single<FileObject>
    Incubator
    @incube8r
    I also need to get the boolean status
    so I have to create a POJO to have both Boolean and FileObject at the same time?
    Ignacio Baca Moreno-Torres
    @ibaca
    Boolean status? It is always true, this is not a status
    Incubator
    @incube8r
    the method entity.setBlobProperty emits isSuccess which is the status either true or false
    Ignacio Baca Moreno-Torres
    @ibaca
    You are also adding a listener and never removing it, use the rxgwt utilities for observing event which handle that correctly
    Oh I see, and this is an Observable? Remember, never ever subscribe inside another subscription, use flap Map instead
    RxJava is like callback on steroids, so whenever you see a callback you can trivially wrap it as a RxJava type, readasdataurl has a callback, so wrap it as a rxtype and flatMap too (when I say flatMap, you can use the one you need, flatMap, concatMap, switchMap...)
    Incubator
    @incube8r
    problem here is that Event.setEventListener blocking that chain
    I mean I can't do return Observable... inside that anonymous class