Eliot Lash
@fadookie
I guess I could map a subsection of events into smaller observables that complete using takeWhile or somesuch but right now I don't have a central observable managing all the other ones
Gatej Andrei
@Andrei0872
hey @stupidawesome, is there any reason you use materialize() in repeat()(materialize()(operator(this._source)))? AFAIK, materialize will turn everything into a Notification instance, which is passed along as a next value, which makes me think that adding repeat() after it is redundant, as repeat reacts to complete notifications
n9WDgrUEZfUo7r
@n9WDgrUEZfUo7r
@Andrei0872 Thanks. That's an elegant solution. The interval count doesn't reset, nice! Didn't even consider race and ignoreElements for my solution. So many operators to choose from. I'm like a child lost with endless legos :).
stupidawesome
@stupidawesome
@Andrei0872 without repeat the subject will be closed on error/complete.
SAGO
@SAGOlab

Hi there, I'm trying to learn rxjs.
I have 2 HTTP GET requests, and one depends on the other:

public getMarkerDBNew(): Observable<any> {
    return this.http.get(this.xmysql + '/v_BFP_fornitori_loc?_size=100', httpOptions);
  }

  public getFornitori(fornitoreId: string): Observable<any> {
    return this.http.get(this.xmysql + '/BFP_listino?_where=((id_negozio,eq,1)~or(id_negozio,eq,nd))~and(modalita_prodotto,ne,Sfuso)~and(id_fornitore,eq,' + fornitoreId + ')')
  }

I'm trying to call getMarkerDBNew(), which returns an array of objects, collect the ID from each of those objects, and make the second GET request with that ID as a parameter. My goal is to end up with both responses, the first and then the second.

This is my attempt, but I'm missing something.

this.getMarkerDBNew()
      .pipe(
        switchMap(plans => {
          const t = _.forOwn(_.map(plans, 'ID'), id => {
            return this.getFornitori(id)
          })
          return forkJoin(t)
        }),
        tap(plans => console.log(plans)),
      ).subscribe(t => console.log(t))

Where am I failing?

Dorus
@Dorus
@SAGOlab if you want to retain the original value, you can put map after forkJoin to put the plan back in.
forkJoin(t).pipe(map(arr => ({plans, arr})))
something like that
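A minimal sketch of that suggestion, with stand-ins for the two HTTP calls (the `Marker` shape, the sample data and the string responses are assumptions for illustration only). As far as I know, `_.forOwn` returns the collection it iterated over rather than the values returned by the callback, so the sketch builds the array of observables with `Array.prototype.map` before handing it to `forkJoin`:

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Assumed shape, loosely based on the response logged in this conversation.
interface Marker { ID: string; NAME: string; }

// Hypothetical stand-ins for the two HTTP GET methods.
const getMarkerDBNew = (): Observable<Marker[]> =>
  of([{ ID: '1', NAME: 'Agribosco srl' }, { ID: '2', NAME: 'Alberti Guido' }]);
const getFornitori = (fornitoreId: string): Observable<string[]> =>
  of([`product of supplier ${fornitoreId}`]);

const results: { plans: Marker[]; arr: string[][] }[] = [];

getMarkerDBNew().pipe(
  switchMap(plans =>
    // map() returns a new array of observables, which is what forkJoin expects.
    forkJoin(plans.map(plan => getFornitori(plan.ID))).pipe(
      // Put the first response back next to the joined second responses.
      map(arr => ({ plans, arr }))
    )
  )
).subscribe(result => results.push(result));
```

With these synchronous stand-ins the subscriber receives a single object holding both the original `plans` and one entry in `arr` per plan, in the same order.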
SAGO
@SAGOlab
@Dorus ..thanks I'll give it a shot
Dorus
@Dorus
Anyway, that is an interesting way to use forkJoin, but it should work. I usually do src.pipe(mergeMap(e => from(e).pipe(mergeMap(el => ...))))
But I kinda like yours :D
Yours retains order
and still does all the calls in parallel
Mine is more useful when you do not need to join the results back together
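The ordering difference can be sketched on virtual time. Here `lookup` is a hypothetical request whose response time shrinks as the id grows, so responses arrive in reverse order; `VirtualTimeScheduler` is only used so the demo runs instantly:

```typescript
import { Observable, VirtualTimeScheduler, forkJoin, from, of } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';

const scheduler = new VirtualTimeScheduler();

// Hypothetical lookup: id 1 answers after 30 frames, id 3 after 10.
const lookup = (id: number): Observable<string> =>
  of(`r${id}`).pipe(delay((4 - id) * 10, scheduler));

const ids$ = of([1, 2, 3]);

// forkJoin: all calls run in parallel, one array comes out in input order.
const joined: string[][] = [];
ids$.pipe(
  mergeMap(ids => forkJoin(ids.map(id => lookup(id))))
).subscribe(v => joined.push(v));

// from + mergeMap: all calls run in parallel, results come out as they arrive.
const flat: string[] = [];
ids$.pipe(
  mergeMap(ids => from(ids).pipe(mergeMap(id => lookup(id))))
).subscribe(v => flat.push(v));

scheduler.flush(); // run all scheduled virtual-time work
```

After the flush, `joined` holds one array in input order, while `flat` holds the individual results in arrival order.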
SAGO
@SAGOlab
OK, it works, but it does not return both GET responses
It returns two arrays: the first with indexes, the second with all the objects from the first GET response
My goal is to retrieve the responses of both HTTP GET requests
Dorus
@Dorus
@SAGOlab you mean emit both results separately?
Try return merge(of(plans), forkJoin(t))
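A small sketch of what that `merge` does, using hypothetical synchronous stand-ins (`getPlans`, `getDetails`) for the two requests. The subscriber is called twice: once with the first response, then once with the joined second responses:

```typescript
import { Observable, forkJoin, merge, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-ins for the two requests.
const getPlans = (): Observable<{ ID: string }[]> => of([{ ID: '1' }, { ID: '2' }]);
const getDetails = (id: string): Observable<string> => of(`details for ${id}`);

const emissions: unknown[] = [];

getPlans().pipe(
  switchMap(plans => merge(
    of(plans),                                  // first emission: the plans themselves
    forkJoin(plans.map(p => getDetails(p.ID)))  // second emission: all details, in order
  ))
).subscribe(v => emissions.push(v));
```

With real HTTP calls the timing differs, but the plans are still emitted first because `of(plans)` emits immediately while `forkJoin` has to wait for every request to complete.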
SAGO
@SAGOlab
this is the response
Yes, it returns only the first GET
but not the second one, with the products related to each id field of the first GET request
this.getFornitori(id)
public getFornitori(fornitoreId: string): Observable<any> {
    return this.http.get(this.xmysql + '/BFP_listino?_where=((id_negozio,eq,1)~or(id_negozio,eq,nd))~and(modalita_prodotto,ne,Sfuso)~and(id_fornitore,eq,' + fornitoreId + ')')
  }
Dorus
@Dorus
@SAGOlab i'm not sure i understand all the lodash, but can you describe what output you expect?
SAGO
@SAGOlab

The first GET, this.getMarkerDBNew(), returns an array of objects:

0: {ID: "1", NAME: "Agribosco srl", LATITUDE: 43.3313496, LONGITUDE: 12.7422841, WEBSITE: "", …}
1: {ID: "2", NAME: "Alberti Guido - Poggio Aquilone az.agr.", LATITUDE: 42.869337, LONGITUDE: 12.269859, WEBSITE: "", …}
2: {ID: "3", NAME: "Oro di Spello - Anfatis Centro spa ", LATITUDE: 42.96646, LONGITUDE: 12.6820451, WEBSITE: "", …}
3: {ID: "4", NAME: "Bacci Noemio Az. Agraria", LATITUDE: 42.9094087, LONGITUDE: 12.5558159, WEBSITE: "", …}
4: {ID: "5", NAME: "Betti Franco Az. Agr.", LATITUDE: 43.0711952, LONGITUDE: 12.6146669, WEBSITE: "", …}
5: {ID: "6", NAME: "Bettini Bio- Agrisviluppo Todiano srl", LATITUDE: 42.9561825, LONGITUDE: 12.703334, WEBSITE: "", …}
6: {ID: "7", NAME: "Blife srls - La Boutique del Biologico", LATITUDE: 42.43855, LONGITUDE: 12.0942648, WEBSITE: "", …}
7: {ID: "8", NAME: "Bioitalia distribuzione srl", LATITUDE: 40.419441649999996, LONGITUDE: 15.310756230322482, WEBSITE: "", …}
8: {ID: "9", NAME: "Bruno Renato Az. Agr.", LATITUDE: 43.432655, LONGITUDE: 12.857263, WEBSITE: "", …}
9: {ID: "10", NAME: "Casale dei pozzi az. agr. di Lupi Paola", LATITUDE: 42.0255444, LONGITUDE: 12.0260493, WEBSITE: "", …}
10: {ID: "11", NAME: "Colforcella- Rossi Rita az.agr.", LATITUDE: 42.7170082, LONGITUDE: 13.0135022, WEBSITE: "", …}

With lodash I can retrieve an array with the ID value of every one of these objects using _.map(allId, 'id'); the result is:

["0","1","2","3" ...]
Dorus
@Dorus
So that's the lodash way to do arr.map(e => e.id)?
SAGO
@SAGOlab
yes
Dorus
@Dorus

This is how you normally get all sub items.

getMarkerDBNew().pipe(
  mergeAll(),
  map(el => el.id),
  mergeMap(id => getFornitori(id))
)

If you want to retain the original:

getMarkerDBNew().pipe(
  mergeAll(),
  mergeMap(el => getFornitori(el.id).pipe(
    map(r => ... )// combine el and r here.
  ))
)

If you want to keep the original items together:

getMarkerDBNew().pipe(
  mergeAll(),
  mergeMap(el => getFornitori(el.id).pipe(
    toArray(),
    map(ar => ... )// combine el and ar here.
  ))
)

If you want to keep the original order:

getMarkerDBNew().pipe(
  mergeMap(ar => forkJoin(ar.map(el => getFornitori(el.id).pipe(
    map(r => ... )// combine el and r here.
  ))))
)
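A runnable sketch of the second and third variants above, with hypothetical stand-ins for the two requests. The way `el` is combined with the response is an assumption made for illustration, and the sketch reads the upper-case `ID` key that appears in the logged response:

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeAll, mergeMap, toArray } from 'rxjs/operators';

interface Marker { ID: string; NAME: string; } // assumed shape

// Hypothetical stand-ins; getFornitori emits two products per supplier here.
const getMarkerDBNew = (): Observable<Marker[]> =>
  of([{ ID: '1', NAME: 'Agribosco srl' }, { ID: '2', NAME: 'Alberti Guido' }]);
const getFornitori = (id: string): Observable<string> => of(`${id}-a`, `${id}-b`);

// Retain the original: one output per product, labelled with its marker.
const perProduct: string[] = [];
getMarkerDBNew().pipe(
  mergeAll(), // array of markers -> one marker at a time
  mergeMap(el => getFornitori(el.ID).pipe(
    map(r => `${el.NAME}: ${r}`)
  ))
).subscribe(v => perProduct.push(v));

// Keep the original items together: one output per marker with all its products.
const perMarker: { name: string; products: string[] }[] = [];
getMarkerDBNew().pipe(
  mergeAll(),
  mergeMap(el => getFornitori(el.ID).pipe(
    toArray(),
    map(ar => ({ name: el.NAME, products: ar }))
  ))
).subscribe(v => perMarker.push(v));
```

The first pipeline emits once per product, the second once per marker.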
SAGO
@SAGOlab
WOW! Faster than postgres
Dorus
@Dorus
What output do you want/expect?
SAGO
@SAGOlab
I expect output with both response
SAGO
@SAGOlab
@Dorus thanks for your help, now it all works like a charm
n9WDgrUEZfUo7r
@n9WDgrUEZfUo7r
is there a way to make the inner observable complete first when the source emits an error? https://stackblitz.com/edit/rxjs-zr5qqb
Jorrit
@jorrit-wehelp
catchError on the source :/
n9WDgrUEZfUo7r
@n9WDgrUEZfUo7r
nvm, can't believe it was this easy. I should get some sleep.
thanks @jorrit-wehelp
cedvdb
@cedvdb
If I have an observable of an array, how can I continue the pipe in sequence?
source$: Observable<Array<string>>
I tried it like this:
    return rpc.getPendingHashes(address).pipe(
      map(hashes => hashes.map(hash => this.receiveBlock(wlt, hash))),
      switchMap(obsArray => concat(obsArray))
    );
Gatej Andrei
@Andrei0872
@cedvdb do you mean something like this?
src$.pipe(
 map(arr => arr.map(v => toAnObs(v))),
 concatAll(), // Flatten the array: emit each inner observable individually
 concatAll(), // Subscribe to each inner observable in sequence (like `concatMap`)
)
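A self-contained sketch of the double `concatAll()`, where `toAnObs` is a hypothetical stand-in for whatever turns one array element into an observable. As a side note, `concat(obsArray)` in the earlier attempt treats the array as a single input and emits the observables themselves; spreading it as `concat(...obsArray)` would be another way to run them in order.

```typescript
import { Observable, of } from 'rxjs';
import { concatAll, map } from 'rxjs/operators';

// Hypothetical per-item work: each item produces two values, then completes.
const toAnObs = (v: string): Observable<string> => of(`${v}:start`, `${v}:done`);

const src$ = of(['a', 'b'], ['c']); // an observable of arrays

const seen: string[] = [];
src$.pipe(
  map(arr => arr.map(v => toAnObs(v))), // each array becomes an array of observables
  concatAll(), // flatten each array into its observables, one at a time
  concatAll(), // run each observable to completion before starting the next
).subscribe(v => seen.push(v));
```

Each item finishes before the next one starts, and arrays are processed in the order the source emitted them.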
cedvdb
@cedvdb
yes
Derek
@derekkite
you can also do
src$.pipe(
  mergeMap(arr => from(arr).pipe(
    map(item => item) // modify and return the item here
  )),
  toArray()
)
Shobhit Gupta
@shobhitg

Hello, I am trying to achieve a stream that is slow in the beginning but at full speed later on. The important thing to note is that my source stream is hot and it has to be that way, I can't do anything about its temperature.

Here is a playground I made of what I am trying to achieve:
https://stackblitz.com/edit/rxjs-sxpy17?devtoolsheight=60

But in my output I see the first 10 events, followed by events starting from event number 450 or 460 or so. It's as if some of the events got skipped because I couldn't process them in time.

What I am trying to achieve is that the first 10 events come slowly, at 2 events/second, and after that the remaining events come at the source's speed.

Here is a gif I recorded to show what I am seeing currently: https://imgur.com/FOm0fEP
Any pointers would be appreciated. I am not very well versed in dealing with hot streams. This should have worked in theory if the stream was cold. Not sure how to deal with the situation.
Derek
@derekkite
const source$ = timer(0,10).pipe(
  take(500),
  map(x => `Event ${x}!`),
  share() // note, this observable is hot.
);
const [first10$, rest$] = partition(source$, (value, index) => index < 10); // indexes 0..9 are the first 10 events

first10$.pipe(
  mergeMap(v => process(v).pipe(delay(500))) // `process` is a placeholder for the per-event work
)

merge(first10$, rest$).pipe(
or forkJoin.
@shobhitg
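A different approach from the partition idea above, offered only as a sketch: `concatMap` queues source values while its current inner observable is still running, so a hot source does not lose events while the first ones are being slowed down. `slowStart` is a hypothetical helper name, the queue is unbounded, and the demo runs on a `VirtualTimeScheduler` with 30 events instead of 500 so it finishes instantly:

```typescript
import { Observable, SchedulerLike, VirtualTimeScheduler, asyncScheduler, of, timer } from 'rxjs';
import { concatMap, delay, map, share, take } from 'rxjs/operators';

// Hypothetical operator: delay each of the first `slowCount` values by `gapMs`,
// then let everything after them through without extra delay.
function slowStart<T>(slowCount: number, gapMs: number, scheduler: SchedulerLike = asyncScheduler) {
  return (source$: Observable<T>): Observable<T> =>
    source$.pipe(
      // concatMap buffers incoming values while the current inner observable is active.
      concatMap((value, index) =>
        index < slowCount ? of(value).pipe(delay(gapMs, scheduler)) : of(value)
      )
    );
}

const scheduler = new VirtualTimeScheduler();
const source$ = timer(0, 10, scheduler).pipe(
  take(30),
  map(x => `Event ${x}!`),
  share() // hot, as in the question
);

const log: { at: number; value: string }[] = [];
source$.pipe(slowStart<string>(10, 500, scheduler))
  .subscribe(value => log.push({ at: scheduler.now(), value }));

scheduler.flush(); // run the virtual clock to the end
```

In this sketch the first 10 events come out 500 virtual milliseconds apart. Everything the source emitted in the meantime is then released at once and in order, after which events would flow at the source's own pace.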