    blocklinkrob
    @blocklinkrob
    I had written a lot of code to extend the Observable prototype in rxjs, and I am trying to translate these into extension methods in Kotlin.
    function debounceOnMissedHeartbeat<TKey, TValue> (dueTime:number, onDebounceItemFactory:(key:TKey) => TValue, scheduler:Rx.Scheduler) {
      let sources:Rx.GroupedObservable = this
      return Rx.Observable.create(o => {
        return sources.subscribe(innerSource => {
          let key:TKey = innerSource.key
          let debouncedStream = innerSource.debounceWithSelector(dueTime, () => onDebounceItemFactory(key), scheduler)
          o.onNext(debouncedStream)
        },
        ex => o.onError(ex),
        () => o.onCompleted()
        )
      })
    }
    Rx.Observable.prototype.debounceOnMissedHeartbeat = debounceOnMissedHeartbeat
    function debounceWithSelector (dueTime:number, itemSelector, scheduler:Rx.Scheduler) {
      let source = this
      return Rx.Observable.create(o => {
        let disposables = new Rx.CompositeDisposable()
        let debounceDisposable = new Rx.SerialDisposable()
        disposables.add(debounceDisposable)
        let debounce = () => {
          debounceDisposable.setDisposable(
            scheduler.scheduleFuture(
              '',
              dueTime,
              () => {
                let debouncedItem = itemSelector()
                o.onNext(debouncedItem)
              }
            )
          )
        }
        disposables.add(
          source.subscribe(
            item => {
              debounce()
              o.onNext(item)
            },
            ex => {
              try {
                o.onError(ex)
              } catch (err1) {
              }
            },
            () => o.onCompleted()
          )
        )
        debounce()
        return disposables
      })
    }
    Rx.Observable.prototype.debounceWithSelector = debounceWithSelector
    blocklinkrob
    @blocklinkrob
    In particular, in the first piece of code, where I call o.onNext(debouncedStream), this does not seem to be valid in RxJava...but works fine in rxjs
    Here's my attempt in Kotlin btw:
    fun <TKey, Any> Observable<GroupedObservable<TKey, Any>>.debounceOnMissedHeartbeat(dueTime: Long, onDebounceItemFactory:(key: TKey) -> Any, scheduler: Scheduler) =
            Observable.create<Any> { o ->
                    this.subscribe({innerSource ->
                            val key: TKey? = innerSource.key
                            val debouncedStream = innerSource.debounceWithSelector(dueTime, onDebounceItemFactory(key), scheduler)
                            o.onNext(debouncedStream)
                    },
                    { ex: Throwable -> o.onError(ex) },
                    { o.onComplete() })
            }
    Yannick Lecaillez
    @ylecaillez
    Hi All !
    For information: contrary to what the Javadoc says, reduce does not throw NoSuchElementException on an empty source.
    The following test fails:
    Flowable.empty().reduce((r1, r2) -> r1).test().assertError(NoSuchElementException.class);
    Not sure which part is wrong: the Javadoc or the implementation?
    David Karnok
    @akarnokd
    @ylecaillez ReactiveX/RxJava#6195
    Yannick Lecaillez
    @ylecaillez
    Great thanks, i'm on it :-)
    Yannick Lecaillez
    @ylecaillez
    @akarnokd ReactiveX/RxJava#6197 hope that helps.
    James Fleming
    @Flemingjp
    How can I buffer a Flowable until a Single completes?
    Ignacio Baca Moreno-Torres
    @ibaca

    I think this might work, but it is pretty weird… maybe you can rethink whatever you are trying to do in a more… reactive… way

    Flowable<Long> buffer$ = Flowable.concatArrayEager(
            // maxConcurrency, prefetch,
            Completable.timer(1, SECONDS).toFlowable(),
            Flowable.interval(10, MILLISECONDS)/*.onBackpressureBuffer(capacity)*/);

    You can use the commented-out code to control the buffering. If you have a Single that is effectively a Completable, because you are ignoring its only element, you can call .ignoreElement() and then .toFlowable() again.

    Another, maybe more “natural”, approach is to use a ConnectableFlowable and connect it manually, so it starts buffering at that moment… but then you need to control the disconnect of this ConnectableFlowable. That example is longer, so I used the weird example above because at least the subscriptions are handled automatically.
    James Fleming
    @Flemingjp
    I'm trying to implement an OrderBook system where there is a snapshot (the Single) and updates to the snapshot (a Flowable). I want to make it as self-managed as possible.
    Nicholas Bransby-Williams
    @nbransby
    @Flemingjp perhaps something like flowable.buffer(single.materialize()) ?
    Sergey Serdyuk
    @neee
    Hi everyone, I have a question:
    ServiceReturnObservable1.getData()
    .flatMap(r -> serviceReturnObservable2.getSomeData(r.getValue()))
    .flatMap( here I want to work with ServiceReturnObservable1.getData() and serviceReturnObservable2.getSomeData(r.getValue()))
    Pulkit Kumar
    @pulkitkumar
    @neee serviceReturnObservable2.getSomeData(r.getValue()) should emit an object which has everything you need to chain it further. Since it is already accepting value emitted by ServiceReturnObservable1, you should be able to add those values to the object emitted by ServiceReturnObservable2 .
    Dan O'Reilly
    @dano
    @neee i usually .map to a tuple type inside of the first flatMap call, or nest the second flatMap call inside the first
    i have a bunch of helpers for automatically "chaining" the second emitted value into a tuple with the previous emitted value, and then unpacking the tuple in the next call
    So you'd end up with
    ServiceReturnObservable1.getData()
        .flatMap(chainObs(r -> serviceReturnObservable2.getSomeData(r.getValue())))
        .flatMap(unpack((r1, r2) -> ...))
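
    The "nest the second call inside the first and map to a tuple" idea can be sketched without any helpers. This is only an illustration of the shape, not Dan's chainObs/unpack helpers: it uses java.util.stream in place of Observable so it runs without RxJava, and getData/getSomeData are hypothetical stand-ins for the two service calls.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PairChainingSketch {
    // Hypothetical stand-ins for ServiceReturnObservable1.getData()
    // and serviceReturnObservable2.getSomeData(value).
    static Stream<Integer> getData() { return Stream.of(1, 2); }
    static Stream<String> getSomeData(int value) { return Stream.of("v" + value); }

    static List<String> run() {
        return getData()
            // Nest the map inside the first flatMap so r1 is still in scope,
            // and carry both values forward as a pair.
            .flatMap(r1 -> getSomeData(r1).map(r2 -> Map.entry(r1, r2)))
            // The next stage can now see the results of both calls.
            .map(pair -> pair.getKey() + "+" + pair.getValue())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    With Observable the chain has the same shape: the inner .map sits inside the lambda passed to the first flatMap.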
    Ignacio Baca Moreno-Torres
    @ibaca
    you know this sorcery works since java8 and can be saved in var since java10 :scream:
    Observable.just(1,2,3,4)
            .flatMap(n -> Observable.just("a","b","c")
                    .map(s -> new Object() { Integer num = n; String str = s; }))
            .subscribe(obj -> System.out.println(obj.num + ":" + obj.str));
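
    A sketch of both halves of that claim, again with java.util.stream standing in for Observable so it runs on a plain JDK (10 or later for the var part). The class and method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class AnonymousCarrierSketch {
    // Java 8+: the anonymous type is inferred through the lambda chain,
    // so the later stage can read its fields without a named class.
    static List<String> viaLambdaChain() {
        return Stream.of(1, 2)
            .flatMap(n -> Stream.of("a", "b")
                .map(s -> new Object() { final Integer num = n; final String str = s; }))
            .map(obj -> obj.num + ":" + obj.str)
            .collect(Collectors.toList());
    }

    // Java 10+: var keeps the anonymous type, so the fields stay accessible.
    static String viaVar() {
        var carrier = new Object() { final int num = 7; final String str = "x"; };
        return carrier.num + ":" + carrier.str;
    }

    public static void main(String[] args) {
        System.out.println(viaLambdaChain());
        System.out.println(viaVar());
    }
}
```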
    Ricky Limka
    @rcklmk_twitter

    Hey everyone, today I encountered a strange behavior with Observable.create:

    ...
    private ObservableEmitter<Action> extEmitter;
    
        public Test() {
            Observable<Action> actions = Observable.create(emitter -> {
                extEmitter = emitter;
    
                emitter.onNext(Action.TEST);
                emitter.onNext(Action.REVERT);
                emitter.onNext(Action.TEST);
                emitter.onNext(Action.REVERT);
            });
    
    
            Observable<Action> reverts = actions.filter(Action.REVERT::equals);
            Observable<Action> tests = actions.filter(Action.TEST::equals);
    
            Observable.merge(tests, reverts).subscribe(System.out::println);
    
            extEmitter.onNext(Action.TEST);
            extEmitter.onNext(Action.REVERT);
            extEmitter.onNext(Action.TEST);
            extEmitter.onNext(Action.REVERT);
        }
    ...

    Does anyone know why using extEmitter (the emitter stored outside the lambda) emits different results than the emitter inside the lambda?

    Also, what is the common practice for exposing an emitter so that other Java classes can push custom values?
    Ricky Limka
    @rcklmk_twitter
    Should I have used a Subject instead?
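
    Nobody answered this in the room, so the following is only a likely explanation. A source built with Observable.create is cold: the lambda runs once per subscriber, and merge subscribes to tests and then to reverts, so the stored emitter ends up being the one from the last subscription. The sketch below models that with a tiny hypothetical Cold class (not RxJava's API) so it runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class ColdSourceSketch {
    /** Hypothetical stand-in for ObservableEmitter: it only forwards values. */
    interface Emitter<T> { void onNext(T value); }

    /** Hypothetical cold source: the producer runs once per subscriber. */
    static final class Cold<T> {
        private final Consumer<Emitter<T>> producer;
        Cold(Consumer<Emitter<T>> producer) { this.producer = producer; }

        Cold<T> filter(Predicate<T> keep) {
            return new Cold<>(down -> producer.accept(v -> { if (keep.test(v)) down.onNext(v); }));
        }

        void subscribe(Consumer<T> observer) { producer.accept(observer::accept); }
    }

    static Emitter<String> leaked; // plays the role of extEmitter

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Cold<String> actions = new Cold<>(emitter -> {
            leaked = emitter;          // overwritten on every subscription
            emitter.onNext("TEST");
            emitter.onNext("REVERT");
        });
        // Like merge(tests, reverts): subscribe to tests first, then reverts.
        actions.filter("TEST"::equals).subscribe(seen::add);
        actions.filter("REVERT"::equals).subscribe(seen::add);
        // leaked now feeds only the second (reverts) subscription.
        leaked.onNext("TEST");   // dropped by the REVERT filter
        leaked.onNext("REVERT"); // delivered
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Under this model the values pushed through the stored emitter reach only the reverts branch. A Subject (for example a PublishSubject) is the usual way to push values from outside, since all of its subscribers share one source.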
    Incubator
    @incube8r
    Hello
    Aadi Deshpande
    @cilquirm
    Hi, I'm having trouble mapping a Flowable. I am getting an error that I kinda sorta understand, but I can't understand why I'm getting it:
    [Java] The parameterized method <Tuple2<Entity,SourceEntry>>map(Function<? super Entity,? extends Tuple2<Entity,SourceEntry>>) of type Flowable<Entity> is not applicable for the arguments (Function<Entity,Tuple2<Entity,SourceEntry>>)
    Incubator
    @incube8r

    Hello, in the code below, is it possible to also get the FileObject, i.e. to subscribe and receive it?

    public Single<Boolean> uploadFile(String entityType, String entityId, String blobName){
        return Single.create(e -> {
            uploadElement = DOM.createElement("input");
            uploadElement.setAttribute("type", "file");
            uploadElement.setAttribute("accept", "*/*");;
            Event.sinkEvents(uploadElement, Event.ONCHANGE);
            Event.setEventListener(uploadElement, event -> {
                final FileObject fileObject = (FileObject) getFile(event);
                readAsDataURL((FileObject) fileObject, new FileReaderCallback() {
                    @Override
                    public void onLoad(String data) {
                            Entity entity = new Entity(entityType);
                            entity.setEntityId(entityId);
                            entity.setBlobProperty(blobName, data).subscribe(isSuccess -> {
                                e.onSuccess(isSuccess);
                            }, error -> {
                                e.onError(error);
                            });
                    }
                });
            });
            click();
        });
    }

    Or do I have to change the Single<Boolean> to something else that can carry the FileObject and pass it to e.onSuccess?

    Ignacio Baca Moreno-Torres
    @ibaca
    Something else? Hehe, just use Single<FileObject>
    Incubator
    @incube8r
    I also need to get the boolean status
    so I have to create a POJO to have both Boolean and FileObject at the same time?
    Ignacio Baca Moreno-Torres
    @ibaca
    Boolean status? It is always true, this is not a status
    Incubator
    @incube8r
    the method entity.setBlobProperty emits isSuccess which is the status either true or false
    Ignacio Baca Moreno-Torres
    @ibaca
    You are also adding a listener and never removing it; use the RxGWT utilities for observing events, which handle that correctly
    Oh I see, and this is an Observable? Remember, never ever subscribe inside another subscription, use flatMap instead
    RxJava is like callbacks on steroids, so whenever you see a callback you can trivially wrap it as an RxJava type. readAsDataURL has a callback, so wrap it as an Rx type and flatMap too (when I say flatMap, you can use the one you need: flatMap, concatMap, switchMap...)
    Incubator
    @incube8r
    the problem here is that Event.setEventListener is blocking that chain
    I mean I can't do return Observable... inside that anonymous class
    I could flatMap outside but the typed return would change
    Ignacio Baca Moreno-Torres
    @ibaca
    use RxGWT, then flatMap the event with the readAsDataURL Observable wrapper
    Incubator
    @incube8r
    can you show me how to do this the right way using the RxGWT utilities?
    Ignacio Baca Moreno-Torres
    @ibaca
    :grimacing: sorry I'm out, but yep I'll help you tomorrow :wink:
    Ignacio Baca Moreno-Torres
    @ibaca
    @incube8r this is how I would do it
    // create the `input` outside Single.create! add it to the view, not in the observable
    HTMLInputElement input = Elements.input("file").get();
    // now just listen to changes using RxGWT
    Observable<InputEvent> inputChange$ = RxElemento.fromEvent(input, EventType.change);
    // map to file, this is just an example, use whatever code you have in `getFile`
    Observable<File> fileSelected$ = inputChange$.map(ev -> ev.dataTransfer.files.item(0));
    // move this as a method, maybe I'll add this to RxGWT
    Function<File, Single<String>> fileAsDataUrl = file -> Single.create(em -> {
        FileReader reader = new FileReader();
        reader.onload = ev -> { em.onSuccess(reader.result.asString()); return null; };
        reader.onerror = ev -> em.tryOnError(new RuntimeException(/*TODO*/));
        reader.readAsDataURL(file);
        em.setCancellable(reader::abort);
    });
    // combine the file with the fileAsDataUrl, or use switchMap to cancel previous
    Observable<String> dataUrlSelected$ = fileSelected$.flatMapSingle(fileAsDataUrl);
    // again, flatMap the entity, I think returning a "success" boolean is bad idea, use
    // exception to indicate exceptional cases, I'll map it so I can return the entity
    Observable<Entity> selectedEntity$ = dataUrlSelected$.flatMapSingle(url -> {
        Entity entity = new Entity(entityType).entityId(entityId);
        return entity.blobProperty(blobName, url).map(n -> {
            if (n) return entity; else throw new Exception(/*TODO*/);
        } );
    });  
    // so at that point, when you subscribe you will listen to the ready entity instances
    selectedEntity$.subscribe((Entity entity) -> console.log(entity));
    Ignacio Baca Moreno-Torres
    @ibaca
    @incube8r so the important conclusion is these 2 points! Apply them in order, because the second should be applied after the first has been resolved
    1. Always use Rx types. If you add a callback, promise, CompletableFuture or any alternative async structure, wrap it in an Rx type. Rx composes only with other Rx types, and you use Rx because it can be composed (I like to think of RxJava as a callback unifier). This also ensures that RxJava chains errors and cancellation correctly, which is the major difference from raw callbacks and other alternatives
    2. Never subscribe inside a subscription; always compose the Rx types. There are really a lot of operators, but at least use one of flatMap, concatMap or switchMap. This is critical to allow Rx to chain errors and cancellation correctly. If you subscribe inside a subscription, you are just callback-programming, and IMO mixing Rx and callback programming is going to be an absolute hell, with just the disadvantages of both sides
    Ignacio Baca Moreno-Torres
    @ibaca
    I’m not 100% sure, but we almost never need to debug the Rx stack, and we have very few unexpected situations with Rx, and I think this is because we ALWAYS follow these 2 simple rules! This makes composition perfect (subscribe, next, complete, error and cancel signaling chained correctly in any situation), so the code is much more predictable too. Hehe, not sure, but I’m starting to think that most of the people who complain about Rx problems and Rx debugging do not follow these rules; that breaks the chain and requires you to debug the code to find where the [error or cancellation] has gone
    David Karnok
    @akarnokd
    My experience is that pretty often, the code is full of very similar looking flows and the stacktrace listing the involved operators is not enough to locate the problematic flow. This is also apparent when map fails due to null returns and the stacktrace points to the RxJava operator, not the lambda of the developer that returned the null to the operator.
    Ignacio Baca Moreno-Torres
    @ibaca
    yep, this is true, but the solution is reasonably easy… use the “jump to type source” debug action
    image.png
    Dan O'Reilly
    @dano
    that doesn't help you when all you have is a stack trace in a log file :)
    Ignacio Baca Moreno-Torres
    @ibaca
    yep, true hehe I was thinking of that, it must be a reproducible error