Attoumane
@akuma8
Thank you @Dorus
Toprak Nihat Deniz Ozturk
@toprakdeniz

Hi everybody,
Currently working on a project with many objects in queues for time-consuming tasks.
Object properties might change while they are waiting for the task to be executed on them.
Only the most recent data must be processed.
There are two classes I want to wire to each other:

  • 'object' class as data class with observable values
  • 'task' class with functions and an objectQueue$.

Inputs for a specific task are subscribed to via obj[combinationName].behaviorSubject$, fed by combineLatest, which taps obj[combinationName].changed$.next().
changed$ is the trigger for the object to be enqueued on task.objectQueue$.
When a task consumes from objectQueue$ and reads the value it needs from the behaviorSubject, it taps obj[processName].lastChangeConsumed$.next(). lastChangeConsumed$ acts as the throttle for objects to be enqueued on objectQueue$.

The object registers with a task:
changed$ = this[combinationName].changed$
lastChangeConsumed$ = this[processName].lastChangeConsumed$
changed$.pipe(throttle(() => lastChangeConsumed$), map(() => this)).subscribe(obj => Task.objectQueue$.next(obj))

The task consumes:
this.objectQueue$.pipe(tap(obj => obj[this.name].lastChangeConsumed$.next())).subscribe(obj => {
var dataToProcess;
obj[combinationName].behaviorSubject$.pipe(take(1)).subscribe(value => dataToProcess = value);
})

Dorus
@Dorus
@toprakdeniz please use `code` and
```Ts
code
```
Beside that, it's unclear to me what you're asking.
Toprak Nihat Deniz Ozturk
@toprakdeniz

Hi @Dorus
Different processes listen for different variables of the object. When a change happens on a variable, the related tasks should run on it.
The variables continue to be updated until the task is run on them. Only the most recent update should be processed.

class DataClass{
    ...
    ...
    createCombination(combinationName, combinationInputs){
        this.combinations[combinationName].changed$ = new rxjs.Subject();
        this.combinations[combinationName].behaviorSubject$ = new rxjs.BehaviorSubject();
        this.combinations[combinationName].subscription = rxjs.combineLatest(combinationInputs)
            // tap needs a callback; calling next() inline would fire once, at setup time
            .pipe(rxjs.tap(() => this.combinations[combinationName].changed$.next()))
            .subscribe(x => this.combinations[combinationName].behaviorSubject$.next(x));
    }
    registerProcess(theProcess, combinationName ){
        this.processes[theProcess.name].consumed$ = new rxjs.Subject();
        this.processes[theProcess.name].combinationName = combinationName;
        this.processes[theProcess.name].output$ = new rxjs.BehaviorSubject();
        this.combinations[combinationName].changed$.pipe(
            rxjs.throttle(()=> this.processes[theProcess.name].consumed$),
            ).subscribe( () => theProcess.objectQueue$.next(this) )
        this.processes[theProcess.name].consumed$.next(); // open throttle so first change$ can enqueue
    }
    getLastCombination(processName){
        var data;
        const combinationName = this.processes[processName].combinationName;
        this.combinations[combinationName].behaviorSubject$.subscribe(x => data = x).unsubscribe();
        this.processes[processName].consumed$.next();
        return data
    }
}

The Process class wraps a task and a queue$:

class Process{
    constructor(theProcess, processName, outputNames){
        this.theProcess = theProcess; // the task function, used in processObj
        this.outputNames = outputNames;
        this.name = processName;
        this.objectQueue$ = new rxjs.Subject();
        this.processPipe$ = this.objectQueue$.pipe(
            rxjs.map((obj) => this.processObj(obj)), 
            );
        this.subscription = this.processPipe$.subscribe();
    }
    processObj(obj){
            var data = obj.getLastCombination(this.name);
            obj.processes[this.name].output$.next( this.theProcess(data));
            return obj;
        }
}

Process and DataClass shake hands over the combinations, where combineLatest and state are managed.
I tried to implement dependency injection. Hope it is readable.
Questions are:
Is it OK to use subjects and observables instead of variables?
Is it OK to hold the combineLatest output in a BehaviorSubject?
Ideas and suggestions are appreciated.

Dorus
@Dorus

@toprakdeniz Overall in RxJs you want to avoid using subjects wherever possible.
The reason is that Subjects make it easy to write spaghetti, and also make it too easy to write imperative code where reactive code would be preferable. Usually in RxJs the reactive solution is much more elegant, but you need to wrap your head around how to write that way.
Generally the rules for Subjects are: avoid where possible, and if you do need them, keep them local. But usually you will be able to replace them with either different combinations of RxJs operators, or more native solutions like event patterns.
For example, to prevent anyone from the outside pushing to behaviorSubject$ when you want them to go through combinationInputs, expose the subject with BehaviorSubject.asObservable().

this.combinations[combinationName].behaviorSubject$.subscribe(x => data = x).unsubscribe();

Prefer to use data = this.combinations[combinationName].behaviorSubject$.value
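A minimal sketch of those two suggestions, assuming RxJS 6 or 7. The `Combination` class and its member names are hypothetical and only stand in for a `combinations[combinationName]` entry:

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical holder for one combination; not part of the original classes.
class Combination<T> {
  // The subject stays private, so only this class can push values into it.
  private readonly state$: BehaviorSubject<T>;
  // Consumers get a read-only view without a next() method.
  readonly latest$: Observable<T>;

  constructor(initial: T) {
    this.state$ = new BehaviorSubject<T>(initial);
    this.latest$ = this.state$.asObservable();
  }

  update(value: T): void {
    this.state$.next(value);
  }

  // Synchronous read, replacing the subscribe(...).unsubscribe() round trip.
  get current(): T {
    return this.state$.value;
  }
}

const combo = new Combination<number>(0);
combo.update(42);
const current = combo.current; // 42
```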

Also, with throttle you are not leveraging RxJs's ability to schedule asynchronously based on incoming events. Look into the *Map operators that can cancel ongoing calculations when a new value comes in (switchMap), or ignore new values if you're still processing the last one (exhaustMap).
There are also some variants of throttle with different behaviors. Also look at sample and audit; both are more certain to emit a value now and then. Throttle can choke if your value is always changing.


Only the most recent update should be processed.

Ah, audit is going to be a much better fit for you than throttle.
I also had a variant of this as one of the *Map operators. I called it auditMap. Let me see if I can still dig it up.
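To make the difference concrete, here is a rough sketch assuming RxJS 6 or 7. `changed$` and `consumed$` are plain Subjects standing in for the ones in the DataClass above, and the numbers are made-up updates. With default options, throttle passes the first value and drops what arrives while the window is open, whereas audit holds on to the most recent value and emits it when the window closes:

```typescript
import { Subject } from 'rxjs';
import { audit, throttle } from 'rxjs/operators';

const changed$ = new Subject<number>();  // stand-in: a variable changed
const consumed$ = new Subject<void>();   // stand-in: the task is ready again

const viaThrottle: number[] = [];
const viaAudit: number[] = [];

changed$.pipe(throttle(() => consumed$)).subscribe(v => viaThrottle.push(v));
changed$.pipe(audit(() => consumed$)).subscribe(v => viaAudit.push(v));

changed$.next(1); // throttle emits 1 right away; audit starts waiting
changed$.next(2); // throttle drops it; audit remembers 2
changed$.next(3); // throttle drops it; audit remembers 3 instead
consumed$.next(); // audit emits 3, the most recent update
changed$.next(4); // throttle emits 4; audit starts waiting again
consumed$.next(); // audit emits 4

// viaThrottle is [1, 4], viaAudit is [3, 4]
```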

Dorus
@Dorus
ReactiveX/rxjs#1777
https://gist.github.com/ghetolay/ddc737d8552aeaa8ef34fc6a3e085c32
Found these again. Sadly there is still no official operator, and I never made a proper package out of mine either.
2 replies
Toprak Nihat Deniz Ozturk
@toprakdeniz
Using state variables, which led to imperative code, was annoying. Looks like audit is the way around that. I'll build it again. RxJs is like Lego and I have a new shape. Thanks @Dorus
Peter
@AnderssonPeter
Hi, I'm using retryWhen to re-execute an HTTP request, but when that happens I want to emit a custom message to the subscriber. How would I achieve that?
2 replies
Peter
@AnderssonPeter
I solved it using interval, takeUntil and merge
Craig
@dotnetcraig

Hope someone can help me, I'm trying to subscribe to an observable to set properties on a class in an angular component but it doesn't appear to be possible. This is what I've got

export class MyComponent implements OnInit {
    message: string = '';
    title: string = '';

    constructor(private myService: MyService) {}

    ngOnInit() {
        this.myService.getMessage().subscribe({
            next(res) {
                this.message = res.message;
                this.title = res.title;
            }
        });
    }
}

the error message I am getting is "property 'message' does not exist on type 'Partial<Observer<MessageResponse>>'." if I remove the this from message then it still can't find the class property. What am I doing wrong? Or what am I missing?

2 replies
Craig
@dotnetcraig
it works fine with a function inside the subscribe like ...getMessage().subscribe((res: MessageResponse) => { this.message = res.message; }) but there doesn't appear to be a nice way to handle an error other than adding a null check on 'res'?
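For what it's worth, the method shorthand `next(res) { ... }` gets its own `this` (the observer object), which is what the compiler error is pointing at. A sketch of one way around it, assuming RxJS 7 (for the `throwError` factory form) and using a plain class instead of an Angular component; `MessageHolder` and the `error` field are made up for illustration:

```typescript
import { Observable, of, throwError } from 'rxjs';

interface MessageResponse {
  message: string;
  title: string;
}

// Hypothetical stand-in for the component; no Angular involved.
class MessageHolder {
  message = '';
  title = '';
  error = '';

  load(source$: Observable<MessageResponse>): void {
    source$.subscribe({
      // Arrow functions do not rebind `this`, so it is still the class instance.
      next: (res) => {
        this.message = res.message;
        this.title = res.title;
      },
      // Errors get their own callback, so no null check on `res` is needed.
      error: (err: unknown) => {
        this.error = String(err);
      },
    });
  }
}

const ok = new MessageHolder();
ok.load(of({ message: 'hello', title: 'greeting' }));

const failed = new MessageHolder();
failed.load(throwError(() => new Error('boom')));
```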
Dominic Watson
@intellix
I'd like to run something when someone subscribes to an observable (setting a loading state to true)... I kind of need like a tap for the beginning, is it possible?
Dominic Watson
@intellix
I guess like this?
of(null).pipe(
  tap(() => this.loading$.next(true)),
  switchMap(() => someExpensive$),
  tap(() => this.loading$.next(false)),
)
Dorus
@Dorus
@intellix
defer(() => {
  setting = x;
  return someExpensive$;
})
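A slightly fuller sketch of the defer idea applied to the loading flag, assuming RxJS 6 or 7. `withLoading` is a hypothetical helper name, `someExpensive$` is replaced by a trivial `of(...)`, and `finalize` is an addition here so the flag is also reset on error or unsubscribe:

```typescript
import { BehaviorSubject, Observable, defer, of } from 'rxjs';
import { finalize } from 'rxjs/operators';

const loading$ = new BehaviorSubject<boolean>(false);

// Record every loading state so the order is visible.
const states: boolean[] = [];
loading$.subscribe(s => states.push(s));

// Hypothetical helper: the defer callback runs once per subscription.
function withLoading<T>(source$: Observable<T>): Observable<T> {
  return defer(() => {
    loading$.next(true);
    // finalize runs on complete, error, or unsubscribe.
    return source$.pipe(finalize(() => loading$.next(false)));
  });
}

const someExpensive$ = of('result'); // stand-in for the real work
const wrapped$ = withLoading(someExpensive$);

// Nothing has happened yet: building the observable does not set the flag.
const statesBeforeSubscribe = [...states]; // [false]

const results: string[] = [];
wrapped$.subscribe(v => results.push(v));
// states is now [false, true, false]
```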
Omri
@omridevk
Hi, wanted to share a cool library I wrote that helps manage keyboard shortcuts in your application:
https://github.com/omridevk/ng-keyboard-shortcuts
Matti Järvinen
@nemeciii_twitter

I'm having a hard time testing a component that uses RxJS with repeat(). What I'm doing is polling a bluetooth device while the component exists.

This doesn't work.

import * as rxjsObservables from 'rxjs/observables';
spyOn(rxjsObservables, 'repeat').and.returnValue( () => EMPTY );

And I'm using Jasmine.

2 replies
dojchek
@dojchek

Hello guys,
I'm having troubles getting the stream being executed in the right order. Here is the example:

of("Anything")
  .pipe(
    switchMap(() =>
      of("Anything").pipe(
        tap(() => console.log("First started")),
        finalize(() => console.log("Finalize first"))
      )
    ),
    switchMap(() =>
      of("Anything").pipe(
        tap(() => console.log("Second started")),
        finalize(() => console.log("Finalize second"))
      )
    )
  )
  .subscribe();

I need it to execute in the following order:

  • First started
  • Finalize first
  • Second started
  • Finalize second

Instead I'm getting the following sequence:

  • First started
  • Second started
  • Finalize first
  • Finalize second

If I put delay(0) after the first switchMap it behaves as expected, but it really feels dirty to do so.

Does anyone know what is happening here and whether this can be solved elegantly?
Thank you very much

5 replies
Brandt B.
@babeal
@Dorus Yo, it's been a while! Is there a new version of the marble visualization you built to show the behavior of the different time operators?
Dorus
@Dorus
@babeal idk what the last one you saw is, but I haven't touched that in years
Brandt B.
@babeal
@Dorus I still have the one but it needs to be upgraded. Do you have any other links to apps like that?
Dorus
@Dorus
@babeal A bunch of people made things. But even the old one we made shouldn't be too hard to update to the latest rxjs

Let me share my rxjs link dump. Hopefully these still work; I haven't really looked at them much over the last few years

https://rxjs.dev/
https://jsbin.com/migogu/4/edit?js,output
http://reactivex.io/learnrx/
https://egghead.io/search?query=rx
https://egghead.io/technologies/rx
http://jaredforsyth.com/rxvision/examples/playground/
https://rxjs-playground.github.io/#/
https://fingerpich.github.io/rx-flow/load/%7B%22nodes%22:%5B%7B%22id%22:1,%22x%22:579.5,%22y%22:462,%22node_type%22:%22Subscribe%22,%22properties%22:%7B%7D%7D,%7B%22id%22:3,%22x%22:579,%22y%22:168,%22node_type%22:%22Filter%22,%22properties%22:%7B%22filter%22:%223%22%7D%7D,%7B%22id%22:4,%22x%22:680,%22y%22:311,%22node_type%22:%22First%22,%22properties%22:%7B%22filter%22:0%7D%7D,%7B%22id%22:5,%22x%22:399,%22y%22:167,%22node_type%22:%22Map%22,%22properties%22:%7B%22mapFunc%22:%221%22%7D%7D,%7B%22id%22:7,%22x%22:399,%22y%22:471,%22node_type%22:%22Range%22,%22properties%22:%7B%22start%22:1,%22count%22:16%7D%7D,%7B%22id%22:8,%22x%22:289,%22y%22:312,%22node_type%22:%22Filter%22,%22properties%22:%7B%22filter%22:%221%22%7D%7D%5D,%22edges%22:%5B%7B%22source%22:5,%22target%22:3%7D,%7B%22source%22:3,%22target%22:4%7D,%7B%22source%22:4,%22target%22:1%7D,%7B%22source%22:7,%22target%22:8%7D,%7B%22source%22:8,%22target%22:5%7D%5D%7D
https://rxviz.com/
http://rxfiddle.net/
http://reactive.how/
https://developers.google.com/web/updates/2018/01/devtools#overrides
https://reactive.how/
https://rxmarbles.com/
http://introtorx.com/
https://blog.hediet.de/post/a_typescript_playground_for_rx_js#XQAAAAL7BAAAAAAAAABKINBuYDZsN5YTW6OHrMtN89aFQcJaFUKfsoWjXpBupTEj3QLZNqO4ofgmGWnqZhNgiTqPGcDKwnq0GEYZObuUdaylZEoDuK-aE73Tm9Odt4xNGb0o5qJUE5B0IabMZ1lJSS2aiiPx19AROKK9DTkE8DaRrTh2boUbP88fS9OXdFTc2XeLtmiM5iAzGvCALE_SHMSPV_sDCPtsalUhxvUeUGobYb-lju4MoT1J_2eylZjl7eudk2ua2WJqcSnclg_hyIMgZEKf0TKnWiPjEunCK8FX_IIFjuVvWNeOAlSiXOvlgdbnjcYOb46FErbNdjx1OguBnWIxaBkh1YWzNoQMuzGNXbJ9R57pjzZ50Qw-QwYEaxEv-MawRwVqzFZi--SYCLiKaZy1iGqnaKOh3UWwgC_A79aJtonchBj1piYu6bX3icE3ogod6pSNqCRYubCXFfuB-Qay2SAFoPV_MmJF9QhFKc3Q2bKrfwJqx0EnN-9pxejfDwgGhRVPrjuS1iwDJAko9Eeyxnicg2vy_qBLI0aAeskSRnVYLnw2T2k6W3pTYdvdLu101PBsPVQYIwgpOBTQHHv9fZPcyCn2H7QC0cYYFTuzz2rLkkWlcl2OST5jiy--l6x5W1IFUoEfPFPN9BrQdtfnCFXYHudpKDOgFWxT0xEKKMi5wZk66scxmIfqi8n7Y3eR7jl_604GsctYO5Tmp6ehQZBikt7PTGiSFqFgtEIbJcplkmOPT4vbedJM14ZtAu315E2-9k3CvPdaUAUv_UF4undefinedQ
https://mailchi.mp/9c8299ec62c2/rxjsunittesting
https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom/amp
https://stackblitz.com/edit/jasmine-marbles-testing-xpro7q?file=src%2Ftime.spec.ts

Akshay3651
@Akshay3651
const onNoteUpdate = dialogRef.componentInstance.noteData
  .pipe(
    switchMap(caseData => {
      if (caseData) {
        if (isAddOperation) {
          return this.caseService.addNote$(caseId, caseData as CaseNote);
        } else {
          return this.caseService.updateNote$(
            caseId,
            note.id,
            caseData as CaseNote
          );
        }
      } else {
        return of(null);
      }
    }),
9 replies

Test suite failed: apps/starfruit/src/app/samples-module/notes-tab/notes-tab.component.ts:68:8 - error TS2339: Property 'pipe' does not exist on type 'EventEmitter<any>'.

68 .pipe(

This worked with Angular 8 and RxJS 6.5. I have now upgraded Angular to 11 and RxJS from 6.5 to 7.5.5.
I am using MatDialog from Angular Material.
Material was also upgraded, from 8.2.3 to 11.2.13,
but the error only appeared after I upgraded RxJS from 6 to 7.
I am not able to fix this issue. Please help me with this kind of error.
Matthew Wagerfield
@wagerfield

Hey Everyone :wave: could someone explain to me whether or not I can/should still be using of?

It's marked as deprecated in the docs, but the link to the Scheduler Argument page, which has a couple of examples of how to refactor code using of, is a little confusing, since the latter example (refactoring merge, concat etc.) still uses of.

I have read the docs and understand that the scheduler argument of of (and other functions) is being deprecated... but the whole of function is marked as deprecated, which is really confusing?
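As far as I know, only the overloads of `of` that take a scheduler as the last argument are deprecated; plain `of(...)` is not. A small sketch of both sides, assuming RxJS 7, with `scheduled` as the replacement for the scheduler overload:

```typescript
import { asyncScheduler, of, scheduled } from 'rxjs';

// Plain `of` without a scheduler: emits synchronously on subscribe.
const plain: number[] = [];
of(1, 2, 3).subscribe(v => plain.push(v));

// Deprecated form: of(1, 2, 3, asyncScheduler)
// Replacement: pass the values and the scheduler to `scheduled`.
const viaScheduler: number[] = [];
scheduled([1, 2, 3], asyncScheduler).subscribe(v => viaScheduler.push(v));

// At this point `plain` is already filled, while `viaScheduler` is still
// empty because asyncScheduler delivers its values on later ticks.
const plainCount = plain.length;
const scheduledCountSoFar = viaScheduler.length;
```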

3 replies
mtt456987
@mtt456987:matrix.org

hi guys,
repeatWhen seems to be deprecated
what is the alternative to the code below please :

const array$ = Observable.from(["arr1", "arr2", "arr3", "arr4"]).do(console.log);
const click$ = Observable.fromEvent(document, 'click');

array$
.repeatWhen(() => click$)
.subscribe();

thanks

18 replies
mtt456987
@mtt456987:matrix.org
all i want is to repeat the latest value of an observable on the next value of another observable
mtt456987
@mtt456987:matrix.org
the rx's version in my project is 7.5.5
mtt456987
@mtt456987:matrix.org
ok many thanks @dojchek
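For reference, the deprecation note on repeatWhen points to the `delay` option of `repeat`. A sketch of the snippet above in pipeable form, assuming the RxJS 7.5.5 mentioned above; `click$` is a Subject standing in for `fromEvent(document, 'click')` so the example runs outside a browser:

```typescript
import { Subject, from } from 'rxjs';
import { repeat } from 'rxjs/operators';

const array$ = from(['arr1', 'arr2', 'arr3', 'arr4']);
const click$ = new Subject<void>(); // stand-in for fromEvent(document, 'click')

const seen: string[] = [];

array$
  // After each completion, wait for click$ to emit before resubscribing.
  .pipe(repeat({ delay: () => click$ }))
  .subscribe(v => seen.push(v));

const afterFirstRun = seen.length; // 4: the first pass runs immediately

click$.next(); // simulated click: the array is replayed once more
const afterClick = seen.length; // 8
```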
Rob
@borriej
Hi
how do i merge a from() that fires multiple api calls into a single stream again?
    api
      .pipe(
        // check for role change, then we must delete
        switchMap(res => {
          // current rolIds 
          const currentRolIds = _.orderBy(this.data[e.i].roles.map(x => x.id), 'asc');
          // new rolIds
          const newRolIds = _.orderBy(model.rolIds, 'asc');
          // get the removed role ids
          const difference = _.difference(currentRolIds, newRolIds);
          // check
          if (difference.length > 0) {
            return from(difference)
              .pipe(
                concatMap(x => this.userService.deleteRolesByUserId(this.data[e.i].id, x))
              );
          }

          return of(res);
        }),
        tap(res => {
i dont want my tap to fire multiple times
but only once, once its done
Rob
@borriej
aah found it: toArray()
1 reply
dojchek
@dojchek
@borriej You may want to execute these http reqs simultaneously with forkJoin instead?
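A sketch comparing the two approaches, assuming RxJS 6 or 7. `deleteRole` is a hypothetical stand-in for `userService.deleteRolesByUserId` and returns a synchronous value instead of doing an HTTP call:

```typescript
import { Observable, forkJoin, from, of } from 'rxjs';
import { concatMap, toArray } from 'rxjs/operators';

// Hypothetical stand-in for userService.deleteRolesByUserId(userId, roleId).
const deleteRole = (userId: number, roleId: number): Observable<string> =>
  of(`user ${userId}: removed role ${roleId}`);

const removedRoleIds = [3, 5, 8];

// Sequential: one request at a time, collected into a single emission.
let sequential: string[] = [];
from(removedRoleIds)
  .pipe(
    concatMap(id => deleteRole(1, id)),
    toArray(),
  )
  .subscribe(all => (sequential = all));

// Parallel: all requests start at once; one emission when every one completes.
// Note: forkJoin over an empty array completes without emitting, so keep the
// `difference.length > 0` guard from the original code.
let parallel: string[] = [];
forkJoin(removedRoleIds.map(id => deleteRole(1, id)))
  .subscribe(all => (parallel = all));
```

Either way, a `tap` placed after this step sees a single array instead of one value per deleted role.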
Robert Main
@robertmain
Hey folks
I'm working on building a server-side RxJS app
A user connects via websocket, at which point a new Subject is created and the websocket client subscribed
The problem is unsubscribing the client when they disconnect
My understanding is that one Subject has many Subscribers. How am I supposed to correctly identify the Subscriber to unsubscribe?
Sure, I can call Subject.complete but that's going to kill things for all subscribers, not just the one that disconnected
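One thing that may help here: every call to `subscribe` returns its own `Subscription`, and unsubscribing it removes only that subscriber. A sketch assuming RxJS 6 or 7; `connect`, `disconnect`, the client ids and the `send` callback are all hypothetical stand-ins for the websocket handling:

```typescript
import { Subject, Subscription } from 'rxjs';

const broadcast$ = new Subject<string>();

// One Subscription per connected client, keyed by a hypothetical client id.
const subscriptions = new Map<string, Subscription>();

function connect(clientId: string, send: (msg: string) => void): void {
  subscriptions.set(clientId, broadcast$.subscribe(send));
}

function disconnect(clientId: string): void {
  // Removes only this client's subscriber; the Subject keeps running.
  subscriptions.get(clientId)?.unsubscribe();
  subscriptions.delete(clientId);
}

const inboxA: string[] = [];
const inboxB: string[] = [];
connect('a', m => inboxA.push(m));
connect('b', m => inboxB.push(m));

broadcast$.next('hello');
disconnect('a');
broadcast$.next('world');

// inboxA is ['hello'], inboxB is ['hello', 'world']
```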
5 replies
Jordan Hurt
@jth0024

Hey everyone! I'm struggling with a seemingly simple problem that is actually proving quite difficult for me to solve.

How can I alter the behavior of pairwise to emit [undefined, next] when the first value comes through rather than waiting until two values have been emitted?

I've tried pipe(startWith(undefined), pairwise()), but this causes every subscriber to receive [undefined, next] regardless of whether or not the observable has previously emitted. For example, given a -> b -> c, I want the first emission to be [undefined, a], and subsequent emissions to be [a, b], [b, c], etc. Instead, I get [undefined, b], etc. depending on when subscribe() is called.

Any help would be greatly appreciated!

3 replies
Rafał Łużyński
@mindbrave
Hey folks, I have a perhaps easy question, but I can't find a solution for it. I want a scan operator that takes an async function and doesn't take the next value from the source observable until the Promise from the accumulator function resolves and yields the new accumulated value. Like in this example https://codesandbox.io/s/reactive-playground-forked-7sy8k0?file=/src/index.js but of course it doesn't work with just scan.
1 reply
manavortex
@manavortex

Hey there,

I have a little problem with two input sources and one subscription:

onInput1Change = (event) => {
  this.myPipe.method(event.newValue).subscribe((result) => {
    this.myProperty1 = result;
    this.doBackendStuff();
  })
}

onInput2Change = (event) => {
  this.myPipe.method(event.newValue).subscribe((result) => {
    this.myProperty2 = result;
    this.doBackendStuff();
  })
}

// Input1 and Input2 can be set programmatically (two events in very short order) or manually (one event).
// How do I share this observable so that it waits a little bit before firing and then shares a replay?
doBackendStuff = () => {
  this.backendCall(this.myProperty1, this.myProperty2)
    .pipe(someMagicOperator)
    .subscribe((resultsOfBackendCall) => {
      // things happen
    })
}
13 replies