A-STAR
@A-STAR
:D
Dorus
@Dorus

@Dorus I've never used publish with a selector as you do in the snippet you sent. Is this used so we can subscribe to source1 and source2 multiple times without actually executing the side effects multiple times?

Yes

and more specifically, I was confused about takeUntil(), I would have expected something like delayWhen() (assuming it does what I think it does)

But you cannot stop delayWhen so it would keep delaying even after you no longer need so. There used to be a pauseWhen many versions ago.
@renke
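(Editor's sketch, not from the chat.) The point about publish with a selector can be checked with a small counter. The names `sideEffects`, `source$` and `shared$` are made up for illustration, and RxJS 6.5-style imports are assumed, as mentioned later in the conversation.

```typescript
import { Observable, merge, of } from "rxjs";
import { publish, tap } from "rxjs/operators";

let sideEffects = 0;
// Hypothetical source whose tap stands in for an expensive side effect.
const source$ = of(1, 2).pipe(tap(() => sideEffects++));

// Without publish: merge subscribes to source$ twice, so the tap runs twice per value.
const plainValues: number[] = [];
merge(source$, source$).subscribe(v => plainValues.push(v));
const effectsWithoutPublish = sideEffects;

// With publish(selector): both uses inside the selector share one subscription to source$.
sideEffects = 0;
const sharedValues: number[] = [];
source$
  .pipe(publish((shared$: Observable<number>) => merge(shared$, shared$)))
  .subscribe(v => sharedValues.push(v));
const effectsWithPublish = sideEffects;
```

Inside the selector the shared stream can be used as often as needed; the source itself is only subscribed once, after the selector's result has been subscribed.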

Karsten Pruehss
@kpruehss
i must have had things on my mind
Dorus
@Dorus
const currentNum$ = zip(nums$, clicks$).pipe(map(([nums, clicks]) => [nums, clicks]))

this should emit a tuple of the number and the associated click (if you want to just emit the numbers, or an object etc, just change the map function)

What does this map add exactly?

Karsten Pruehss
@kpruehss
@Dorus in this particular snippet, nothing. But if you wanted to transform the emitted value into something else, like say, only emit the numbers, or as an object literal, you'd do that through the map. I guess I could have left it off, but i didn't want to assume knowledge
@Dorus also bad wording on my part perhaps, next time i'll actually just transform the emission into something. i blame the lack of coffee
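(Editor's sketch, not from the chat.) A version of the zip snippet where the map actually transforms the emission, here down to just the number. `nums$` and `clicks$` are the names used above; the concrete values and the Subject standing in for real click events are assumptions.

```typescript
import { Subject, of, zip } from "rxjs";
import { map } from "rxjs/operators";

const nums$ = of(10, 20, 30);
// Hypothetical stand-in for a stream of click events.
const clicks$ = new Subject<string>();

const numbersOnly: number[] = [];
zip(nums$, clicks$)
  .pipe(map(([num, _click]) => num)) // keep the number, drop the click
  .subscribe(n => numbersOnly.push(n));

// Each click releases the next buffered number.
clicks$.next("click-1");
clicks$.next("click-2");
```

After two clicks only the first two numbers have been emitted; the third stays buffered in zip until another click arrives.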
Renke Grunwald
@renke
@Dorus I've played around with your second snippet, but I couldn't get it to work; the source2 values come before the values from source1 and then come again after the values from source1. The very first snippet you sent uses shareReplay with a selector, which my RxJS version (~6.5) does not seem to support.
      const foo = first$.pipe(
        publish(firstSubject$ => {
          return second$.pipe(secondSubject$ => {
            return concat(
              merge(
                firstSubject$,
                secondSubject$.pipe(
                  takeUntil(concat(firstSubject$.pipe(ignoreElements()), of())),
                  toArray(),
                  mergeAll()
                )
              ),
              secondSubject$
            );
          });
        })
      );
const first$ = of("first1", "first2");
const second$ = of("second1", "second2");
Dorus
@Dorus
@renke You're missing a publish to create secondSubject
Renke Grunwald
@renke
oh...
Dorus
@Dorus
Like this it will subscribe to second$ twice and idk what effect that has.
Renke Grunwald
@renke
mh, I still see the values from second$ first. Is it a problem that I use of to create my observables here?
Dorus
@Dorus

@renke https://rxviz.com/v/moY6P51O

works for me.

@renke Oops indeed, it looks like when source2 completes, it instantly emits.
@renke https://rxviz.com/v/38MaX4Y8
source1.pipe(publish(src1 => source2.pipe(publish(src2 =>
  concat(
    merge(
      src1,
      merge(NEVER, src2).pipe(
        takeUntil(concat(src1.pipe(ignoreElements()), of(1))),
        toArray(),
        mergeAll()
      )
    ),
    src2
  )
))))
I wonder if i missed a way to simplify this...
Renke Grunwald
@renke
yes, that makes sense, I guess, that would prevent it from completing before src1 completes.
Dorus
@Dorus
yup
toArray reacts to completion and you don't want src2 completing to trigger that before src1 does.
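(Editor's sketch, not from the chat.) The snippet above, run against synchronous `of` sources like the ones used earlier in the conversation. The explicit `Observable<string>` annotations and the `received` array are additions for illustration; RxJS 6.5-style imports are assumed.

```typescript
import { NEVER, Observable, concat, merge, of } from "rxjs";
import { ignoreElements, mergeAll, publish, takeUntil, toArray } from "rxjs/operators";

const source1 = of("first1", "first2");
const source2 = of("second1", "second2");

const received: string[] = [];
source1.pipe(
  publish((src1: Observable<string>) => source2.pipe(
    publish((src2: Observable<string>) =>
      concat(
        merge(
          src1,
          // merge(NEVER, src2) never completes on its own, so toArray only
          // flushes when takeUntil fires, i.e. once src1 has completed.
          merge(NEVER, src2).pipe(
            takeUntil(concat(src1.pipe(ignoreElements()), of(1))),
            toArray(),
            mergeAll()
          )
        ),
        src2 // anything src2 emits after src1 is done passes straight through
      )
    )
  ))
).subscribe(v => received.push(v));
```

With these sources, source2 emits and completes before source1 is even connected, which is exactly the case where a bare `src2` in front of toArray would flush too early.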
Renke Grunwald
@renke
I still don't understand why delayWhen is not the right choice here:
      const foo = first$.pipe(
        publish(firstSubject$ => {
          return merge(
            firstSubject$,
            second$.pipe(
              delayWhen(() =>
                concat(firstSubject$.pipe(ignoreElements()), of())
              )
            )
          );
        })
      );
I would imagine that when firstSubject$ completes there is no leakage from delayWhen whatsoever
Dorus
@Dorus
You should test it. But i don't think this will allow second$ to keep emitting once first$ is done, only the initial burst.
Idk, does firstSubject$ remember it completed?
Renke Grunwald
@renke
oh, i see the factory is called on each emit of second$?
Dorus
@Dorus
@renke Quick test does show it works. I said nothing :D https://rxviz.com/v/B8ZABXaJ
Actually, thinking back, i think the subject stays in completion status and will emit complete() right away once you subscribe again on new emits from second$
So that's a very neat solution
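(Editor's sketch, not from the chat.) The delayWhen approach with Subjects as sources, so that both phases are visible: values held back while first$ is still running, and values passed through immediately afterwards. One deliberate change is `of(1)` instead of `of()` as the notifier tail. As far as I know, RxJS 7 no longer releases a value when the notifier completes without emitting, so this is an assumption made to keep the sketch version-independent. The `output` array and the emitted strings are made up.

```typescript
import { Observable, Subject, concat, merge, of } from "rxjs";
import { delayWhen, ignoreElements, publish } from "rxjs/operators";

const first$ = new Subject<string>();
const second$ = new Subject<string>();

const output: string[] = [];
first$.pipe(
  publish((firstSubject$: Observable<string>) =>
    merge(
      firstSubject$,
      second$.pipe(
        // The factory runs once per value of second$; each notifier emits
        // only after firstSubject$ has completed.
        delayWhen(() => concat(firstSubject$.pipe(ignoreElements()), of(1)))
      )
    )
  )
).subscribe(v => output.push(v));

second$.next("second1"); // held back
first$.next("first1");
second$.next("second2"); // held back
first$.next("first2");
first$.complete();       // releases second1 and second2
second$.next("second3"); // the completed subject completes new subscribers at once
```

The last line is the case discussed above: the notifier for "second3" subscribes to an already completed subject, so the value is not delayed at all.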
Renke Grunwald
@renke
it feels a bit more natural so to say
that aside, thanks for your help, I think I learned a lot here, especially about publish
Dorus
@Dorus
Glad to help, i was half asleep yesterday so it's a miracle i gave at least somewhat working code xD
Renke Grunwald
@renke
it certainly pushed me in the right direction, although your shareReplay snippet confused me a bit :-D
Dorus
@Dorus
it's like publish but with a replaySubject backing the inner stream.
Just a random brainwave of mine i guess :P
I do like this newfound way of using delayWhen. gotta memorize it.
Renke Grunwald
@renke
I am still puzzled why there is no publishReplay that takes a selector function though. Does that not make sense, or why does only publish support that?
Dorus
@Dorus
https://rxjs.dev/api/operators/publishReplay does show selectorOrScheduler
@renke
Renke Grunwald
@renke
Oh, I see, maybe I was looking at shareReplay, which seems to lack it indeed
Dorus
@Dorus
@renke Oops, my mistake. I named shareReplay and i also forgot to set the first 2 params. Idk, i always confuse share and publish, even though i know very well what they do.
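(Editor's sketch, not from the chat.) A comparison of publish and publishReplay when both get the same selector. The first two publishReplay arguments are the buffer size and window time; passing `Infinity` for both is an assumption made here to replay everything. `letters$` and the result arrays are hypothetical names.

```typescript
import { Observable, concat, of } from "rxjs";
import { publish, publishReplay } from "rxjs/operators";

const letters$ = of("a", "b");
// The selector subscribes to the shared stream a second time after it completes.
const twice = (shared$: Observable<string>) => concat(shared$, shared$);

// Backed by a plain Subject: the second subscription only sees the completion.
const withPublish: string[] = [];
letters$.pipe(publish(twice)).subscribe(v => withPublish.push(v));

// Backed by a ReplaySubject: the second subscription gets the values replayed.
const withPublishReplay: string[] = [];
letters$
  .pipe(publishReplay(Infinity, Infinity, twice))
  .subscribe(v => withPublishReplay.push(v));
```

This is the "publish but with a ReplaySubject backing the inner stream" behaviour: late subscriptions made inside the selector still receive earlier values.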
Will Mruzek
@mruzekw

Hi there, say I have a higher-order observable. It basically contains a set of sets of numbers.

I'd like the first inner observable to be listened to until the next inner observable is available. Then the next one should be listened to. I know there's switchMap, but I can't seem to get it to work.

const setOfSetOfNums$ = merge(
  of([4,5,6,7]).pipe(delay(50)),
  of([0,1,2,3]),
).pipe(
  distinctUntilChanged(),
  switchMap(context => of(...context)),
)

zip(interval(1000), setOfSetOfNums$).pipe(
  map(([_, hand]) => hand),
)

I would expect this to output something like:

---0---1---4---5---6---7--->

But it spits out:

---0---1---2---3---4---5---6---7--->

Derek
@derekkite
@mruzekw not quite clear what you need. 0-1-? or 0 - 4 - 5 - 6 - 7 - 1 then what? maybe describe what you are trying to accomplish
Will Mruzek
@mruzekw
I would like the delayed observable to interrupt the non-delayed one
Thus the desired output of ---0---1---4---5---6---7--->
In actuality, it's a user event that generates a new set of nums to listen to, but this should replicate the idea...I think?
If there's a new observable in setOfSetOfNums$, we should immediately switch to that
Dorus
@Dorus
@mruzekw the problem is you zip after switchMap
thus you have them first emit all events at the same time, and then queue up at zip after the switchMap
Observables have no so-called backpressure
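(Editor's sketch, not from the chat.) One way to see the difference is to move the zip inside the switchMap, so that switching also discards the queued values of the previous set. To keep it deterministic, a manually driven `ticks$` Subject replaces `interval(1000)` and a `sets$` Subject replaces the delayed `of` sources; both, along with the `run` helper, are assumptions for illustration.

```typescript
import { Observable, Subject, from, zip } from "rxjs";
import { map, switchMap } from "rxjs/operators";

type Build = (sets$: Observable<number[]>, ticks$: Observable<number>) => Observable<number>;

// Hypothetical driver: two ticks on the first set, then a new set, then four more ticks.
function run(build: Build): number[] {
  const sets$ = new Subject<number[]>();
  const ticks$ = new Subject<number>();
  const out: number[] = [];
  build(sets$, ticks$).subscribe(n => out.push(n));
  sets$.next([0, 1, 2, 3]);
  ticks$.next(0);
  ticks$.next(1);
  sets$.next([4, 5, 6, 7]);
  for (let i = 2; i < 6; i++) ticks$.next(i);
  return out;
}

// zip after switchMap: every number is already queued inside zip when the switch happens.
const zipAfterSwitch = run((sets$, ticks$) =>
  zip(ticks$, sets$.pipe(switchMap(nums => from(nums)))).pipe(map(([, n]) => n))
);

// zip inside switchMap: switching unsubscribes the inner zip together with its queue.
const zipInsideSwitch = run((sets$, ticks$) =>
  sets$.pipe(switchMap(nums => zip(ticks$, from(nums)).pipe(map(([, n]) => n))))
);
```

In the first variant 2 and 3 are still delivered after the new set arrives; in the second the new set interrupts the old one.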
Derek
@derekkite
of(4,5,6,7).pipe(
    delay(50),
    switchMap(() => of(1,2,3,4))
)