Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • May 23 09:37
    sanmaopep edited #6976
  • May 23 09:33
    sanmaopep opened #6976
  • May 23 06:29
    ronag commented #6675
  • May 20 16:11
    driskell commented #6827
  • May 20 16:11
    driskell commented #6827
  • May 18 14:48
    martinsik commented #6500
  • May 11 20:01
    patrickkunka commented #6942
  • May 11 19:12
    jonapgartwohat commented #6654
  • May 11 01:53
    ctfdavis opened #6970
  • May 10 13:11
    PowerKiKi synchronize #6969
  • May 10 13:01
    PowerKiKi edited #6969
  • May 10 13:00
    PowerKiKi opened #6969
  • May 10 12:51
    PowerKiKi commented #6367
  • May 10 05:13
    imcotton opened #6968
  • May 09 08:54
    nanditsaini commented #5671
  • May 06 11:29
    voliva commented #6963
  • May 06 11:19
    diginikkari commented #6851
  • May 05 22:29
    jsoref opened #6966
  • May 05 02:06
    MaximSagan commented #5671
  • May 04 21:47
    chrisguttandin synchronize #6955
Ghet
@ghetolay
@pbadenski skipWhile(v => v === undefined)
Kay Khan
@kaykhancheckpoint
Hi friends im using rxjs and i am getting quite significant event loop lag (20k ms). how do i debug this issue?
1 reply
Razak Wasiu
@madiba2:matrix.org
[m]
hi
jarkkoskyttala
@jarkkoskyttala

Hi, I have this problem when using share(). The problem is that when I subscribe multiple times only the first subscribtion is going through as expected. The previous subscriptions trigger with a previous value from interval$. If I don't call share() on trigger$ then behavior is as expected and all triggers have correct values. Obviously in this case it doesn't matter if trigger$ is multicasted but in application code it is necessary.

This is a piece of code extracted and simplified from our application.

let count = 0;

const interval$ = interval(100).pipe(
  map(() => count++),
  take(10),
  share()
);

const trigger$ = interval$.pipe(
  filter((val) => val === 5),
  map((val) => {
    return `trigger_${val}`;
  }),
  share()
);

const test$ = trigger$.pipe(withLatestFrom(interval$)));

test$.subscribe((val) => console.log('test$', val));
test$.subscribe((val) => console.log('test$', val));

/**
 * console:
 * test$ [ 'trigger_5', 5 ]
 * test$ [ 'trigger_5', 4 ] <--- This should be identical to above
 */
14 replies
lake-toya
@lake-toya

Hi! I have a quick questions. How do I use bufferWhen with distinctUntilChanged?
I have an array that looks a little like this

const myArray = [
  { key: 'A', something: 'else' },
  { key: 'A', something: 'else' },
  { key: 'B', something: 'else' },
  { key: 'B', something: 'else' },
  { key: 'B', something: 'else' },
  { key: 'C', something: 'else' }
]

What I want to do is to buffer the items until the key property changes so that the items are grouped like this in the end

[
  { key: 'A', something: 'else' },
  { key: 'A', something: 'else' }
],
[
  { key: 'B', something: 'else' },
  { key: 'B', something: 'else' },
  { key: 'B', something: 'else' },
],
[
  { key: 'C', something: 'else' }
]

I tried doing something like this, but it doenst work

from(myArray).pipe(
  bufferWhen(distinctUntilChanged((curr, prev) => curr.key !== prev.key))
)

Can anyone point me into the right direction?

10 replies
Dmitry Pishchalka
@dpischalka
Hi people, is it alive? :)
danilaapp
@danilaapp
yep!)
Dmitry Pishchalka
@dpischalka
Great! :-D.
Who can tell me, why is it because of withLatestFrom subjects subscribers can't get the value? or withLatestFrom can't stop the pipeline at all?
Dorus
@Dorus
@dpischalka That question made no sense at all
Bogomip
@Bogomip

So I am creating an app with several layers of asynchronisity, and working in Angular with RxJS for the majority of my async needs. I am relatively new to RxJS and this is by far my biggest use of it so far so learning on the go.

Below are three functions which are meant to invoke a duplication on a report object, then return the id of the newly created document from firebase. Getting the ID and actually duplicating the data is working fine, but passing it back is where I am struggling.

So it should work like this:

  • I click on a button and duplicateReport() runs.
  • This calls duplicateReportService() which will get the old report, update it with a new name, and then:
  • Send it to addNewReport() which will add it as a new report.

This works so far! But then I want to return the ID of the newly created database document back through the line to the very first duplicateReport() function, but im struggling. Currently the original report is being passed back :)

Any help would be gratefully received, thanks!

let reportId: string = "pies";
let reports: ReportTemplate[] = [];
/**
 * This is the function that is called directly froma  pushed button...
 */
duplicateReport(): void {
    // invoke the duplicate report function
    this.duplicateReportService(this.reportId).subscribe((newReport: any) => {
        // this is outputting the report I want duplicating, not the ID of the new report.
        console.log(newReport);
    })
}
/**
 * This is the service function that might be called from multiple places and deals with
 * storage of the reports (new and old)
 */
duplicateReportService(id: string): Observable<any> {
    return this.getReport(id).pipe(take(1), tap({
        next: (report: ReportTemplate) => {
            // create a duplicate report
            let newReport: ReportTemplate = { ...report };
            newReport.name = 'Duplicate of ' + report.name;
            newReport.id = "";

            // and add to the db...
            return addNewReport(newReport).pipe(take(1)).subscribe(({
                next: (res: DocumentReference<string>) => {
                    newReport.id = res.id;
                    // and add to the common reports list.
                    this.reports.push(newReport);
                    // THIS IS A STRING, AND IS WHAT I WANT RETURNED TO duplicateReport()
                    return res.id;  
                }, 
                error: (error) => {
                    console.log("Could not duplicate report because of: " + error);
                }
            }));

    },  error: (error) => {
        console.log("Did not duplicate: " + error);
    }}))
}
/**
 * This is the function that deals directly with firebase...
 */
addNewReport(data: ReportTemplate): Observable<any> {
    return from(this.firebase.collection('reports').add(data));
}
2 replies
George Svarovsky
@gsvarovsky

Hi, I’ve created a library/pattern for lossless back-pressure in RxJS: https://www.npmjs.com/package/rx-flowable

I pulled this out of my main project, m-ld, because a) it would be great to get feedback on the approach from the RxJS community, and b) it might be useful to you.

Thanks for looking & happy to discuss!

Sebastian Ferreyra
@saabi

Hey everybody. I'm wondering if it's possible to combine two or more observables such that any changes to any of the observables only cause one invocation of the observer. In the following code c$ will be invoked twice but I would like it to be invoked only once only after both a$ and/or b$ are updated.

const a$ = new Subject
const b$ = new Subject

const c$ = combineLatest(a$, b$).pipe(map(([a,b]) => a + b))

a$.next(1) // c$ is invoked here
b$.next(1) // and here
// Ideally c$ should be invoked only here and only once after one or more updates on its dependencies.
// In other words, I want to be able to update subjects atomically and only start down chain computations when the 'transaction' is done

I imagine a Scheduler might do the job but I'm fairly new to RxJS so I will appreciate any help!

17 replies
Alexey Tirman
@Stexxe
Hey. Is there a way to create an observable that releases some resources when a subscription is complete? I'm trying to wrap the readline from NodeJS in an observable but I don't know how to close an interface when a subscription is complete, e.g. when take(1) is used.
4 replies
j1m-renwick
@j1m-renwick
Hi, I'm trying to find an example of how to consume a streaming API endpoint using RxJS - the only examples I can find involve creating an observable that makes calls to non-streaming APIs, and my attempts at tweaking those examples have failed - can someone give me a pointer?
1 reply
Nicholas Cunningham
@ndcunningham
hey all!
it's been awhile

lets say i have an observable
const aOrb$ = combineLatest(a$, b$).pipe(map(([a, b]) => a || b))

This wont work unless both a and b emit atleast once right ?
Is there anyway to force this ? maybe startWith

4 replies
Kasir Barati
@kasir-barati
Hey all
I am newbie in RxJS. please give me some advise to learn with less struggling with dummy things
2 replies
László Matthias Gréczi
@greczi_gitlab
hi,

what should i use here?

obs1.pipe(
....((value1) => {
return myService1(value1);
}),
....((val1) => {
return myService2(value1);
})
).subscribe((valueFromService1, valueFromService2) => {
console.log(valueFromService1);
console.log(valueFromService2);
})

Could you help me, please?

2 replies
Adam Powers
@apowers313:matrix.org
[m]
is there a tutorial that shows how to use rxjs pipes and different operators with typescript?
1 reply
Fran
@Paker30
Hi guys, it's the first time I speak here but... I'd appreciate any help :) I need to create a flow with two observables (A, B), but I need to ignore all the events from B until A emits a value, thus if A emits a new value, I want to take the next value from B.
Do you guys have any clue on how to do it? Thanks a lot :)
4 replies
manavortex
@manavortex

Hi... I'm losing my mind over this observable chain, I hoped y'all could help me out.

I have a HTTP request that'll return me a list, and I want a cached list. So far, so good - I can do that with an interval - but I also want to be able to purge the cached list immediately and then basically trigger the next timer, and I can't get it to work >.<

1 reply
    const data = this.getDataOnce();

    const update$ = this.updates$.pipe(
      mergeMap(() => this.getDataOnce()),
    );

    this.data$ = merge(data , update$);
Now, how do I get the timer into that? :sweat_smile:
manavortex
@manavortex
And how do I make sure that all subscriptions from external components get the same observable chain?
Daniel Karp
@karptonite
I’ve got an rxjs problem that seems easy to describe and difficult (for me) to construct. I have an unknown number of tasks that will start and finish at random times. It is a bit more complicated than this, but the core is: I want to create an observable that emits after the last task finishes, and there are no pending tasks that have started and not yet finished.
I’m trying to work with a Subject of Subjects, where each inner subject corresponds to one of the tasks. I’d like then to combine those (with scan, into an array of subjects? not sure.) so I can see the status of each task. In principle, I don’t care about a task anymore once it has completed, so ideally, I would stop tracking each task (somehow?) once it has completed.
the final goal is to have a behavior subject of type boolean, that tells me whether the system is “stable” (i.e., no tasks begun but not yet completed).
Daniel Karp
@karptonite
OK, I think I've got it. Thanks, Rubber duck gitter. :)
A'braham Barakhyahu
@BlessYAHU
Anyone know if RxJS play well with Reactjs 18? It has global state management, but I would still like to use observable streams and operators RxJS provides.
2 replies
Attoumane
@akuma8
Hi there,
I am currently learning Angular but RxJS operators seem to cause me some trouble.
There are too many operators that I don't understand their goal.
As a Java developer I try to mirror Observable to the Stream API.
Regarding the Stream Api there are 2 kind of operators: intermediate operators and terminal operator.
Is there something similar to RxJS: intermediate operator and terminal operators (which unsubscribe from the observable).
If yes, do you have a good reference where we can find which operator belongs to which category?
Thank you
Dorus
@Dorus

@akuma8 There are operators that unsubscribe if you're looking for that. Like takeUntil, take(n), timeout, find...
However in RxJs, the goal of operators is to transform a stream to another one, and finally subscribe to observe the final elements and use them, common scenario's in Angular are:
map to transform values on a stream to something else;
tap to have some side effect for each value;
mergeMap or switchMap to do another async operator/call;
takeUntil(onDestroy$) is a common pattern to clean up long running subscriptions;
and forkJoin to combine the results of multiple http calls.

you should probably start somewhere and slowly get familiar with everything. You do not commonly need most operators, and you can always use the decision tree to find what you need. Just remember RxJs is a async query language that can transform and combine most async streams and calls.

Attoumane
@akuma8
Thank you @Dorus
Toprak Nihat Deniz Ozturk
@toprakdeniz

Hi everybody,
Currently working on a project with many objects in queues for time consuming tasks.
object properties might change while they are waiting for the task to be executed on them.
Only the most Recent data must be executed.
There are two classes I want to nerve to each other:

  • 'object' class as data class with observable values
  • 'task' class with functions and a objectQueue$.

Inputs for specific task are subscribed to obj[combinationName].behaviorSubject$ with combineLatest which taps obj[combinationName].changed$.next() ,
changed$ is initiator for object to be enqueued task.objectQueue$ .
When a task consume from objectQueue$ and read its value it needs from the behaviorSubject it taps obj[processName].lastChangeConsumed$.next(). lastChangeConsumed$ is a throttle for objects to be enqueued on objectQueue$.

object registers to a task:
changed$ = this[combinationName].change$
lastChangeConsumed$ = this[processName].lastChangeConsumed$
changed$.pipe(throttle(lastChangeConsumed$, map(()=> this )).subsribe( obj => Task.objectQueue$.next(obj))

task consumes:
this.objectQueue.pipe( tap( (obj)=> obj[this.name].lastChangConsumened$.next() ).subscribe( obj => {
var dataToProcess;
obj[combinationName].behaviorSubject$.pipe(take(1)).subscribe( value => dataToProcess = value);
}

Dorus
@Dorus
@toprakdeniz please use `code` and
```Ts
code
```
Beside that, it's unclear to me what you're asking.
Toprak Nihat Deniz Ozturk
@toprakdeniz

Hi @Dorus
Different process listens for different variables of the object. when a change happens on a variable, related tasks should run on them.
The variables continue to be updated until the task is run on them. Only the most recent update should be processed.

class DataClass{
    ...
    ...
    createCombination(combinationName, combinationInputs){
        this.combinations[combinationName].changed$ = new rxjs.Subject();
        this.combinations[combinationName].behaviorSubject$ = new rxjs.BehaviorSubject();
        this.combinations[combinationName].subscription = rxjs.combineLatest(combinationInputs) 
            .pipe(rxjs.tap(this.combinations[combinationsName].changed$.next())).subscribe(x => this.combinations[combinationsName].behaviorSubject$.next(x));
    }
    registerProcess(theProcess, combinationName ){
        this.processes[theProcess.name].consumed$ = new rxjs.Subject();
        this.processes[theProcess.name].combinationName = combinationName;
        this.processes[theProcess.name].output$ = new rxjs.BehaviorSubject();
        this.combinations[combinationName].changed$.pipe(
            rxjs.throttle(()=> this.processes[theProcess.name].consumed$),
            ).subscribe( () => theProcess.Queue$.next(this) )
        this.processes[theProcess.name].consumed$.next(); // open throttle so first change$ can enqueue
    }
    getLastCombination(processName){
        var data;
        const combinationName = this.processes[theProcess.name].combinationName;
        this.combinations[combinationName].behaviorSubject$.subscribe(x => data = x).unsubscribe();
        this.processes[processName].consumed$.next();
        return data
    }
}

Process class class wraps task an a queue$:

class Process{
    constructor(theProcess, processName){
        this.ouputNames = outputNames;
        this.name = processName;
        this.objectQueue$ = new rxjs.Subject();
        this.processPipe$ = objectQueue$.pipe(
            rxjs.map((obj) => this.processObj(obj)), 
            );
        this.subcription = this.processPipe$.subscribe();
    }
    processObj(obj){
            var data = obj.getLastCombination(this.name);
            obj.processes[this.name].output$.next( this.theProcess(data));
            return obj;
        }
}

Process and DataClass hand shakes over compositions where combineLast and states are managed.
I tried to implement dependency injection. Hope it is readable.
Questions are:
Is it ok to use subjects, observable instead of variable?
Is it ok to hold combineLast out in a behaviorSubject?
Ideas and suggestions are appreciated.

Dorus
@Dorus

@toprakdeniz Overall in RxJs you want to avoid using subjects wherever possible
Reason for this is, Subjects allow you to easily write spaghetti and also allow you too easily to write imperative code, where reactive code would be preferable. Usually in RxJs the reactive solution is much more elegant but you need to wrap your head around it on how to write that way.
Generally the rules for Subjects are: Avoid where possible, and if you do need them, keep them local. But usually you will be able to replace them with either different combinations of RxJs operators, or use more native solutions like event patterns.
For exmaple to avoid anyone form the outside pushing to behaviorSubject$ where you want them to use combinationInputs. Expose the subject with BehaviorSubject.asObservable()

this.combinations[combinationName].behaviorSubject$.subscribe(x => data = x).unsubscribe();

Prefer to use data = this.combinations[combinationName].behaviorSubject$.value

Also, with throttle you are not leveraging RxJs ability to async schedule based on incoming events. Look into functions the *Map operators that can cancel ongoing calculations when a new value comes in (switchMap), or ignore new values if you're still processing the last one (exhaustMap).
Also there are some variant of throttle with different behaviors. Also look at sample and audit, both are more certain to emit a value now and then. Throttle can choke if your value is always changing.


Only the most recent update should be processed.

Ah, audit is going to a be a lot better fit for you than throttle.
I also had a variant of this in one of the *Map operators. I called it auditMap. let me see if i can still dig it up.

Dorus
@Dorus
ReactiveX/rxjs#1777
https://gist.github.com/ghetolay/ddc737d8552aeaa8ef34fc6a3e085c32
found these back. Sadly still no official operator, and i also never made a proper package out of mine either.
2 replies
Toprak Nihat Deniz Ozturk
@toprakdeniz
Using state variables which lead to imperative code was annoying. Looks like audit the turn around. I build again. RxJs is like lego and I have a new shape. Thanks @Dorus
Peter
@AnderssonPeter
Hi I'm using retryWhen to re execute a http request, but when that happens i want to emit a custom message to the subscriber, how would I achieve that?
2 replies
Peter
@AnderssonPeter
I solved it using interval, takeUntil and merge
Craig
@dotnetcraig

Hope someone can help me, I'm trying to subscribe to an observable to set properties on a class in an angular component but it doesn't appear to be possible. This is what I've got

export class MyComponent implements OnInit {
    message: string = '';
    title: string = '';

    constructor(private myService: MyService) {}

    ngOnInit() {
        this.myService.getMessage().subscribe({
            next(res) {
                this.message = res.message;
                this.title = res.title;
            }
        });
    }
}

the error message I am getting is "property 'message' does not exist on type 'Partial<Observer<MessageResponse>>'." if I remove the this from message then it still can't find the class property. What am I doing wrong? Or what am I missing?

2 replies
Craig
@dotnetcraig
it works fine with a function inside the subsribe like ...getMessage().subscribe((res: MessageResponse) => { this.message = res.message; }) but there doesn't appear to be a nice way to handle an error other than adding a null check on 'res'?
Dominic Watson
@intellix
I'd like to run something when someone subscribes to an observable (setting a loading state to true)... I kind of need like a tap for the beginning, is it possible?
Dominic Watson
@intellix
I guess like this?
of(null).pipe(
  tap(() => this.loading$.next(true)),
  switchMap(() => someExpensive$),
  tap(() => this.loading$.next(false)),
)
Dorus
@Dorus
@intellix
defer(() => {
  setting = x;
  return someExpensive$;
})
Omri
@omridevk
Hi, wanted to share a cool library I wrote that helps managing keyboards in your application:
https://github.com/omridevk/ng-keyboard-shortcuts