Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • 09:42
    voliva commented #7078
  • 09:35
    voliva edited #7078
  • 09:35
    voliva opened #7078
  • Oct 02 14:28
    snawaz commented #6641
  • Oct 01 19:26
    tmair edited #6952
  • Oct 01 17:34
    tmair ready_for_review #6952
  • Oct 01 17:34
    tmair commented #6952
  • Oct 01 17:31
    tmair commented #6952
  • Oct 01 17:23
    tmair synchronize #6952
  • Oct 01 15:16
    bever1337 commented #5385
  • Oct 01 15:12
    bever1337 commented #5385
  • Oct 01 15:10
    bever1337 commented #5385
  • Oct 01 15:09
    bever1337 commented #5385
  • Oct 01 15:06
    bever1337 commented #5385
  • Oct 01 15:06
    bever1337 commented #5385
  • Oct 01 15:05
    bever1337 commented #5385
  • Oct 01 06:39

    kwonoj on feat-turbo-poc

    build(package): migrate pkglock build(package): initial core/cj… (compare)

  • Oct 01 00:27

    kwonoj on feat-turbo-poc

    wip (compare)

  • Oct 01 00:23

    kwonoj on feat-turbo-poc

    build(turbo): initial pipeline … build(package): define core refactor(core): copy initial src and 3 more (compare)

  • Oct 01 00:20

    kwonoj on feat-turbo-poc

    build(package): define core refactor(core): copy initial src refactor(core): copy specs and 2 more (compare)

Lodewijk Wensveen
@lwensveen
But anyways this is too confusing for me, normally it makes sense but this is really strange, so I might just rewrite it using promises or something
Yeah that's what I did but see ^
Anyways it's still not clear to me what you meant with the next and the pipe?
A subject needs data, so you need to call next, or init it as a behavioursubject?
Dorus
@Dorus
A subject just forwards the next call to any current subscribers when you call next on it.
Lodewijk Wensveen
@lwensveen
Yes correct
Then how would I do that pipe etc without calling next?
Dorus
@Dorus
A subject is a hot primitive, it's both a observer and observable and it contains live data, other than say a cold observable like of(changes) that only holds one value and replays it each time you subscribe. Creating a new execution context each time you do.
If you use a subject, you should subscribe first and call next on it later
It's like, turn on the radio before you broadcase something, else you wont hear it.
Lodewijk Wensveen
@lwensveen
Ahh like that
Dorus
@Dorus
It's the biggest difference between promises and observable. Promises are eager, observable lazy. So with observable nothing happens till you subscribe.
Promises start running right away and store their result.
Aaron Brewer
@spaceribs

Hey friends, I've been exploring combining the concept of "Behavior Trees" (https://en.wikipedia.org/wiki/Behavior_Trees) with RxJS, and this revealed to me a combiner that I don't think currently exists, so I wanted to discuss if that's the case and if I should put up a PR to add such a combiner.

The most basic behavior tree at minimum contains an AND (sequence) combiner and an OR (selector) combiner. AND combiners run their children in sequence, and stop running if there are any failures. An AND combiner is actually very easy to translate to RxJS, it's literally just a concat.

An OR combiner instead runs it's children until success, and only fails if all it's children fail to return success. This is actually very similar to onErrorResumeNext except for the following attributes:

  • When all selectors are exhausted, throw an error.
  • Stop executing when any observable completes.

The actual code for the combiner is below:

export function selector<T, R>(
  ...sources: Array<
    | ObservableInput<any>
    | Array<ObservableInput<any>>
    | ((...values: Array<any>) => R)
  >
): Observable<R> {
  if (sources.length === 0) {
    return throwError(new Error("No selections were successfully executed."));
  }

  const [first, ...remainder] = sources;

  if (sources.length === 1 && Array.isArray(first)) {
    return onErrorResumeNext(...first);
  }

  return new Observable((subscriber) => {
    const subNext = () =>
      subscriber.add(selector(...remainder).subscribe(subscriber));

    return from(first).subscribe({
      next(value) {
        subscriber.next(value);
      },
      error: subNext,
      complete: () => {
        subscriber.complete();
      },
    });
  });
}

Should I open a PR or am I recreating/reopening a discussion that has already occurred?

also, some marble tests I've made to describe the the intended output:
  it("finds the first successful selection from a set.", () => {
    const firstObs$ = cold("--#");
    const secondObs$ = cold("--z--#");
    const thirdObs$ = cold("--x--y|");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "----z----x--y|"
    );
  });

  it("completes on the first successful selection.", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("--z--|");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble("--y--z--|");
  });

  it("errors if no selections are successful", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("z--#");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "--yz----x--#"
    );
  });
Dorus
@Dorus

@spaceribs A better place to reach the devs is slack or github. The usual response is to make a separate packages to combines with RxJs. That said they might be enthusiastic about this idea depending on how lightweight it can be added.

Also while i'm not familiar with behavior tree's myself, i do like to add RxJs is for multi value sequences and error is a rare item to emit. I'm not sure how that combines with behavior tree's.

That said, i do think what you describe is some unique behavior not easily combined with existing operators. Indeed AND could be concat or merge, but what you describe for OR (stop when any completes, error when all error) is not something i can easily produce from existing operators and sounds like a quite useful one.

Do remember that because of RxJs async nature, it would be interesting to have these operators you describe in both parallel (like merge) and sequential (like concat) flavor. Especially when you work with live data, onErrorResumeNext and concat will not work.

Ps. I just tought up of a way to get what you aim for using existing operators. onErrorResumeNext already statisfies the stop requirement, and you can easily add the error requirement by not using onErrorResumeNext on the last observable in line.
Now i'm pondering how to make a similar operator for the parallel case :D (as well as a nicer one that emits all previous errors when the last source fails)

Dominic Watson
@intellix
struggling to make something in RxJS and wondering if rubber ducking will help me
so basically, in an ideal world fromEvent(window, 'orientationchange') would emit a new width/height, but as it stands, it emits before the width/height have actually changed and it seems the only way to actually know when the orientation has changed, is to use interval hacks
so what I'm trying to create is an observable that emits when it's ACTUALLY changed by jumping into a timer loop and emitting when so
I need to know the current orientation when you start listening, so I have a starting point, and then switch into listening to the orientation change I guess:
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
    );
  });
);
if (current !== new) I can straight up emit the new one, but if not, I should change to a timer to keep checking every 50ms:
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
      switchMap(newOrientation => {
        return (current !== newOrientation) ? of(newOrientation) : timer(0, 50).pipe(
          ...
        );
      }),
    );
  });
);
Dominic Watson
@intellix
I think the timer should map each iteration to a detected orientation, filter and take 1?
of(getOrientation()).pipe(
  switchMap(current => {
    return fromEvent(platformService.window, 'orientationchange').pipe(
      map(() => getOrientation()),
      switchMap(newOrientation => {
        return (current !== newOrientation) ? newOrientation : timer(0, 50).pipe(
          map(() => getOrientation()),
          filter(v => v === current),
          take(1),
        );
      }),
    );
  });
);
I guess my only problem now is that I'm stuck on the first inner switchMap with the wrong value and it won't work the next times in the future
Dominic Watson
@intellix
maybe this? I start with an initial value and also listen to orientation changes and get the NEXT value too. I use pairwise so I always have the last 2 emits
merge(
  of(getOrientation()),
  fromEvent(platformService.window, 'orientationchange').pipe(map(() => getOrientation())),
).pipe(
  pairwise(),
  switchMap(([prev, next]) => {
    return (prev !== next) ? next : timer(0, 50).pipe(
      map(() => getOrientation()),
      filter(v => v === current),
      take(1),
    );
  }),
);
Dominic Watson
@intellix
but that's not correct cause if I start from PORTRAIT and move to LANSCRAPE and it emits PORTRAIT 2x then it's only corrected in the 2nd part (LANDSCAPE) and when I move back to PORTRAIT it could get stuck waiting forever for PORTRAIT (previous emit) to !== PORTRAIT (next emit)
Dominic Watson
@intellix
it seems like what I need, is for the whole thing to re-run, the fetching of the initial again and the re-attaching of the listener
Dominic Watson
@intellix
and I can't start with the fromEvent because I need to know the orientation BEFORE
Dominic Watson
@intellix
omg.... I've figured it out, by using a recursive expand operator
function getOrientationChange$(prev) {
  return fromEvent(window, 'orientationchange').pipe(
    map(() => getOrientation()),
    switchMap(next => {
      return (prev !== next) ? of(next) : timer(0, 50).pipe(
        map(() => getOrientation()),
        filter(next2 => next2 !== prev),
      );
    }),
    take(1),
  );
}

of(getOrientation()).pipe(
  expand(v => getOrientationChange$(v)),
).subscribe(v => {
  console.log(v);
  document.querySelector('#results').innerHTML = v;
});
the magic being that I wanted the whole thing to go back into itself and couldn't figure out how to do it. I needed an initial value to seed it being the tricky part
connor-odoherty
@connor-odoherty

Hey all! I'm in the process of cleaning up a 4 year old codebase and saw that the auth flow uses an observer pattern not found anywhere else in the codebase:

  public login(credentials: LoginCredentials, loginSource = ""): Observable<any> {
    this.loginResponseMessage = null;
    if (...) {
      return observableThrowError("Please provide both a username and a password");
    } else {
      return Observable.create((observer) => {
        let urlSearchString = ".."
        this.http
          .post(this.serverInfo.getServerBaseURL() + "/users/api_sign_in", urlSearchString, { observe: "response" })
          .subscribe(
            (response: HttpResponse<any>) => this.signInHttpDataResponse(response, credentials, observer),
            (err) => {
              console.log(err);
              observer.error("Network error. <br><br>" + err.statusText);
              observer.complete();
            }
          );
      });
    }
  }

  private signInHttpDataResponse(response: HttpResponse<any>, credentials, observer) {
    const responseData = response.body;
    this.loginResponseMessage = responseData.message;
    if (responseData.success == true) {
      const auth_header = response.headers.get("Authorization");
      let jwt_token = null;
      if (auth_header) {
        const header_pair = auth_header.split(" ");
        if (header_pair.length == 2 && header_pair[0] == "Bearer") {
          jwt_token = header_pair[1];
          responseData.email = credentials.email;
          responseData.jwtToken = jwt_token;

          let currentUser = this.userStore.getUserNow();
          if (currentUser.suduAsUser != null) {
            currentUser.suduAsUser = new CurrentUser(responseData);
            this.userStore.setUserWithCurrentUser(currentUser);
          } else {
            this.userStore.setUserWithJSON(responseData);
          }
          observer.next(response);
        } else {
          observer.error({
            message: "The server failed to return a valid authorization token",
          });
        }
      } else {
        observer.error({
          message: "The server failed to return an authorization token",
        });
      }
    } else if (responseData.failureReason && responseData.failureReason == "TWO_FACTOR_REQUIRED") {
      // login failed because two factor is required
      // save the current user data for use by the two factor page and return
      responseData.email = credentials.email;
      // this will store the twoFactorTempId for us
      this.userStore.setUserWithJSON(responseData);
      observer.next(response);
    } else {
      observer.error(responseData);
    }

    observer.complete();
  }

Everywhere else, I've been using a pattern like this:

  public destroyExpenseDocument(expenseToUpdate: Expense, expenseDocument: ExpenseDocument): Observable<any> {
    return this.expenseAccessor.getExpense(expenseToUpdate.uuid).pipe(
      map((expense: Expense) => {
        expense.expense_documents = filter(expense.expense_documents, (doc: ExpenseDocument) => {
          return doc.uuid !== expenseDocument.uuid;
        });
        return expense;
      }),
      mergeMap((expenseHere) => {
        return this.expenseAccessor.setExpense(expenseHere);
      }),
      tap(() => {
        this.hydrateCachedExpenses();
      })
    );
  }

Is there any reason to use the Observer.next pattern instead of the pipe/mergeMap pattern?

I could see the Observer.next pattern making sense if we expected that observer to emit multiple times, but as far as I can tell the conditional tree ensures it will only emit once
Matt Erman
@CodeLiftSleep
So an observable created in a parent component is not subscribable in the child component?
Dorus
@Dorus
@CodeLiftSleep i'm not sure what you mean, rxjs has no concepts of (parent) components.
Matt Erman
@CodeLiftSleep
This doesn't work...header$ is always undefined---why?
//parent class
export class baseComponent implements OnInit {
  header$: Observable<headerData>;

   ngOnIt() {
      header$ = this.store.select(getHeaderData);
   }
}

//child class

export class headerComponent extends baseComponent imlpements OnInit {
   ngOnInit() {
      header$.subscribe(header => { <---- error 'cannot subscribe to undefined'
         console.log(header);
      });
   }
}
Dorus
@Dorus
@connor-odoherty the pattern above is observable.create(), the most advanced for of creating an observable i would usually only recommand if all else fail. Indeed since the code above is just subscribing to this.http.post() and then mapping the error callback and calling a function on next calls, it could have been
  public login(credentials: LoginCredentials, loginSource = ""): Observable<any> {
    this.loginResponseMessage = null;
    if (...) {
      return observableThrowError("Please provide both a username and a password");
    } else {
      return Observable.defer(() => {
        let urlSearchString = ".."
        return this.http
          .post(this.serverInfo.getServerBaseURL() + "/users/api_sign_in", urlSearchString, { observe: "response" })
      }).pipe(
        tap((response: HttpResponse<any>) => this.signInHttpDataResponse(response, credentials, observer),
            err) => console.log(err))
        catchError((err) => "Network error. <br><br>" + err.statusText)
      );
    }
  }
Matt Erman
@CodeLiftSleep
@Dorus how is an obervable declared in the parent class undefined in the child class?
Dorus
@Dorus
@CodeLiftSleep This is angular code, or rather OO code. If i understand this right you override the ngOnInit of the base class in the subclass.
Matt Erman
@CodeLiftSleep
so I would need to do something like super.header$?
Dorus
@Dorus
You would need to do something like super.ngOnInit() or something.
inside ngOnInit()
or at least that's my first guess.
Matt Erman
@CodeLiftSleep
wow...that was an easy fix
it worked
avechuche
@avechuche
Hi, I'm going crazy trying to create an observable with a state to be able to show a loading/error in the UI.
I created combineLatest and works perfectly, the pipe is only on "MyWebServiceWithPossibleError" to prevent the observable (combineLatest) from closing in case of error, the problem is that I don't know how to combine my observable with the web example, any help? thx
combineLatest([
    Observable_One,
    Observable_Two,
    Observable_Three,
  ]).pipe(
    switchMap(([R1, R2, R3]) =>
      this.MyWebServiceWithPossibleError(...)
        .pipe(catchError(() => { return EMPTY }))))    

Web example

export enum ObsevableStatus {
  SUCCESS = 'Success',
  ERROR = 'Error',
  LOADING = 'Loading',
}

export interface ObservableWithStatus<T> {
  status: string;
  data?: T;
  error?: Error;
}

export function observableWithStatus<T>(x: Observable<T>): Observable<ObservableWithStatus<T>> {
  return x.pipe(
    map(x => ({ status: ObsevableStatus.SUCCESS, data: x })),
    startWith({ status: ObsevableStatus.LOADING }),
    catchError(x => {
      return of({ status: ObsevableStatus.ERROR, error: x });
    })
  );
}
Derek
@derekkite
@avechuche you consume the data in two ways. One is the api call results, the second is some status. The second one needs to be sent status from the first. If the status is a subject, the api observable can tell it by calling status Subject.next(status). Before the switchMap tap(()=> statusSubject.next(loading)),
avechuche
@avechuche
@derekkite Perfect, I'm going to try this option. Thank you
Nicholas Cunningham
@ndcunningham
hey all i have a question if i have an array that contains [Ob, Ob, Ob] and i want to unwrap the ob inside the array and still keep the array
what op could i use?
mergeAll?