merge(sub)
Notice that
merge([sub])
  .subscribe(
    (e) => console.log('subscribe' + e),
    () => {},
    () => console.log('complete'));
will emit
subscribe[object Object]
complete
The [object Object] here is the BehaviorSubject.
merge(sub1, sub2)
The input to zip, concat, merge, etc. is treated like an array of arguments, so pass each observable as its own argument.
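To make the difference concrete, here is a minimal sketch comparing the two call styles. The variable names (`fromArrayArg`, `fromSeparateArgs`) are made up for illustration, and the subject's initial value of 1 is an assumption.

```typescript
import { BehaviorSubject, merge } from 'rxjs';

const sub = new BehaviorSubject<number>(1);

// Passing an array: the array itself is treated as one input, so its
// element (the subject) is emitted as a value and the stream completes.
const fromArrayArg: unknown[] = [];
merge([sub]).subscribe({
  next: (e) => fromArrayArg.push(e),
  complete: () => fromArrayArg.push('complete'),
});

// Passing the subject as its own argument: the subject's values come
// through, and nothing completes until the subject itself completes.
const fromSeparateArgs: unknown[] = [];
merge(sub).subscribe({
  next: (e) => fromSeparateArgs.push(e),
  complete: () => fromSeparateArgs.push('complete'),
});
```

After this runs, `fromArrayArg` holds the subject object followed by `'complete'`, while `fromSeparateArgs` holds only the subject's current value.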
getLastCommits(): Observable<PRData[]> {
  return this.bitbucketService
    .getLatestCommits()
    .pipe(
      concatMap((listOfCommits: any) => listOfCommits.values),
      map((commit: any) => {
        const hash = commit.hash;
        return this.bitbucketService.getPullRequestIDfor(hash)
          .pipe(
            map(id => ({
              commitHash: hash,
              pullRequestID: id
            } as PRData))
          );
      })
    );
}
I'm trying to do this convoluted thing...
The Bitbucket service gets my last 10 commits in an object with an array called values containing all my commits.
I'm trying to use concatMap to peel away that object into an individual object for every value/commit in that object.
From here I want to take the hash for every commit...
reach out and get the pull request data for that hash,
and somehow wrap and return both the hash and the pull request ID returned from that new subscription call.
In the end I want an array of { commitHash, pullRequestID }, { commitHash, pullRequestID }, { commitHash, pullRequestID }
But right now with this I seem to be getting one single { commitHash, pullRequestID }
Ignoring the fact that at this point I don't really know what I'm doing, what am I doing wrong?
getLastCommits(): Observable<PRData[]> {
  return this.bitbucketService
    .getLatestCommits()
    .pipe(
      map((listOfCommits: any) => listOfCommits.values),
      tap((whatever: any) => console.log(whatever)),
      map((listOfCommits: any) => {
        return listOfCommits.map((commit: any) => {
          const hash = commit.hash;
          return this.bitbucketService.getPullRequestIDfor(hash)
            .pipe(
              map(id => ({
                commitHash: hash,
                pullRequestID: id
              } as PRData))
            );
        });
      }),
      tap((whatever: any) => console.log(whatever)),
    );
}
I've gotten it down to this...
source.mergeMap(e => httpCall(e))
map((listOfCommits: any) => listOfCommits.values),
At this point I have an array.
Using
mergeMap((listOfCommits: any) => {
  return listOfCommits.map((commit: any) => {
here instead of map just tells me that mergeMap is deprecated and that I should use an inner map instead... I'm not sure what that means, or if this is where I was supposed to use mergeMap to begin with.
map<T, T2>(e => httpcall(e))
With that you're going from Observable<T> -> Observable<Observable<T2>>, right?
mergeMap?
Only mergeMap(selector, resultSelector) is deprecated, in favor of mergeMap(selector)
https://rxjs-dev.firebaseapp.com/guide/v6/migration#howto-result-selector-migration
mergeMap on an array will flatten the array. That in itself could be useful, but it is not the primary purpose of mergeMap. The selector should return an observable, which is then merged together with the observables returned by other runs of the selector.
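As a rough illustration of the three cases discussed here, the sketch below uses a hypothetical `httpCall` helper that returns a synchronous observable instead of doing a real request; only `of`, `map`, and `mergeMap` come from RxJS.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP request; not a real service call.
const httpCall = (id: number): Observable<string> => of(`response ${id}`);

// 1) Selector returns an array: mergeMap emits each element on its own.
const flattened: number[] = [];
of([1, 2, 3])
  .pipe(mergeMap((arr) => arr))
  .subscribe((v) => flattened.push(v));

// 2) map with an observable-returning selector: the values that come out
//    are the inner observables themselves (Observable<Observable<string>>).
const nested: Observable<string>[] = [];
of(1, 2, 3)
  .pipe(map((id) => httpCall(id)))
  .subscribe((inner) => nested.push(inner));

// 3) mergeMap with the same selector: the inner observables are subscribed
//    to and their values are merged into one stream.
const merged: string[] = [];
of(1, 2, 3)
  .pipe(mergeMap((id) => httpCall(id)))
  .subscribe((v) => merged.push(v));
```

Case 2 is what produces "subscribing still gives me an observable": nothing ever subscribes to the inner observables.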
so currently with
getLastCommits(): Observable<PRData[]> {
  return this.bitbucketService
    .getLatestCommits()
    .pipe(
      map((listOfCommits: any) => listOfCommits.values),
      tap((whatever: any) => console.log(whatever)),
      concatMap((listOfCommits: any) => {
        return listOfCommits.map((commit: any) => {
          const hash = commit.hash;
          return this.bitbucketService.getPullRequestIDfor(hash)
            .pipe(
              map(id => ({
                commitHash: hash,
                pullRequestID: id
              } as PRData))
            );
        });
      }),
      tap((whatever: any) => console.log(whatever)),
    );
}
when I call
ngOnInit(): void {
  this.getLastCommits()
    .subscribe((commitData: PRData[]) => {
      this.commitData = commitData;
      console.log(commitData);
    });
}
commitData is an observable... subscribing still nets me a single observable?
like commitData is still a single observable
listOfCommits.map gives what I think will be an array of observables
getLastCommits(): Observable<PRData[]> {
  return this.bitbucketService
    .getLatestCommits()
    .pipe(
      map((listOfCommits: any) => listOfCommits.values),
      tap((whatever: any) => console.log(whatever)),
      // forkJoin waits for every inner observable to complete, then emits one array
      mergeMap((listOfCommits: any) => {
        return forkJoin(listOfCommits.map((commit: any) =>
          this.getPRId(commit.hash)
        ));
      }),
      tap((whatever: any) => console.log(whatever)),
    );
}
getPRId(hash: string): Observable<PRData> {
  return this.bitbucketService.getPullRequestIDfor(hash).pipe(
    map(id => ({
      commitHash: hash,
      pullRequestID: id
    } as PRData))
  );
}
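For anyone who wants to try the forkJoin approach without Angular or a real Bitbucket service, here is a small standalone sketch. The in-memory `getLatestCommits` and `getPullRequestIDfor` functions, the hashes, and the IDs are all made up for illustration; the shape of `PRData` is assumed from the snippets above.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface PRData {
  commitHash: string;
  pullRequestID: number;
}

// Hypothetical in-memory stand-ins for the Bitbucket service calls.
const pullRequestIds: Record<string, number> = { a1: 101, b2: 102 };
const getLatestCommits = () => of({ values: [{ hash: 'a1' }, { hash: 'b2' }] });
const getPullRequestIDfor = (hash: string): Observable<number> =>
  of(pullRequestIds[hash]);

// One lookup per commit, wrapped together with its hash.
const getPRData = (hash: string): Observable<PRData> =>
  getPullRequestIDfor(hash).pipe(
    map((id) => ({ commitHash: hash, pullRequestID: id })),
  );

const getLastCommits = (): Observable<PRData[]> =>
  getLatestCommits().pipe(
    map((response) => response.values),
    // forkJoin emits a single array once every lookup has completed.
    mergeMap((commits) => forkJoin(commits.map((c) => getPRData(c.hash)))),
  );

let result: PRData[] = [];
getLastCommits().subscribe((data) => {
  result = data;
});
```

The array emitted by forkJoin keeps the order of the input observables. One thing to keep in mind: if the commit list is empty, forkJoin completes without emitting, so the subscriber's next callback never fires.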
export class ProfileService {
  private publisher: Subject<Account>;
  public profile: Observable<Account>;

  constructor(private httpClient: HttpClient) {
    this.publisher = new Subject<Account>();
    this.profile = this.publisher.pipe(
      shareReplay(1)
    );
    this.refreshProfile();
  }

  refreshProfile(): void {
    this.httpClient.get<Account>('/profile')
      .pipe(
        map<any, Account>(Account.deserialize)
      ).subscribe(account => this.publisher.next(account), err => this.publisher.error(err));
  }
}
tap, or have an open websocket that does so.
Hey guys,
I'm a little lost right now.
I want to build a fairly simple queue.
The queue should only execute one job at a time.
When it is empty, it should run a scheduling function (on an x-second interval) which checks whether there are new jobs.
I was thinking about doing something like this, but it feels a little wrong. Can anyone help me out?
const { of, interval, empty } = require('rxjs')
const { tap, concatMap, skipWhile } = require('rxjs/operators')

const state = {
  isIdle: true,
  pendingJobs: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] // mocked jobs
}

const schedule = () => {
  console.log('scheduling')
  return !!state.isIdle && state.pendingJobs.length
    ? of(state.pendingJobs[0])
    : empty()
}

const executeJob = async (i) => {
  await new Promise(resolve => setTimeout(resolve, 1500))
  return i
}

interval(500).pipe(
  skipWhile(_ => !state.isIdle),
  concatMap(schedule),
  tap(() => { state.isIdle = false }),
  concatMap(executeJob),
  tap((i) => {
    state.pendingJobs = state.pendingJobs.slice(1)
    state.isIdle = true
    console.log('done:', i)
  })
).subscribe()
interval(500).pipe(
  exhaustMap(_ => defer(() => scheduleAndExecute())), // return empty() when no new jobs are found and of(...) when a job was finished
  mergeAll(),
  tap(i => { console.log(i) })
).subscribe()
maybe something like this?