dennbagas
@dennbagas
Hello, all
Reactist215
@Reactist215
Hello @dennbagas
Hope you are doing great
dennbagas
@dennbagas
I have a question about how to make parallel HTTP calls using RxJS; I'm using NestJS.
I posted the question on Stack Overflow: https://stackoverflow.com/questions/64496237/rxjs-parallel-http-call-to-paginated-api-using-nestjs-httpservice
It has a bounty of 50 reputation points. I got stuck on this batching task.
Kate
@tabasca
Hi. Am I doing something wrong, or why does my merge Observable complete? https://stackblitz.com/edit/jwoztx?devtoolsheight=33&file=index.ts
docs say that "The output Observable only completes once all input Observables have completed" - so why does it complete?
Dorus
@Dorus
@tabasca Because the array has a finite length of 1 item?
If you meant to emit the stream from the BehaviorSubject instead of the BehaviorSubject itself, you should do merge(sub)

Notice

merge([sub])
.subscribe(
  (e) => console.log('subscribe' + e),
  () => {}, 
  () => console.log('complete'));

will emit

subscribe[object Object]
complete

The [object Object] here is the BehaviorSubject.

Kate
@tabasca
@Dorus I don't understand. I need to merge two subjects. In my example there was only one, yes, but with two of them it is all the same: merge completes, but neither subject does. So why does merge complete?
Dorus
@Dorus
Because you are emitting the array
to merge 2 subjects, do merge(sub1, sub2)
Kate
@tabasca
@Dorus wow. works :) thank you <3
Ben Lesh
@BenLesh_twitter
@tabasca FWIW: in rxjs@next (7.0.0) what you had above would totally have worked.
We're trying to standardize the behavior so that a single array passed to zip, concat, merge, etc., is treated like an array of arguments.
There's a lot of work we're trying to do to promote consistency so these quirks don't confuse people.
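
For reference, a minimal sketch of the variadic form suggested above. The two plain Subjects and the `events` log are illustrative assumptions, not taken from the StackBlitz. It only tries to show that merge(a, b) stays open until both inputs have completed.

```typescript
import { Subject, merge } from 'rxjs';

// Illustrative inputs; the original example used BehaviorSubjects.
const a = new Subject<number>();
const b = new Subject<number>();

// Records everything the merged stream delivers, in order.
const events: string[] = [];

merge(a, b).subscribe({
  next: (value) => events.push(`next ${value}`),
  complete: () => events.push('complete'),
});

a.next(1);
b.next(2);
a.complete(); // only one input is done, so the merged stream stays open
b.next(3);    // still delivered
b.complete(); // now both inputs are done and the merged stream completes
```

Passing the subjects as separate arguments avoids depending on how a given RxJS version treats a single array argument.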
PikaChokeMe
@PikaChokeMe
getLastCommits(): Observable<PRData[]> {
    return this.bitbucketService
      .getLatestCommits()
      .pipe(
        concatMap((listOfCommits: any) => listOfCommits.values),
        map((commit: any) => {
          const hash = commit.hash;
          return this.bitbucketService.getPullRequestIDfor(hash)
            .pipe(
              map(id => ({
                commitHash: hash,
                pullRequestID: id
              } as PRData))
            );
        })
      );
  }

I'm trying to do this convoluted thing....
Bitbucket service gets my last 10 commits in an object with an array called values containing all my commits
I'm trying to use concatMap to peel away that object into an individual object for every value/commit in that object

From here I want to take the hash for every commit....
reach out and get the pull request data for that hash

and somehow wrap and return both the hash and the pull request ID returned from that new subscription call

In the end I want an array of { commitHash, pullRequestID }, { commitHash, pullRequestID }, { commitHash, pullRequestID }
But right now with this I seem to be getting one single { commitHash, pullRequestID }

Ignoring the fact that at this point I don't really know what I'm doing, what am I doing wrong?

Derek
@derekkite
The map is returning an observable. It should be mergeMap or concatMap.
PikaChokeMe
@PikaChokeMe
getLastCommits(): Observable<PRData[]> {
    return this.bitbucketService
      .getLatestCommits()
      .pipe(
        map((listOfCommits: any) => listOfCommits.values),
        tap((whatever: any) => console.log(whatever)),
        map((listOfCommits: any) => {
          return listOfCommits.map((commit: any) => {
            const hash = commit.hash;
            return this.bitbucketService.getPullRequestIDfor(hash)
              .pipe(
                map(id => ({
                          commitHash: hash,
                          pullRequestID: id
                        } as PRData))
              );
          });
        }),
        tap((whatever: any) => console.log(whatever)),
      );
  }
I've gotten it down to this...
where I have an array of observables.... (possibly bad because it's nested)
Is there a way to unwrap all those inner subscriptions and accumulate all this data when I subscribe?
Was my other way better?
Dorus
@Dorus
@TheeRFG you should look at the mergeMap/concatMap operators to flatten the observable of observables.
A very common pattern is source.pipe(mergeMap(e => httpCall(e)))
PikaChokeMe
@PikaChokeMe
I feel like I'm confused about where to actually use that operator
map((listOfCommits: any) => listOfCommits.values),

At this point I have an array,
using

mergeMap((listOfCommits: any) => {
          return listOfCommits.map((commit: any) => {

here instead of map just tells me that mergeMap is deprecated and that I should use an inner map instead... I'm not sure what that means or if this is where I was supposed to mergeMap to begin with

Dorus
@Dorus
when you call map<T, T2>(e => httpcall(e)) you're going from Observable<T> -> Observable<Observable<T2>> right?
mergeMap will flatten this (as will concatMap, exhaustMap, switchMap, but in different concurrent patterns). Thus your Observable<Observable<T2>> goes back to Observable<T2>.
Just like flatten on an array T[][] will give T[].
@TheeRFG are you using one of the deprecated overloads of mergeMap?
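
A small sketch of that type difference, assuming a hypothetical `httpCall` that returns a synchronous observable in place of a real request. With map the subscriber receives the inner observables themselves; with mergeMap it receives their values.

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP call.
function httpCall(id: number): Observable<string> {
  return of(`response ${id}`);
}

const source = of(1, 2, 3);

// map: every emission is itself an Observable (Observable<Observable<string>>).
const nested: unknown[] = [];
source.pipe(map(httpCall)).subscribe((inner) => nested.push(inner));

// mergeMap: subscribes to each inner Observable and forwards its values.
const flat: string[] = [];
source.pipe(mergeMap(httpCall)).subscribe((value) => flat.push(value));
```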
PikaChokeMe
@PikaChokeMe
I think I actually had a misplaced parenthesis which was giving me that issue
Dorus
@Dorus
They deprecated mergeMap(selector, resultSelector) in favor of mergeMap(selector): https://rxjs-dev.firebaseapp.com/guide/v6/migration#howto-result-selector-migration
@TheeRFG Also, using mergeMap on an array will flatten the array. That in itself could be useful, but it is not the primary purpose of mergeMap. The selector should return an observable that is then merged together with other observables emitted from other runs of the selector.
PikaChokeMe
@PikaChokeMe

so currently with

getLastCommits(): Observable<PRData[]> {
    return this.bitbucketService
      .getLatestCommits()
      .pipe(
        map((listOfCommits: any) => listOfCommits.values),
        tap((whatever: any) => console.log(whatever)),
        concatMap((listOfCommits: any) => {
          return listOfCommits.map((commit: any) => {
            const hash = commit.hash;
            return this.bitbucketService.getPullRequestIDfor(hash)
              .pipe(
                map(id => ({
                          commitHash: hash,
                          pullRequestID: id
                        } as PRData))
              );
          });
        }),
        tap((whatever: any) => console.log(whatever)),
      );
  }

when I call

  ngOnInit(): void {
    this.getLastCommits()
      .subscribe((commitData: PRData[]) => {
        this.commitData = commitData;
        console.log(commitData);
      });
  }

commitData is an observable... subscribing still nets me a single observable?
like commitData is still a single observable

Dorus
@Dorus
Try from(listOfCommits) and use mergeMap on the result.
Right now concatMap works on the result of listOfCommits.map, which I think will be an array of observables, so it emits each inner observable instead of subscribing to it.
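
A rough sketch of that suggestion. `getPullRequestIDfor` here is a hypothetical synchronous stand-in for the Bitbucket call, and the `PRData` shape is guessed from the snippets above. from() turns the array into one emission per commit, mergeMap subscribes to each lookup, and toArray collects the results into a single array.

```typescript
import { Observable, from, of } from 'rxjs';
import { map, mergeMap, toArray } from 'rxjs/operators';

// Assumed shape, based on the fields used in the conversation.
interface PRData { commitHash: string; pullRequestID: number; }

// Hypothetical stand-in for bitbucketService.getPullRequestIDfor(hash).
function getPullRequestIDfor(hash: string): Observable<number> {
  return of(hash.length); // fake ID derived from the hash
}

const listOfCommits = [{ hash: 'abc' }, { hash: 'defgh' }];
const results: PRData[][] = [];

from(listOfCommits).pipe(
  // One inner request per commit; each result is paired with its hash.
  mergeMap((commit) =>
    getPullRequestIDfor(commit.hash).pipe(
      map((id) => ({ commitHash: commit.hash, pullRequestID: id })),
    ),
  ),
  // Gather every PRData into one array once all requests have completed.
  toArray(),
).subscribe((prData) => results.push(prData));
```

With real asynchronous requests mergeMap emits in completion order, so concatMap may be the safer choice if the original commit order matters.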
PikaChokeMe
@PikaChokeMe
in my previous map(listOfCommits: any) example?
Dorus
@Dorus
In this case i think you want to preserve order? Might make more sense to use forkJoin instead.
getLastCommits(): Observable<PRData[]> {
    return this.bitbucketService
      .getLatestCommits()
      .pipe(
        map((listOfCommits: any) => listOfCommits.values),
        tap((whatever: any) => console.log(whatever)),
        mergeMap((listOfCommits: any) => {
          // forkJoin waits for every inner call and emits one array in input order
          return forkJoin(listOfCommits.map((commit: any) =>
            this.getPRId(commit.hash)
          ));
        }),
        tap((whatever: any) => console.log(whatever)),
      );
  }

getPRId(hash) {
  return this.bitbucketService.getPullRequestIDfor(hash).pipe(
    map(id => ({
        commitHash: hash,
        pullRequestID: id
      } as PRData))
  );
}
but i'm not sure i understand your types here
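
To make the types concrete, here is a self-contained sketch of the forkJoin approach. `getLatestCommits` and `getPullRequestIDfor` are hypothetical synchronous stand-ins for the two service calls, and the response shape is assumed from the description earlier in the thread.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Assumed shape, based on the fields used in the conversation.
interface PRData { commitHash: string; pullRequestID: number; }

// Hypothetical stand-ins for the two bitbucketService calls.
function getLatestCommits(): Observable<{ values: { hash: string }[] }> {
  return of({ values: [{ hash: 'abc' }, { hash: 'defgh' }] });
}
function getPullRequestIDfor(hash: string): Observable<number> {
  return of(hash.length); // fake ID derived from the hash
}

function getPRId(hash: string): Observable<PRData> {
  return getPullRequestIDfor(hash).pipe(
    map((id) => ({ commitHash: hash, pullRequestID: id })),
  );
}

const emissions: PRData[][] = [];

getLatestCommits().pipe(
  map((response) => response.values),
  // forkJoin takes the array of observables and emits one array of results.
  mergeMap((commits) => forkJoin(commits.map((commit) => getPRId(commit.hash)))),
).subscribe((prData) => emissions.push(prData));
```

One caveat with this shape: forkJoin over an empty array completes without emitting, so a response with no commits would produce no value at all.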
PikaChokeMe
@PikaChokeMe
I'll give that a try and if it doesn't work I'll try to better explain my types
I've to submit the project and this seems like a hurdle
Derek
@derekkite
@mkamranhamid how are you calling loadmore? you are passing an observable to a function. check your function parameters
typogod
@typogod
I've made a library that has a very similar API as RxJS but works on pulling instead of pushing. Anyone wanna help me think of a cute name for it?
Tyler Eastman
@teastman
I'm new to RxJS and am creating an Angular service to cache the current user's profile; profile updates can be sent via websocket or periodic refresh. Observables seemed to be a really clean way to allow components to bind the user's profile and send along updates whenever they come in. Searching around, I had a hard time finding many examples of this specific pattern. Are there any pitfalls I'm missing here?
export class ProfileService {

  private publisher: Subject<Account>;
  public profile: Observable<Account>;

  constructor(private httpClient: HttpClient) {
    this.publisher = new Subject<Account>();
    this.profile = this.publisher.pipe(
      shareReplay(1)
    );
    this.refreshProfile();
  }

  refreshProfile(): void {
    this.httpClient.get<Account>('/profile')
      .pipe(
        map<any, Account>(Account.deserialize)
      ).subscribe(account => this.publisher.next(account), err => this.publisher.error(err));
  }
}
Dorus
@Dorus
@teastman Yes, I would recommend that the function return the observable and not subscribe. Right now any error will kill the subject.
If you want to add a store, add it separately, and just next to a BehaviorSubject, either from the refresh function with a tap, or have an open websocket that does so.
Calling methods can either subscribe to the BehaviorSubject to retrieve the current value, or subscribe to the refresh function to get the latest one, auto-updating any other listener.
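
One possible reading of that advice, as a sketch without Angular. `ProfileStore`, `profile$` and the injected `fetchProfile` function are hypothetical names standing in for the service and for httpClient.get<Account>('/profile'). The refresh method returns the request, and a tap copies successful results into a BehaviorSubject, so a failed request never reaches the store.

```typescript
import { BehaviorSubject, Observable, of } from 'rxjs';
import { tap } from 'rxjs/operators';

interface Account { name: string; }

class ProfileStore {
  // Holds the latest known profile; null until the first successful refresh.
  private readonly current = new BehaviorSubject<Account | null>(null);
  readonly profile$: Observable<Account | null> = this.current.asObservable();

  // fetchProfile is a hypothetical stand-in for the HTTP request.
  constructor(private readonly fetchProfile: () => Observable<Account>) {}

  // Returns the request instead of subscribing; errors go to the caller only.
  refresh(): Observable<Account> {
    return this.fetchProfile().pipe(tap((account) => this.current.next(account)));
  }
}

// Simulated responses: the first request fails, the second succeeds.
const responses: Observable<Account>[] = [
  new Observable<Account>((subscriber) => subscriber.error(new Error('network down'))),
  of({ name: 'Ada' }),
];
const store = new ProfileStore(() => responses.shift() as Observable<Account>);

const seen: (Account | null)[] = [];
const errors: string[] = [];
store.profile$.subscribe((account) => seen.push(account));

store.refresh().subscribe({ error: (err: Error) => errors.push(err.message) });
store.refresh().subscribe(); // the store still works after the failed request
```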
Tyler Eastman
@teastman
Thank you very much @Dorus, that makes a lot of sense. I appreciate you taking the time.
Naiwei Zheng
@naiweizheng
Is there an easy debugging technique to locate the initiator of a completion/unsubscribe on an observable chain? Going through the call stack is very painful.
Derek
@derekkite

@naiweizheng you can use something like this

tap(
   v => console.log('value', v),
   err => console.log('error', err),
   () => console.log('complete')
)

Insert it in the chain and you can figure out where it is erroring out or completing.
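
A sketch that extends the same idea with finalize, which also runs on unsubscribe. The `debug` helper and its labels are hypothetical, and the output goes to an array instead of the console so it can be inspected. Placing the helper on both sides of a suspect operator shows which side ended the stream: here take(2) completes downstream while the upstream side is only torn down.

```typescript
import { MonoTypeOperatorFunction, of } from 'rxjs';
import { finalize, take, tap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical helper: records notifications and teardown at one point in a chain.
function debug<T>(label: string): MonoTypeOperatorFunction<T> {
  return (source) =>
    source.pipe(
      tap({
        next: (value) => log.push(`${label} next ${value}`),
        error: (err) => log.push(`${label} error ${err}`),
        complete: () => log.push(`${label} complete`),
      }),
      // Runs on complete, on error, and on unsubscribe.
      finalize(() => log.push(`${label} finalize`)),
    );
}

of(1, 2, 3)
  .pipe(debug('before take'), take(2), debug('after take'))
  .subscribe();
```

A point that logs finalize without a matching complete or error was unsubscribed from downstream, which narrows the search to the operators after it.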

lake-toya
@lake-toya

Hey guys,
I'm a little lost right now
I want to build a fairly simple queue.
The queue should only execute one job at a time.
When it is empty it should run a scheduling function (on a x second interval) which checks, if there are new jobs

I was thinking about doing something like this, but it feels a little wrong. Can anyone help me out?

const { of, interval, empty } = require('rxjs')
const { tap, concatMap, skipWhile } = require('rxjs/operators')

const state = {
  isIdle: true,
  pendingJobs: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] // mocked jobs
}

const schedule = () => {
  console.log('scheduling')
  return !!state.isIdle && state.pendingJobs.length
    ? of(state.pendingJobs[0])
    : empty()
}

const executeJob = async (i) => {
  await new Promise(resolve => setTimeout(resolve, 1500))
  return i
}

interval(500).pipe(
  skipWhile(_ => !state.isIdle),
  concatMap(schedule),
  tap(() => { state.isIdle = false }),
  concatMap(executeJob),
  tap((i) => {
    state.pendingJobs = state.pendingJobs.slice(1)
    state.isIdle = true
    console.log('done:', i)
  })
).subscribe()
lake-toya
@lake-toya
Or should I just combine the scheduling and executing into one function and use exhaustMap?
Jorrit
@jorrit-wehelp
Sounds to me like pendingJobs.pipe(mergeMap(job => execute(job), MAX_CONCURRENCY)), where pendingJobs does the polling inside and just emits a job (or a jobs array)
or just concatMap if it's always 1
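
A sketch of how the concurrency argument behaves, assuming jobs that are finished by hand so the timing is explicit. The `execute` and `finish` helpers and the job numbers are illustrative only.

```typescript
import { Subject, from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const MAX_CONCURRENCY = 2;
const started: number[] = [];
const finished: number[] = [];

// Each running job is a Subject that is completed manually.
const running: { [job: number]: Subject<number> } = {};

function execute(job: number): Subject<number> {
  started.push(job);
  running[job] = new Subject<number>();
  return running[job];
}

function finish(job: number): void {
  running[job].next(job);
  running[job].complete();
}

from([1, 2, 3, 4])
  .pipe(mergeMap((job) => execute(job), MAX_CONCURRENCY))
  .subscribe((job) => finished.push(job));

const startedAtFirst = started.slice();  // [1, 2]: jobs 3 and 4 are buffered
finish(1);
const startedAfterOne = started.slice(); // [1, 2, 3]: a free slot starts job 3
```

Note that mergeMap buffers the waiting jobs internally, so the source is still read eagerly even though execution is limited.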
lake-toya
@lake-toya
Wouldn't pendingJobs keep emitting values even while the execution is still running?
the jobs will be pulled one at a time from a database so I only want to get new jobs when the queue is idling
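
One way to get that behavior is the exhaustMap idea mentioned earlier, sketched here under some assumptions: `ticks` is a Subject standing in for interval(x), `pendingJobs` stands in for the database, and jobs are finished by hand through the hypothetical `current.finish`. exhaustMap drops ticks while a job is running, so the fetch only happens when the queue is idle.

```typescript
import { EMPTY, Observable, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const pendingJobs = ['a', 'b', 'c']; // stand-in for the database
const fetched: string[] = [];
const done: string[] = [];

// Handle used to finish the currently running job by hand.
const current = { finish: () => {} };

function executeJob(job: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    current.finish = () => {
      subscriber.next(job);
      subscriber.complete();
    };
  });
}

// Pulls at most one job and runs it; completes immediately if there is none.
function fetchAndRun(): Observable<string> {
  const job = pendingJobs.shift();
  if (job === undefined) {
    return EMPTY;
  }
  fetched.push(job);
  return executeJob(job);
}

const ticks = new Subject<number>(); // stand-in for interval(x)
ticks.pipe(exhaustMap(() => fetchAndRun())).subscribe((job) => done.push(job));

ticks.next(0);    // idle: fetches and starts 'a'
ticks.next(1);    // busy: tick is ignored, nothing is fetched
current.finish(); // 'a' completes
ticks.next(2);    // idle again: fetches and starts 'b'
```

With this shape no isIdle flag is needed, because the operator itself tracks whether an inner job is still active.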