Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Nov 25 21:01
    DavidWeiss2 commented #6946
  • Nov 25 18:24
    adaico commented #6406
  • Nov 25 18:14
    adaico commented #6406
  • Nov 22 07:31
    bene-we commented #7117
  • Nov 21 18:20
    BrainCrumbz opened #7119
  • Nov 20 04:30
    MeetzhDing synchronize #6926
  • Nov 19 12:25
    tmtron commented #5273
  • Nov 19 02:19
    kwonoj commented #7118
  • Nov 19 01:51
    adellamaggiora commented #7118
  • Nov 18 23:47
    kwonoj closed #7118
  • Nov 18 23:47
    kwonoj commented #7118
  • Nov 18 13:57
    adellamaggiora edited #7118
  • Nov 18 13:57
    adellamaggiora edited #7118
  • Nov 18 11:29
    adellamaggiora opened #7118
  • Nov 17 21:58
    kwonoj closed #7117
  • Nov 17 21:58
    kwonoj commented #7117
  • Nov 17 19:27
    bene-we edited #7117
  • Nov 17 19:26
    bene-we opened #7117
  • Nov 17 16:29
    voliva commented #7116
  • Nov 17 16:28
    voliva commented #7116
Dorus
@Dorus
yes it will only receive data that has been emitted from the subject after you subscribe.
Lodewijk Wensveen
@lwensveen
Which is none? :S
Dorus
@Dorus
Anyway this is unrelated to your cancel question (that i still do not quite understand)
Lodewijk Wensveen
@lwensveen
rxjs always works great till it doesn't
If I log inside that subscribe it logs like 3 times
at random really
Dorus
@Dorus
then you are calling the subject from somewhere
Lodewijk Wensveen
@lwensveen
Seems it doesn't clear it or something
Dorus
@Dorus
you could add tap() with a log in a few places to see wht's going trough it at each step
Lodewijk Wensveen
@lwensveen
But anyways this is too confusing for me, normally it makes sense but this is really strange, so I might just rewrite it using promises or something
Yeah that's what I did but see ^
Anyways it's still not clear to me what you meant with the next and the pipe?
A subject needs data, so you need to call next, or init it as a behavioursubject?
Dorus
@Dorus
A subject just forwards the next call to any current subscribers when you call next on it.
Lodewijk Wensveen
@lwensveen
Yes correct
Then how would I do that pipe etc without calling next?
Dorus
@Dorus
A subject is a hot primitive, it's both a observer and observable and it contains live data, other than say a cold observable like of(changes) that only holds one value and replays it each time you subscribe. Creating a new execution context each time you do.
If you use a subject, you should subscribe first and call next on it later
It's like, turn on the radio before you broadcase something, else you wont hear it.
Lodewijk Wensveen
@lwensveen
Ahh like that
Dorus
@Dorus
It's the biggest difference between promises and observable. Promises are eager, observable lazy. So with observable nothing happens till you subscribe.
Promises start running right away and store their result.
Aaron Brewer
@spaceribs

Hey friends, I've been exploring combining the concept of "Behavior Trees" (https://en.wikipedia.org/wiki/Behavior_Trees) with RxJS, and this revealed to me a combiner that I don't think currently exists, so I wanted to discuss if that's the case and if I should put up a PR to add such a combiner.

The most basic behavior tree at minimum contains an AND (sequence) combiner and an OR (selector) combiner. AND combiners run their children in sequence, and stop running if there are any failures. An AND combiner is actually very easy to translate to RxJS, it's literally just a concat.

An OR combiner instead runs it's children until success, and only fails if all it's children fail to return success. This is actually very similar to onErrorResumeNext except for the following attributes:

  • When all selectors are exhausted, throw an error.
  • Stop executing when any observable completes.

The actual code for the combiner is below:

export function selector<T, R>(
  ...sources: Array<
    | ObservableInput<any>
    | Array<ObservableInput<any>>
    | ((...values: Array<any>) => R)
  >
): Observable<R> {
  if (sources.length === 0) {
    return throwError(new Error("No selections were successfully executed."));
  }

  const [first, ...remainder] = sources;

  if (sources.length === 1 && Array.isArray(first)) {
    return onErrorResumeNext(...first);
  }

  return new Observable((subscriber) => {
    const subNext = () =>
      subscriber.add(selector(...remainder).subscribe(subscriber));

    return from(first).subscribe({
      next(value) {
        subscriber.next(value);
      },
      error: subNext,
      complete: () => {
        subscriber.complete();
      },
    });
  });
}

Should I open a PR or am I recreating/reopening a discussion that has already occurred?

also, some marble tests I've made to describe the the intended output:
  it("finds the first successful selection from a set.", () => {
    const firstObs$ = cold("--#");
    const secondObs$ = cold("--z--#");
    const thirdObs$ = cold("--x--y|");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "----z----x--y|"
    );
  });

  it("completes on the first successful selection.", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("--z--|");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble("--y--z--|");
  });

  it("errors if no selections are successful", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("z--#");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "--yz----x--#"
    );
  });
Dorus
@Dorus

@spaceribs A better place to reach the devs is slack or github. The usual response is to make a separate packages to combines with RxJs. That said they might be enthusiastic about this idea depending on how lightweight it can be added.

Also while i'm not familiar with behavior tree's myself, i do like to add RxJs is for multi value sequences and error is a rare item to emit. I'm not sure how that combines with behavior tree's.

That said, i do think what you describe is some unique behavior not easily combined with existing operators. Indeed AND could be concat or merge, but what you describe for OR (stop when any completes, error when all error) is not something i can easily produce from existing operators and sounds like a quite useful one.

Do remember that because of RxJs async nature, it would be interesting to have these operators you describe in both parallel (like merge) and sequential (like concat) flavor. Especially when you work with live data, onErrorResumeNext and concat will not work.

Ps. I just tought up of a way to get what you aim for using existing operators. onErrorResumeNext already statisfies the stop requirement, and you can easily add the error requirement by not using onErrorResumeNext on the last observable in line.
Now i'm pondering how to make a similar operator for the parallel case :D (as well as a nicer one that emits all previous errors when the last source fails)

Dominic Watson
@intellix
struggling to make something in RxJS and wondering if rubber ducking will help me
so basically, in an ideal world fromEvent(window, 'orientationchange') would emit a new width/height, but as it stands, it emits before the width/height have actually changed and it seems the only way to actually know when the orientation has changed, is to use interval hacks
so what I'm trying to create is an observable that emits when it's ACTUALLY changed by jumping into a timer loop and emitting when so
I need to know the current orientation when you start listening, so I have a starting point, and then switch into listening to the orientation change I guess:
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
    );
  });
);
if (current !== new) I can straight up emit the new one, but if not, I should change to a timer to keep checking every 50ms:
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
      switchMap(newOrientation => {
        return (current !== newOrientation) ? of(newOrientation) : timer(0, 50).pipe(
          ...
        );
      }),
    );
  });
);
Dominic Watson
@intellix
I think the timer should map each iteration to a detected orientation, filter and take 1?
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
      switchMap(newOrientation => {
        return (current !== newOrientation) ? newOrientation : timer(0, 50).pipe(
          map(() => getOrientation()),
          filter(v => v === current),
          take(1),
        );
      }),
    );
  });
);
I guess my only problem now is that I'm stuck on the first inner switchMap with the wrong value and it won't work the next times in the future
Dominic Watson
@intellix
maybe this? I start with an initial value and also listen to orientation changes and get the NEXT value too. I use pairwise so I always have the last 2 emits
merge(
  of(getOrientation()),
  fromEvent(platformService.window, 'orientationchange').pipe(map(() => getOrientation())),
).pipe(
  pairwise(),
  switchMap(([prev, next]) => {
    return (prev !== next) ? next : timer(0, 50).pipe(
      map(() => getOrientation()),
      filter(v => v === current),
      take(1),
    );
  }),
);
Dominic Watson
@intellix
but that's not correct cause if I start from PORTRAIT and move to LANSCRAPE and it emits PORTRAIT 2x then it's only corrected in the 2nd part (LANDSCAPE) and when I move back to PORTRAIT it could get stuck waiting forever for PORTRAIT (previous emit) to !== PORTRAIT (next emit)
Dominic Watson
@intellix
it seems like what I need, is for the whole thing to re-run, the fetching of the initial again and the re-attaching of the listener
Dominic Watson
@intellix
and I can't start with the fromEvent because I need to know the orientation BEFORE
Dominic Watson
@intellix
omg.... I've figured it out, by using a recursive expand operator
function getOrientationChange$(prev) {
  return fromEvent(window, 'orientationchange').pipe(
    map(() => getOrientation()),
    switchMap(next => {
      return (prev !== next) ? of(next) : timer(0, 50).pipe(
        map(() => getOrientation()),
        filter(next2 => next2 !== prev),
      );
    }),
    take(1),
  );
}

of(getOrientation()).pipe(
  expand(v => getOrientationChange$(v)),
).subscribe(v => {
  console.log(v);
  document.querySelector('#results').innerHTML = v;
});
the magic being that I wanted the whole thing to go back into itself and couldn't figure out how to do it. I needed an initial value to seed it being the tricky part
connor-odoherty
@connor-odoherty

Hey all! I'm in the process of cleaning up a 4 year old codebase and saw that the auth flow uses an observer pattern not found anywhere else in the codebase:

  public login(credentials: LoginCredentials, loginSource = ""): Observable<any> {
    this.loginResponseMessage = null;
    if (...) {
      return observableThrowError("Please provide both a username and a password");
    } else {
      return Observable.create((observer) => {
        let urlSearchString = ".."
        this.http
          .post(this.serverInfo.getServerBaseURL() + "/users/api_sign_in", urlSearchString, { observe: "response" })
          .subscribe(
            (response: HttpResponse<any>) => this.signInHttpDataResponse(response, credentials, observer),
            (err) => {
              console.log(err);
              observer.error("Network error. <br><br>" + err.statusText);
              observer.complete();
            }
          );
      });
    }
  }

  private signInHttpDataResponse(response: HttpResponse<any>, credentials, observer) {
    const responseData = response.body;
    this.loginResponseMessage = responseData.message;
    if (responseData.success == true) {
      const auth_header = response.headers.get("Authorization");
      let jwt_token = null;
      if (auth_header) {
        const header_pair = auth_header.split(" ");
        if (header_pair.length == 2 && header_pair[0] == "Bearer") {
          jwt_token = header_pair[1];
          responseData.email = credentials.email;
          responseData.jwtToken = jwt_token;

          let currentUser = this.userStore.getUserNow();
          if (currentUser.suduAsUser != null) {
            currentUser.suduAsUser = new CurrentUser(responseData);
            this.userStore.setUserWithCurrentUser(currentUser);
          } else {
            this.userStore.setUserWithJSON(responseData);
          }
          observer.next(response);
        } else {
          observer.error({
            message: "The server failed to return a valid authorization token",
          });
        }
      } else {
        observer.error({
          message: "The server failed to return an authorization token",
        });
      }
    } else if (responseData.failureReason && responseData.failureReason == "TWO_FACTOR_REQUIRED") {
      // login failed because two factor is required
      // save the current user data for use by the two factor page and return
      responseData.email = credentials.email;
      // this will store the twoFactorTempId for us
      this.userStore.setUserWithJSON(responseData);
      observer.next(response);
    } else {
      observer.error(responseData);
    }

    observer.complete();
  }

Everywhere else, I've been using a pattern like this:

  public destroyExpenseDocument(expenseToUpdate: Expense, expenseDocument: ExpenseDocument): Observable<any> {
    return this.expenseAccessor.getExpense(expenseToUpdate.uuid).pipe(
      map((expense: Expense) => {
        expense.expense_documents = filter(expense.expense_documents, (doc: ExpenseDocument) => {
          return doc.uuid !== expenseDocument.uuid;
        });
        return expense;
      }),
      mergeMap((expenseHere) => {
        return this.expenseAccessor.setExpense(expenseHere);
      }),
      tap(() => {
        this.hydrateCachedExpenses();
      })
    );
  }

Is there any reason to use the Observer.next pattern instead of the pipe/mergeMap pattern?

I could see the Observer.next pattern making sense if we expected that observer to emit multiple times, but as far as I can tell the conditional tree ensures it will only emit once
Matt Erman
@CodeLiftSleep
So an observable created in a parent component is not subscribable in the child component?
Dorus
@Dorus
@CodeLiftSleep i'm not sure what you mean, rxjs has no concepts of (parent) components.
Matt Erman
@CodeLiftSleep
This doesn't work...header$ is always undefined---why?
//parent class
export class baseComponent implements OnInit {
  header$: Observable<headerData>;

   ngOnIt() {
      header$ = this.store.select(getHeaderData);
   }
}

//child class

export class headerComponent extends baseComponent imlpements OnInit {
   ngOnInit() {
      header$.subscribe(header => { <---- error 'cannot subscribe to undefined'
         console.log(header);
      });
   }
}
Dorus
@Dorus
@connor-odoherty the pattern above is observable.create(), the most advanced for of creating an observable i would usually only recommand if all else fail. Indeed since the code above is just subscribing to this.http.post() and then mapping the error callback and calling a function on next calls, it could have been
  public login(credentials: LoginCredentials, loginSource = ""): Observable<any> {
    this.loginResponseMessage = null;
    if (...) {
      return observableThrowError("Please provide both a username and a password");
    } else {
      return Observable.defer(() => {
        let urlSearchString = ".."
        return this.http
          .post(this.serverInfo.getServerBaseURL() + "/users/api_sign_in", urlSearchString, { observe: "response" })
      }).pipe(
        tap((response: HttpResponse<any>) => this.signInHttpDataResponse(response, credentials, observer),
            err) => console.log(err))
        catchError((err) => "Network error. <br><br>" + err.statusText)
      );
    }
  }
Matt Erman
@CodeLiftSleep
@Dorus how is an obervable declared in the parent class undefined in the child class?
Dorus
@Dorus
@CodeLiftSleep This is angular code, or rather OO code. If i understand this right you override the ngOnInit of the base class in the subclass.
Matt Erman
@CodeLiftSleep
so I would need to do something like super.header$?
Dorus
@Dorus
You would need to do something like super.ngOnInit() or something.