Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
    m0rgan
    @m0rgan_twitter
    Mostly the disposing part
    I tried this:
    BehaviorSubject<T> valueSubject = (BehaviorSubject<T>) BehaviorSubject.create(new ObservableOnSubscribe<T>() { @Override public void subscribe(ObservableEmitter<T> emitter) throws Exception { // do something here } });
    But I got a io.reactivex.internal.operators.observable.ObservableCreate cannot be cast to io.reactivex.subjects.BehaviorSubject
    m0rgan
    @m0rgan_twitter
    I'm feeling it isn't the right approach
    m0rgan
    @m0rgan_twitter
    I don't know how to respond to that
    David Karnok
    @akarnokd
    Based on your question, you should read a bit about RxJava and Subjects first.
    m0rgan
    @m0rgan_twitter
    That is what I have been doing and am currently doing, I just thought I could find some help here, my bad
    Thank you anyway
    David Karnok
    @akarnokd
    Subjects are self-contained components of RxJava and you can only create them via their own create method. create(ObservableOnSubscribe) is defined on Observable which creates an Observable, a cold source and will have nothing to do with the subject type. Many operators in RxJava are named after the behavior they do in an everyday language, such as "actions done when an observer subscribes" -> doOnSubscribe, when the subscription gets disposed: doOnDispose. Subjects themselves are not disposed.
    Based on the misunderstanding your question implies, I wanted to encourage you to familiarize yourself with the concepts and components of RxJava first, so the answer, which invokes more concepts and components, can be understood better. For this, I suggest reading Learning RxJava.
    m0rgan
    @m0rgan_twitter
    Thank you David, that helped a lot
    Grigorii Tkachuk
    @tkgreg
    Hi guys! I have next question, I'm trying to use rxjava because I heard a lot that it's easy to handle situation when your api under high pressure on your service layer. I decided to try but it looks like I do not understand main idea. So imagine I have next method which exposed thru the REST endpoint @RequestMapping(method = RequestMethod.POST, value = "/pushData", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Single<String> pushData(@RequestBody String json) How I can slowdown a pressure on the logic inside this method? Is it a right way of thinking or I should slow it down the level of Undertow/Jetty/Netty by applying some strategy on it.
    David Karnok
    @akarnokd
    @tkgreg Single implies a single result so backpressure is not really at play there. If you want to reduce the call frequency to pushData itself, there are ways for it, such as using concatMap or flatMap with concurrency limit, spanning out the source events over time, etc. You could also research Retrofit use cases similar to your problem.
    Grigorii Tkachuk
    @tkgreg
    @akarnokd what is Retrofit?
    public Single<String> pushJsonDataAsync(String json) {
            return Single.create(source -> {
                try {
                    PushCrawlerDataRequest.Builder request = PushCrawlerDataRequest.newBuilder();
                    JsonFormat.parser().merge(json, request);
                    PushCrawlerDataResponse.Builder builder = pushData(request.build());
                    source.onSuccess(JsonFormat.printer().print(builder));
                } catch (Exception ex) {
                    source.onError(ex);
                }
            });
        }
    this is how I'm trying to use it right now
    Karthik
    @karthik-ir

    hello,
    I have a Flowable that gets inbound data with events. These events are not in order. These messages are expected to reach subscribers in order(ascending order of event ID).

    This is a infinite stream, sometime may be large enough to be stored in memory.

    How can I achieve this using rxJava or without rx?

    Rafael Guillen
    @rguillens
    Hi all
    I'm processing a live video stream and would like to improve performance using Flowable, BackpressureStrategy.DROP and the sample operator to limit the number of frames per second. I have implemented the same functionality using Observable and also limiting the number of frames with the sample operator. The second variant has a superior performance with respect to the first, however, I am concerned that in a computer with less processing power may have Backpressure issues.
    Eugene Popovich
    @httpdispatch

    How to properly perform action after the subscription? doOnSubscribe doesn't fit the needs

    Example

    PublishProcessor<Integer> processor = PublishProcessor.create();
    processor
                    .doOnSubscribe(subscription -> processor.onNext(1))
                    .subscribe(System.out::println);
     processor.onNext(2);

    prints

    2

    As a workaround i use this approach

    processor
                    .mergeWith(Completable.fromAction(() -> processor.onNext(1)).toFlowable())
                     .subscribe(System.out::println);

    This prints

    1
    2

    as expected

    David Karnok
    @akarnokd
    @httpdispatch startWith. doOnSubscribe is not good in this case because the processor's onNext doesn't see the subscriber at that moment.
    Eugene Popovich
    @httpdispatch
    @akarnokd Thanks for the answer. startWith is good if item may be ignored by any other processor subscribers. How i understand i should use some custom workaround in this case
    David Karnok
    @akarnokd
    I can't imagine why you need such reentrance, i.e., why old subscribers should be notified about new subscribers .
    Eugene Popovich
    @httpdispatch

    @akarnokd I have 2 event bus like processors. One is for requests second is for results. There may be multiple request handlers which will post a results to the result processor. This may be either synchronously or asynchronously. I want to make a request and wait for the result in the same flowable

    Simplified case

    PublishProcessor<Integer> requestBus = PublishProcessor.create();
    PublishProcessor<Pair<Integer, Integer>> resultBus = PublishProcessor.create();
    requestBus.filter(value -> value == 1)
            .map(value -> Pair.create(value, 10))
            .subscribe(resultBus);
    
    Integer request = 1;
    Flowable<Integer> resultFlowable = resultBus
            .filter(requestResultPair -> requestResultPair.first == request)
            .doOnSubscribe(__ -> requestBus.onNext(request));
    Gautam
    @im-gautam
    Nn
    James Nugent
    @jen20
    Hi, is anyone aware of an operator with semantics similar to Timeout, but where you can mutate the last value and remain subscribed on a timeout?
    David Karnok
    @akarnokd
    There is no standard operator for that, however, you could adapt this answer by using a BehaviorSubject flatMapped in instead of the constant and subscribe the BehaviorSubject to the shared observable in publish.
    James Nugent
    @jen20
    Thanks @akarnokd, that looks like it shold work
    Andrei Sereda
    @asereda-gs

    Hello.

    I'm trying to implement a functionality which can be described as "striped" Observer. The idea is pretty simple, I want Observers (subscriptions) for a “stripe” to be single-threaded.

    Stated differently, I would like to select a Scheduler (for observeOn operation) based on current event. Think something like observeOn(Function<T, Scheduler>) instead of current observeOn(Scheduler).

    Simple example which seem to work but not when called multiple times:

    @Test
    public void example() throws Exception {
    final List<Scheduler> schedulers = Stream.generate(() -> Schedulers.from(Executors.newSingleThreadExecutor())).limit(10).collect(Collectors.toList());
    final Function<String, String> keyFn = s -> s;

    // select scheduler for each element
    final Function<String, Scheduler> schedulerFn = key -> schedulers.get(Math.abs(key.hashCode()) % schedulers.size());
    
    Observable.just("one", "two", "three", "one", "two", "four")
            .groupBy(i -> i) // this is value -> key function
            .flatMap(g -> g.subscribeOn(schedulerFn.apply(g.getKey())))
            .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    
    Thread.sleep(1_000);

    }

    Unfortunately this doesn’t work if called multiple times. I have created a test:
    Any help would be appreciated
    Eugene Popovich
    @httpdispatch
    @asereda-gs use observeOn insteadof subscribeOn
    or create new observable in the flatMap
    Observable.just(g).subscribeOn(schedulerFn.apply(g.getKey())
    David Karnok
    @akarnokd
    g.subscribeOn moves the subscription to another thread but g is a hot Observable and therefore subscribeOn has no practical effect on it. Use observeOn but note that your setup will still execute the onNext handler of subscribe sequentially. You should add computation after g.observeOn(...).op().op() before merging the results. Also note that Observable.just(g) will likely still not work as there is no reason to turn g into a nested source.
    Andrei Sereda
    @asereda-gs
    @akarnokd something like this ?
    observable.groupBy(keyFn::apply)
    .flatMap(o -> o.observeOn(fn.apply(o.getKey())).map(dummy -> dummy))
    .subscribe(e -> {
    state.computeIfAbsent(e, ignore -> new CopyOnWriteArraySet<>()).add(Thread.currentThread().getName());
    latch.countDown();
    });
    if the problem in Observable.just() for test as the source ?
    @httpdispatch I did try with both observeOn() and subscribeOn() same result
    Andrei Sereda
    @asereda-gs
    Observable.just("one", "two", "three", "one", "two", "four")
            .groupBy(i -> i)
            .flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).map(dummy -> dummy))
            .subscribe(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName()));
    key=one value=one thread=pool-3-thread-1
    key=one value=one thread=pool-3-thread-1
    key=two value=two thread=pool-3-thread-1
    key=two value=two thread=pool-3-thread-1
    key=four value=four thread=pool-3-thread-1
    key=three value=three thread=pool-3-thread-1
    I would like subscription to happen in different threads
    David Karnok
    @akarnokd
    FlatMap can reuse the same thread to emit items coming from different threads. You should put the computation into that map(dummy -> dummy) part.
    Andrei Sereda
    @asereda-gs
    How can I "isolate" that functionality as
    Observable Transformer / Operation ?
    Observable.compose( .... ).subscribe( )
    David Karnok
    @akarnokd
    .flatMap(g -> g.observeOn(schedulerFn.apply(g.getKey())).doOnNext(e -> System.out.printf("key=%s value=%s thread=%s\n", e, e, Thread.currentThread().getName())))
    Andrei Sereda
    @asereda-gs
    Thanks David. What I'm looking is something like this:
    // consumer is "single-threaded"
    Consumer<String> consumer = str -> {
      state.computeIfAbsent(str, ignore -> ConcurrentHashMap.newKeySet()).add(Thread.currentThread().getName());
      latch.countDown();
    };
    
    observable.compose(dynamicSchedulerSubscription(fn)).subscribe(consumer::accept);
    private static <T, K> Observable.Transformer<T, T> dynamicSchedulerSubscription(Function<T, Scheduler> fn) {
    // this doesn't work
    return obs -> obs.flatMap(o -> Observable.just(o).observeOn(fn.apply(o)));
    }