Actually that project has asyncIterator too.
It's async, but whenever I've tried IxJS I quickly run into problems or unexpected behavior.
It simply doesn't behave closely enough to RxJS to be intuitive, but these differences aren't actually documented so you're left guessing your way through it.
Ah okay. I only know it by name so I'm not sure I could help with that.

Hey guys! I have this Observable property in Angular:

  get listIsEmpty$(): Observable<boolean | undefined> {
    if (!this._listIsEmpty$) {
      const deferredValue: Observable<boolean | undefined> = defer(() => of(this._flatNodes && this._flatNodes.length === 0));
      // initially emit "undefined", then the actual value (after 1000 ms)
      this._listIsEmpty$ = concat(of(undefined), timer(1000).pipe(switchMapTo(deferredValue)));
    }
    return this._listIsEmpty$;
  }

and it's used in the component's template with the async pipe. Now I need it to start emitting again when my list is initialized again (this._flatNodes = <somevalue>). What's the best way of doing this?

For now I just set this._listIsEmpty$ = undefined;
But I think that way the async pipe will not unsubscribe correctly from the previous Observable.
Robert Elliot

Hi all!

I'm using rxjs with typescript and a STOMP subscription. For... reasons I can't use a STOMP error frame if the subscription can't be set up server side, so instead I send an error message on the topic.

I'm trying to work out the right way to handle that client side. At the moment I'm doing this:

import { partition } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

interface DesiredData {
  data: string;
}
interface SubscribeError {
  // fields not shown in the original message
}
function isDesiredData(obj: DesiredData | SubscribeError): obj is DesiredData {
  return (obj as DesiredData).data !== undefined;
}

declare const stompService: RxStompService;
declare const topic: string;

const messages = stompService.watch(topic).pipe(
    map((message: IFrame) => JSON.parse(message.body) as DesiredData | SubscribeError),
    // inclusive = true: also emit the first non-data message before completing
    takeWhile(message => isDesiredData(message), true)
);

const [dataStream, error] = partition(messages, message => isDesiredData(message));

dataStream
  .pipe(map(it => it as DesiredData))

error
  .pipe(map(it => it as SubscribeError))

Is there a nicer way to handle the different message types?

guess that depends on how you want to handle the errors...
I'd probably just let the JSON parse step throw an error and then handle it like any other error (so probably catchError) :/
Robert Elliot
That's a good idea, just check the type after the parse and throw... I'll give that a try. Thanks!
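A minimal sketch of that "check the type after the parse and throw" idea, in plain TypeScript. The `error` field on SubscribeError and the `parseOrThrow` name are assumptions made up for illustration; the original message does not show what the error payload looks like.

```typescript
interface DesiredData {
  data: string;
}

// Hypothetical shape: the real SubscribeError fields are not shown in the chat.
interface SubscribeError {
  error: string;
}

function isDesiredData(obj: DesiredData | SubscribeError): obj is DesiredData {
  return (obj as DesiredData).data !== undefined;
}

// Parse a message body and throw if the server sent an error payload
// instead of data, so downstream code only ever sees DesiredData.
function parseOrThrow(body: string): DesiredData {
  const parsed = JSON.parse(body) as DesiredData | SubscribeError;
  if (!isDesiredData(parsed)) {
    throw new Error(`Subscription failed: ${parsed.error}`);
  }
  return parsed;
}
```

Used inside map, e.g. map(message => parseOrThrow(message.body)), a throw becomes an error notification on the stream, so it can be handled with catchError or the subscriber's error callback, and partition is no longer needed.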
Jan-Niklas W.
Hi everyone, on the 12th of December I am going to do a live RxJS workshop on Twitch. If someone is interested, feel free to join, I would be super excited: https://twitter.com/niklas_wortmann/status/1331991104867033088
Hi all, I am new to RxJS, and I want to convert the old .subscribe() way to the RxJS way, but I also don't want to change my project structure. So I want to have a separate service.ts file, and in the component I want to simply console.log the data the way we do with .subscribe(), but in the RxJS way. Is it possible to do so? Below is a StackBlitz for the same: https://stackblitz.com/edit/subscribe-pcnfrt?file=src/app/app.component.ts
Basically I still want to keep the API endpoint call in the .service.ts file and do all data manipulation with RxJS in the component file. Is that possible?
Daniel Willis
@abhip5369 I refactored it to work with the type of the object coming back from the API. I've also implemented search on the 'body' field, so you can see how the search term from the input field gets incorporated into the Employee Observable stream to do the filtering in the component (rather than in the template with a pipe)
Pablo Maurer
Somebody up for help? Why isn't it possible to convert a promise to an observable and then use it with withLatestFrom?
          map((res) => {
            console.log('never resolving', res);
I haven't tested the code and I'm no expert so what I'm saying might be wrong.
I think your code does work, however at the point in time where withLatestFrom is being executed, the promise hasn't resolved yet. This means that the latest value of your promise is nothing. withLatestFrom does not wait for the Promise or Observable to finish, it will merely take the last emitted value (if one was emitted).
You could check out forkJoin. It's kinda like Promise.all but for Observables.
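A minimal sketch of that timing issue, with a Subject standing in for from(myPromise) so the example runs synchronously. The names source$, other$ and seen are made up for illustration.

```typescript
import { Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const source$ = new Subject<string>();
// Stand-in for from(myPromise): it has not emitted anything yet.
const other$ = new Subject<number>();

const seen: Array<[string, number]> = [];
source$.pipe(withLatestFrom(other$)).subscribe(pair => seen.push(pair));

source$.next('a'); // dropped: other$ has no latest value yet
other$.next(42);   // the "promise" resolves
source$.next('b'); // emitted together with the latest value of other$

console.log(seen); // only the pair for 'b' is present
```

If the stream only needs the promise's value, piping from(myPromise) directly, or using forkJoin as suggested, avoids the problem because both wait for the value instead of sampling it.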
Xiang Wu

Hi all, I would like to ask a question about fetching JSON data.
Assume I need the results.name part, but the data is spread over several URLs with different page numbers, starting from 1, then 2, 3, 4, 5... I need to use 'next' to obtain the next page; when 'next' is '', this is the last page. How should I create the Observable, and use it?

{
    "next": "http://swapi.dev/api/planets/?page=2",
    "previous": null,
    "results": [
        {
            "name": "planet1"
        }
    ]
}

type Planet = {
    name: string;
};

type Data = {
    next: string;
    results: Planet[];
};

getData(num: number = 1): Observable<Data> {
    return this.http.get<Data>('https://swapi.dev/api/planets/?page=' + num)
           .pipe(map((res: Data) => {
               return {
                   next: res.next,
                   results: res.results,
               };
           }));
}

Then in the target component, I need to subscribe to the getData method:


        this.planets = res.results,

then I could display all the name of planets from the first url (http://swapi.dev/api/planets/?page=1) on the template.

<div *ngFor="let item of planets">{{item.name}}</div>

My question is how to concat the data from the rest of the pages here (the number of pages is unknown). Thank you!
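One possible approach, sketched under assumptions: the expand operator can keep requesting the URL in 'next' until it is empty, and reduce can collect all results into one array. To keep the sketch runnable without a network, getPage and fakePages are hypothetical in-memory stand-ins for this.http.get<Data>(url), and the page keys are made up. 'next' is typed as string | null so that both '' and null end the loop.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, reduce } from 'rxjs/operators';

type Planet = { name: string };
type Data = { next: string | null; results: Planet[] };

// Hypothetical in-memory pages standing in for the paginated API.
const fakePages: Record<string, Data> = {
  page1: { next: 'page2', results: [{ name: 'planet1' }] },
  page2: { next: 'page3', results: [{ name: 'planet2' }] },
  page3: { next: '', results: [{ name: 'planet3' }] },
};

// Hypothetical stand-in for this.http.get<Data>(url).
function getPage(url: string): Observable<Data> {
  return of(fakePages[url]);
}

function getAllPlanets(firstUrl: string): Observable<Planet[]> {
  return getPage(firstUrl).pipe(
    // Follow 'next' while it is non-empty; EMPTY ends the recursion.
    expand((page: Data) => (page.next ? getPage(page.next) : EMPTY)),
    // Collect the results of every page into a single array.
    reduce((all: Planet[], page: Data) => all.concat(page.results), [] as Planet[]),
  );
}

getAllPlanets('page1').subscribe(planets => console.log(planets.map(p => p.name)));
```

In the component, the subscribe callback would assign the combined array to this.planets. Replacing reduce with scan would instead emit the growing list after each page arrives.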

Daniel Willis
@mnewmedia_twitter I think the reason it's not working is because of('') is never emitting. I haven't tested it, but maybe you want something like this:
  tap((res) => console.log('never resolving', res)),
Pablo Maurer
@danww thanks, but actually I found out of('') is emitting. When it comes to withLatestFrom, the promise has only just started resolving and has not emitted yet, so there's no latest value that withLatestFrom can take. But thanks for trying =)
@lake-toya Thank you, you were right ;)
Daniel Willis
@mnewmedia_twitter Ah right. Why do you need to use withLatestFrom, rather than working directly from the from(myPromise)?
Brandt B.
Remind me what’s the use of the second parameter to catchError? I can’t remember when that stream is useful
Daniel Willis
@babeal It's the source Observable, more detail here: https://rxjs-dev.firebaseapp.com/api/operators/catchError
@babeal probably useful when the source is cold and you want to retry?
Brandt B.
@Dorus that’s what I was wondering, but I read it differently. Since the retry was being used in an effect, so inside of a switch or merge map, there was a closure around the original value that I used to redispatch the event.
akarpov л


Which operator could I use for the next usecase?

I have a BehaviorSubject. I start pushing data into the subject, and I would like to wait a certain time (like 50-100-200ms) before hitting the subscription. Then after some time I push data into the subject again, and I would like to wait X ms again before hitting the subscription. I'm only interested in the last value.

@Norby125 throttle or audit
Or the variations throttleTime and auditTime ofc.
@babeal ah I only read your last comment, I didn't look at all those other things :)
Brandt B.
@Dorus it’s all good. I’m a little rusty…
Which operator is like combineLatest, but does not wait for all observables to have a value? So technically it hits the subscription whenever any of them fires.


how to make sure that I unsubscribe from the observable array when the switchMap cancels the previous flow?

I thought that switchMap will unsubscribe the previous observables where I return mouseOver and mouseOut, but I was wrong
Are there any examples of bidirectional rxjs operators other than websocket?
Simone Bianchin
I don't understand why the second observable of concat is subscribed without waiting for the first one's completion
Florian Spier

Hi, can somebody help me with this question... https://stackoverflow.com/questions/61024321/extend-observable-and-set-external-source

It is about extending Observable and set an external source. I found NgRx doing it here:
However, RxJS Observable gives a deprecation warning on `Observable.source`: https://github.com/ReactiveX/rxjs/blob/6.5.5/src/internal/Observable.ts#L25

Is it safe to use source despite its deprecation warning?
And is it necessary to implement lift?

here is a small stackblitz demo to play with: https://stackblitz.com/edit/extend-observable


what do I do with an empty subscription:

const sub = interval(5).subscribe()

if I omit the callbacks to subscribe, then what good is it? just curious


also what is going on here, it says when we do this:

const o = new Observable(obs => {
        // obs = o,  right?
});

that's what typescript types say?

anyone know how to create an async queue with observables, where concurrency is a variable? this is my attempt but not sure if it's right: https://gist.github.com/ORESoftware/e73257ee495f949d2e2ce262af4bac9e
can somebody explain why rxjs doesn't work for me with webpack? getting tons of these errors: https://paste.centos.org/view/raw/6dc6d716

Hello I have a problem. I have a "combineLatest" with 3 observables

1) onSortChange$ - a BehaviorSubject (Mat-table sort change event (Sort class)) => emits '{sort: "name", direction: "asc"}'
2) onPageChange$ - a BehaviorSubject (Mat-table change page event (PageEvent class)) => emits '{pageIndex: 1,2,3, ..., etc}'
3) onFilterChange$ - a BehaviorSubject (my custom filter implementation).

When any observable emits a value, I call a service. So far, everything is perfect. What I need is for "onPageChange$" to reset (pageIndex = 1 again) when observable 1 or 3 changes value. For example:

First values (default)

onSortChange$ = name, asc
onPageChange$ = 2
onFilterChange$ = blah blah

CALL_HTTP_WITH_PARAMETERS ("name, asc", 2, "blah blah")

// Observable 1 emits the "lastName, desc" value

onSortChange$ = lastName,desc
onPageChange$ = 2 ==> has to be 1 again
onFilterChange$ = blah blah

CALL_HTTP_WITH_PARAMETERS ("lastName, desc", 1, "blah blah")
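One way this could be sketched, under assumptions: combine only the sort and filter subjects, and for each new sort/filter pair switchMap to a page stream that always starts at 1. The values are simplified to plain strings and numbers instead of the Sort and PageEvent objects, and Params, params$ and calls are names made up for illustration (calls stands in for CALL_HTTP_WITH_PARAMETERS).

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map, skip, startWith, switchMap } from 'rxjs/operators';

// Simplified stand-ins for the three BehaviorSubjects described above.
const onSortChange$ = new BehaviorSubject<string>('name, asc');
const onPageChange$ = new BehaviorSubject<number>(1);
const onFilterChange$ = new BehaviorSubject<string>('blah blah');

interface Params {
  sort: string;
  pageIndex: number;
  filter: string;
}

const params$ = combineLatest([onSortChange$, onFilterChange$]).pipe(
  // Every sort or filter change starts a fresh page stream...
  switchMap(([sort, filter]) =>
    onPageChange$.pipe(
      skip(1),      // ignore the page value replayed by the BehaviorSubject
      startWith(1), // ...which always begins at page 1
      map((pageIndex): Params => ({ sort, pageIndex, filter })),
    ),
  ),
);

// Hypothetical stand-in for the HTTP call: record each parameter set.
const calls: Params[] = [];
params$.subscribe(p => calls.push(p));

onPageChange$.next(2);                // user goes to page 2
onSortChange$.next('lastName, desc'); // sort changes, page resets to 1

console.log(calls);
```

Note that onPageChange$ itself still holds the old page after a reset, so the paginator in the template would need to be reset separately (MatPaginator has a firstPage() method for that), and with this sketch the first request always uses page 1 regardless of the subject's initial value.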