Glen Marchesani
@fizzy33
so my next use case (the previous one, takeByTimespan, worked great) is to close / complete the Observable if it emits no items for a period of time.
Glen Marchesani
@fizzy33
val messages: Observable[Message] = ...
I want that to close if no messages are seen for some FiniteDuration
Glen Marchesani
@fizzy33
staring at TakeLeftByTimespanObservable, which backs takeByTimespan, I think I can easily make my own
Piotr Gawryś
@Avasil
@fizzy33 I think you need timeoutOnSlowUpstream
Glen Marchesani
@fizzy33
yup tx
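A minimal sketch of the suggestion above, assuming Monix 3.x (`Message` is a hypothetical stand-in for the user's type). Plain `timeoutOnSlowUpstream` fails the stream with a `DownstreamTimeoutException`; the `...To` variant switches to a fallback, so switching to `Observable.empty` makes the stream complete cleanly instead:

```scala
import scala.concurrent.duration._
import monix.reactive.Observable

final case class Message(text: String) // stand-in for the user's Message type

// Complete the stream once no item has been seen for `quiet`.
def autoClosing(messages: Observable[Message], quiet: FiniteDuration): Observable[Message] =
  messages.timeoutOnSlowUpstreamTo(quiet, Observable.empty)
```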
Glen Marchesani
@fizzy33
so fwiw I am working on server-sent events server and client code for the JVM and JS... https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
one more I have is error handling with state. Ideally I need to handle an error AND I need the most recent value as part of that error handling (the use case is lastEventId). If the server-sent event connection drops, then the reconnect needs to provide the Last-Event-ID header on the reconnect. All the error handling I see is orthogonal to the actual data. Any methods that let me see the last item with the error handling?
should I just shift an Observable[A] -> Observable[Either[Throwable,A]]
then I can process things with mapAccumulate ?
sachin-rajwade
@sachin-rajwade
I need to create an Observable from a List of objects and then, based on some parameter of the object, call a data source - mainly Mongo or Cassandra. I am not able to do so
Jorge
@jvican
@aleczorab_twitter not yet, but I can open source them if you want
I'm planning to do that actually, I just haven't got to it yet
Piotr Gawryś
@Avasil

@sachin-rajwade

Could you be more specific?
If you want to create an Observable from a list of objects then you can call Observable.fromIterable(list)
And then you could call mapEval (if your mongo/cassandra connector returns Task ) or mapEvalF (if it's something else), or flatMap (if it's Observable)
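A hedged sketch of that pipeline, assuming a connector that returns `Task`. `Record` and `fetchFromStore` are hypothetical placeholders for the real data type and database call:

```scala
import monix.eval.Task
import monix.reactive.Observable

final case class Record(id: String, store: String)

// Hypothetical lookup; in practice this would call Mongo or Cassandra.
def fetchFromStore(r: Record): Task[String] =
  Task.now(s"value-for-${r.id}")

val records = List(Record("a", "mongo"), Record("b", "cassandra"))

val results: Observable[String] =
  Observable.fromIterable(records)
    .mapEval(fetchFromStore) // use mapEvalF / flatMap for other effect types
```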

Piotr Gawryś
@Avasil

@fizzy33
In case you're still wondering about it, nothing built-in comes to my mind.

I think mapAccumulate / scan can work well.
Another option is keeping the last value in a shared variable (Ref / Atomic / volatile) or modifying OnErrorRecoverWithObservable to keep track of the latest value.
I feel like the last option should work best if you want to restart the Observable in case of an error, and Observable[Either[Throwable, A]] is better if you want the option to handle the error without a restart.
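A rough sketch of the shared-variable option, assuming Monix 3.x. `Event` and the `connect` function are hypothetical stand-ins for the SSE client; the restart policy is illustrative only:

```scala
import monix.eval.Task
import monix.execution.atomic.Atomic
import monix.reactive.Observable

final case class Event(id: String, data: String) // hypothetical SSE event

// Keep the last seen event id in a shared Atomic so the reconnect
// logic can read it when the connection drops.
def resilient(connect: Option[String] => Observable[Event]): Observable[Event] = {
  val lastId = Atomic(Option.empty[String])
  Observable
    .defer(connect(lastId.get())) // defer re-evaluates on each (re)subscribe
    .doOnNext(e => Task { lastId.set(Some(e.id)) })
    .onErrorRestartUnlimited // hedged: the restart policy is up to the app
}
```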

Glen Marchesani
@fizzy33
@Avasil thanks, your input is appreciated as a sanity check... I have a large app in the wild and wonder sometimes if there is a purely FP way to do this ;-)
I am doing the AtomicRef approach, plan to migrate that to a Ref someday but not in a rush
I did have it working with Either[] and mapAccumulate and scan but it was just too messy
and seemed to affect performance more than I wanted
where the AtomicRef just worked
I like the idea of a OnErrorRecoverWithObservable that tracks the last value... That has legs...
fwiw the move to Monix / Task / Observable has been very smooth. We are moving a ton of our Scala.js stuff over to it. Fixed a pretty pathological issue with a very large model (1m rows) getting saved...
and it was really bogging down the browser. Switched to using Observables and just efficiently chunking the stuff up to the server, and it runs much more smoothly.
My team also now starts to see how easy it is to do stuff like that, and they are picking it up themselves, which says a lot since many times I have to tell them to pick it up, but not in this case
Fabien Chaillou
@fchaillou
Hello,
i'm developing an app that uses monix-kafka. Our source of data is actually sending data in batches to kafka (every X minutes or hours), so we're doing a batch app on our side.
Basically i want my kafka observable to complete when there has been no new message for a specified amount of time (let's say one minute).
Does timeoutOnSlowUpstream do what i want?
I think so, but i'm not 100% sure
Piotr Gawryś
@Avasil
@fchaillou Yes
@fizzy33 That's fantastic to hear, I'm glad the library is serving you well!
Fabien Chaillou
@fchaillou
@Avasil perfect, thanks for your quick response !
Fabien Chaillou
@fchaillou
@Avasil congrats on merging the tracing support !
Piotr Gawryś
@Avasil
thanks, I would like to do a release on Sunday but we'll see if I can stick to my schedule :sweat_smile:
Glen Marchesani
@fizzy33
wait, what, tracing support :-D
Jorge
@jvican
@Avasil is it possible that Task.create somehow creates a new local context?
I'm investigating an issue where locals are not propagated when a task that's constructed with Task.create runs
Piotr Gawryś
@Avasil
@jvican It shouldn't create a new context unless there is a bug.
Running Task creates a new one if you're trying to share the context between concurrent "run-loops"
Does it happen if you use a different constructor like Task.eval?
@fizzy33 it's about better stack traces for Task; I've included a few examples in the PR: monix/monix#1267
Jorge
@jvican
@Avasil I think I'm onto something, replacing Task.create by Task.fromCancelablePromise makes the context propagation work
Piotr Gawryś
@Avasil
Okay, I am going to look into it later today
Alec Zorab
@aleczorab_twitter
@jvican They'd definitely be pretty useful for me - at the moment I'm busy relying on a very roughly hacked together edit of grpcmonix. It's not urgent though, so if there's a lot of extra work for you to get them into a releasable state, I'm not in a massive hurry
Glen Marchesani
@fizzy33
hey, I am working on an RPC framework using monix (well, converting our existing RPC to be based on monix). The short description is that the RPC is defined as

// api layer

case class DoSomethingRequest(
  foo: Int,
  numbersStream: Observable[Int],
)

case class DoSomethingResponse(
  bar: Int,
  summedNumbers: Observable[Int], 
)

trait MyApi[M[_, _]] extends ApiBuilder[M] {
  val doSomething = remoteCall[DoSomethingRequest, DoSomethingResponse]
}


... // in some client else where

implicit val messagingFabric: MessagingFabric = ...  // lots of ways to do this: 1-to-1 RPC or 1-to-many RPC; let's assume MessagingFabric is hard-wired to a single server

object MyApiClient extends MyApi[RemoteCall]

// RemoteCall exposes the call as a Task[Response]

val request: DoSomethingRequest = ... 

val response: Task[DoSomethingResponse] = MyApiClient.doSomething(request)
noting that the summedNumbers in DoSomethingResponse is all managed and passed from server to client via the MessagingFabric
Would this be useful to other people?
Glen Marchesani
@fizzy33
to be clear, this isn't a pet project; this is a heavily hardened RPC layer we use with all our clients that can handle a diverse range of distributed topologies
Glen Marchesani
@fizzy33
since it now relies on monix / observable I think it is generic enough and open-sourceable; we are just trying to decide, if we make the open source effort, whether anyone will actually use it
Glen Marchesani
@fizzy33
I have a use case where I have a global val allMessages: Observable[Message]
then a lot of routing where a message has a correlation which routes it.
right now, for things that need a "subChannel", i.e. all messages matching a particular correlation, I am using
val subChannel = allMessages.filter(_.id === subChannelId)
I am going to run into use cases where there are thousands if not 100Ks of sub channels filtering off the same allMessages.
Is there anything in Monix that can support a more robust distributor pattern ?
I have my hand coded one I am trying now but figured to ask here first ;-)
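A rough sketch of one distributor approach, assuming Monix 3.x. Instead of N subscribers each filtering the full stream (N predicate checks per message), a single upstream subscription routes each message to a per-correlation subject via one map lookup. `Message` and the class shape are hypothetical, mirroring the chat example:

```scala
import scala.collection.concurrent.TrieMap
import monix.execution.Scheduler
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject

final case class Message(id: String, body: String) // stand-in for the user's type

class Distributor(allMessages: Observable[Message])(implicit s: Scheduler) {
  // One subject per correlation id, created lazily on first subscription.
  private val channels =
    TrieMap.empty[String, ConcurrentSubject[Message, Message]]

  def subChannel(id: String): Observable[Message] =
    channels.getOrElseUpdate(id, ConcurrentSubject(MulticastStrategy.publish))

  // Single upstream subscription doing all the routing.
  allMessages.foreach { m =>
    channels.get(m.id).foreach(_.onNext(m))
  }
}
```

Cleanup of idle channels and backpressure handling are left out of this sketch; `groupBy` is another built-in worth considering when the set of keys is bounded.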
Glen Marchesani
@fizzy33
interestingly, on the UI side we are heavy users of https://github.com/OlivierBlanvillain/monadic-html and ran into the same thing there...
Fabien Chaillou
@fchaillou

Hello,
I have an issue with monix-kafka.
I have the following code :

  kafkaObservable
     .map(data => Element(parseData(data), data.offset))
     .timeoutOnSlowUpstreamTo(10.seconds, Observable.now(End))
     ...
     .mapEval((offsetBatch: CommittableOffsetBatch) => offsetBatch.commitAsync())

this fails because timeoutOnSlowUpstreamTo probably cancels the upstream, which closes the kafka consumer, and the commit fails with:
This consumer has already been closed

any idea how to fix that?