Pau Alarcón
@paualarco
Generally it's used the other way around, Resource[Task, T] => Observable[T]: you first define the resource, and the observable then uses it until it completes
Alec Zorab
@AlecZorab
yes, but if I'm trying to conform to someone else's interface, I don't necessarily have that choice
I think I need to do Resource.eval(o.lastL) or something along those lines and just live with the fact it's a bit crap
Pau Alarcón
@paualarco
That would work... I guess that's an edge scenario :D
Alec Zorab
@AlecZorab
I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do
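For reference, the Resource.eval(o.lastL) idea above could look roughly like the sketch below. The helper name lastAsResource is made up for illustration, and it assumes a cats-effect 2 version that already has Resource.eval (older releases call it Resource.liftF). Note that the value only becomes available once the observable has completed, so the Resource adds no finalizer of its own.

```scala
import cats.effect.Resource
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical helper: exposes the last element of an Observable as a Resource.
// No release action is registered; lastL fails with NoSuchElementException
// if the observable completes without emitting anything.
def lastAsResource[A](o: Observable[A]): Resource[Task, A] =
  Resource.eval(o.lastL)
```

Usage would be something like lastAsResource(obs).use(a => Task(println(a))), run with a Scheduler in scope.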
elyphas
@elyphas
Hi, is it right to do the validation in the filter step?
I want to validate the fields of a case class and, if it is valid, send it to the observer
Something like this:
val validatedMainItem = hdlMainItem
                           .filter { x =>
                               Inputs.validate(x) match {
                                   case Valid(valid) =>
                                     true
                                   case Invalid(errors) =>
                                     // Join the error messages, one per line (calling mkString on the folded String would split it into characters)
                                     val err = errors.foldLeft("") { case (acc, next) => acc + "\n" + next }
                                     alert(err)
                                     false
                               }
                           }.map { x =>
                               Inputs.validate(x) match {
                                 case Valid(valid) => valid
                                 case Invalid(error) => Inputs()
                               }
                           }
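One possible way to avoid validating twice is to map to the validation result once, report the failures as a side effect, and then keep only the valid values with collect. This is only a sketch under assumptions: Form, validate and the report callback are hypothetical stand-ins for the Inputs class, Inputs.validate and alert from the snippet above, and the error type is assumed to be a NonEmptyList[String].

```scala
import cats.data.{NonEmptyList, Validated}
import cats.data.Validated.{Invalid, Valid}
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical stand-in for the case class being validated.
final case class Form(name: String)

// Hypothetical validation rule, standing in for Inputs.validate.
def validate(in: Form): Validated[NonEmptyList[String], Form] =
  if (in.name.nonEmpty) Valid(in)
  else Invalid(NonEmptyList.one("name must not be empty"))

// Validates each element once, reports failures, and emits only valid values.
def validOnly(source: Observable[Form], report: String => Task[Unit]): Observable[Form] =
  source
    .map(validate)
    .doOnNext {
      case Invalid(errors) => report(errors.toList.mkString("\n"))
      case Valid(_)        => Task.unit
    }
    .collect { case Valid(valid) => valid }
```

With this shape there is no need for a fallback such as Inputs() in the Invalid branch, because invalid values never reach the map step.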
Volodymyr Barna
@RobertoUa
When is Monix for cats-effect 3 going to be available?
Rohan Sircar
@rohan-sircar
We're all waiting for it, better get in line.
Alec Zorab
@AlecZorab

I mean perhaps in practice that's actually fine and does exactly what I need, and given that Resource[F, T] represents a single T, there's maybe not much else you can do

I wonder if there's potential for a subtype of Observable[T] where the grammar is onNext? (onComplete | onError)?, ie you'll get at most one T. There's quite a few places in my code where I really want a "Task-with-cleanup", and either I have to use Resource, which is clunky or Observable which doesn't assert to consumers that there's only one element

Oleg Pyzhcov
@oleg-py
That would be a bad case for the protocol. Remember that something else calls onComplete there, so it's not that you let go of cleanable thing, it'd be like third party telling you to wrap up your business.
I just got used to Resource instead :D
Alec Zorab
@AlecZorab
I don't understand how what I described is any different to the reality of the behaviour you get from Observable.fromResource?
Oleg Pyzhcov
@oleg-py
fromResource relies on the backpressure after onNext, so it's reliant on monix-specific divergence from Rx family of libraries in that internal protocol. It's a thing you can do, but it falls apart in some fairly innocuous scenarios and a lot of interop ones - and Alex argues having it is a mistake (see #1427)
Arunav Sanyal
@Khalian
Is there a way to control or configure how long a Task's memoizeOnSuccess actually memoizes a successful result? A coworker used this as a cache and may have messed up the timing (we are memoizing AWS credentials, but I suspect we memoized them for longer than their expiry time somehow)
Oleg Pyzhcov
@oleg-py
No. Once memoized, it's there for as long as you have a reference to that particular task
Arunav Sanyal
@Khalian
thanks
@oleg-py Thanks. I was expecting an answer like that
yea i think we just messed up our caching (instead ended up memoizing the results) and now we cannot reason about our code
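Since memoizeOnSuccess has no expiry, a time-bounded cache has to be built separately. Below is a minimal sketch of one way to do it. The name cachedFor is hypothetical and not part of Monix, and the sketch makes no attempt to deduplicate concurrent refreshes, so two callers racing on an expired entry may both run fetch.

```scala
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import monix.eval.Task

// Hypothetical helper: returns a Task that reuses the last successful result
// of `fetch` for at most `ttl`, then fetches again on the next run.
def cachedFor[A](ttl: FiniteDuration)(fetch: Task[A]): Task[A] = {
  // Holds the timestamp (monotonic millis) and value of the last success.
  val cell = new AtomicReference(Option.empty[(Long, A)])
  Task.clock.monotonic(MILLISECONDS).flatMap { now =>
    cell.get() match {
      case Some((storedAt, value)) if now - storedAt < ttl.toMillis =>
        Task.now(value)
      case _ =>
        // Failures are not cached: the cell is only updated on success.
        fetch.map { a => cell.set(Some((now, a))); a }
    }
  }
}
```

For credentials, ttl would need to be shorter than the real expiry time so that a cached value is never handed out after it has expired.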
Alec Zorab
@AlecZorab
anyone got any thoughts or prior art on implementing a headAndTail for observable that does Observable[A] => Task[(A, Observable[A])]? Would also accept the return type being Observable[(A, Observable[A])] ;)
elyphas
@elyphas

Hi, is this right?

    val obsMainItem =
        publishOnEventDocument
            //.dump("En el obsMainItem ************")
            .filter {
                case Right(value@_) => true
                case Left(err@_) => false
            }
            .map {
                case Right((oldItem, newItem, event)) =>
                    event match {
                        case NotFoundDoobie(msg) => oldItem
                        case _ => newItem
                    }
                case _ => ViewProcess() // This case should never be reached
            }

The Left(err) side is for another observable where I handle the errors.

    val obsProcessSideEffects =
        publishOnEventDocument
            //.dump("Changos en el processSideEffects **************")
            .map {
                case Right((oldItem@_, newItem@_, event)) =>
                    event match {
                        case FoundDoobie(x) => div(x)
                        case SavedDoobie(x) => div(x)
                        case DeletedDoobie(x) => div(x)
                        case NotFoundDoobie(x) => div(x)
                        case ErrorDoobie(x) => div(x)
                    }
                case Left(error) => error
            }
Jasper Moeys
@Jasper-M
@AlecZorab Don't know if that counts as prior art but in fs2 you can have
def uncons[A](stream: Stream[Task, A]): Stream[Task, (A, Stream[Task, A])] =
  stream.pull.uncons1.flatMap(Pull.output1).stream.unNone
Alec Zorab
@AlecZorab
yeah, I'd like that but in monix :laughing:
Jasper Moeys
@Jasper-M
Monix Iterant also has an uncons :p Don't know if that's coincidence or if pull based streams are inherently better suited for that operation
Alec Zorab
@AlecZorab
Is there a less clunky way to go from Task[Observable[A]] than t.to[Observable].flatten?
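One alternative that stays on the Observable side is to lift the task with Observable.fromTask and then flatten. This is a small sketch rather than an official recommendation; flattenTask is a made-up name.

```scala
import monix.eval.Task
import monix.reactive.Observable

// Hypothetical helper: turns a Task that produces an Observable into that
// Observable, running the task when the result is subscribed to.
def flattenTask[A](t: Task[Observable[A]]): Observable[A] =
  Observable.fromTask(t).flatten
```

It is not much shorter than t.to[Observable].flatten, but it avoids the type-class based conversion.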
rms264
@rms264
Creating a long-running task with Monix Kafka to listen to a list of topics. Should I use an IO scheduler for the KafkaConsumerObservable or a ForkJoinPool? Also, should I have a separate scheduler for the consumer, or just one and use it everywhere?
Joseph Denman
@JosephDenman
Does anyone know what the release target for a version of Monix compatible with Cats Effect 3 is?
Arunav Sanyal
@Khalian
I wish to run a specific monix task as pure synchronous code. I am using runSyncStep and passing a simple scheduler with a single threaded executor, but it returns another task in an either. I do not really understand this behavior. Is there a way to just run the underlying code for the task synchronously without any additional complexity or handling?
Arunav Sanyal
@Khalian
nvm, for this very specific test (non production use case), i just called runToFuture and then did an await.
Rohan Sircar
@rohan-sircar
Are you looking for runSyncUnsafe?
Arunav Sanyal
@Khalian
@rohan-sircar I ended up doing executeAsync followed by runToFuture. It was just a little bit of test code I was writing
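To illustrate the difference discussed above: runSyncStep only evaluates the task up to its first asynchronous boundary, which is why it returns an Either, while runSyncUnsafe blocks the calling thread until the result is available (JVM only). A small sketch, using the global scheduler and a 5 second timeout purely as example choices:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val task: Task[Int] = Task.eval(1 + 1)

// Blocks the current thread until the task finishes or the timeout elapses.
val blocking: Int = task.runSyncUnsafe(5.seconds)

// Runs synchronously only until the first async boundary:
// Right(value) if the task finished, Left(rest) with the remaining work otherwise.
val stepped: Either[Task[Int], Int] = task.runSyncStep
```

For test code, runSyncUnsafe is usually the simpler option than runToFuture followed by Await.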
basicbang0x
@basicbang0x
Does Monix work with Cats Effect 3?
Arunav Sanyal
@Khalian
Are there any recommendations for technical books that dive deep into Monix? I am fascinated by how the library works under the hood and would love to learn more.
Pau Alarcón
@paualarco

Does Monix work with Cats Effect 3?

Not yet I am afraid

Henri Cook
@henricook
Funnily enough I was just looking for an issue tracking this - is there anywhere I could check in on regularly to see how Cats Effect 3 work is going?
Piotr Gawryś
@Avasil

The most up-to-date info can be found in the Typelevel Discord, Alex is active there: https://discord.gg/hWd4eS244g
He answered a question about the CE3 roadmap recently:

Good question. I'm trying to come up with one and it is ... complicated, because it needs to have an identity that's different from Cats-Effect & fs2.
The value of Monix is in its pragmatism, its approach to FP that allows for impure interfaces to live alongside the pure ones (with established conventions), its deep integration with the host, its coherence, its fairly stable backwards compatibility.
Unfortunately, the task at hand on deciding what to do next, isn't easy to solve, and personally I've been away, on what people call an OSS hiatus. Trying to get back on that ride though.
In other news, in private I'm experimenting with a new Task. I have ideas and I'm foreseeing a philosophical divergence from cats.effect.IO. If CE evolves towards Task, the question would have been if Monix's implementation makes any sense, but CE 3 has actually moved further from Task's philosophy, and now I'm thinking Task can continue to provide value, while integrating CE3 ideas and also implement its type-classes.
Anyway, I was thinking of getting together with Monix contributors, discuss current issues, decide on a road-map, and call for help maybe, as there's plenty to work on.
This does mean that we are nowhere near a new major release. Sorry for not being able to give you a better answer.

Yannick Gladow
@yannick-cw
Hey, maybe you can help me. Let's say I want to fetch some data every second and then process it afterwards.
And I need to make sure that everything I fetch is 100% processed and never dropped, even when the application is shutdown.
What's the best way to make this work?
Right now I am thinking of making it uncancelable and then in the doOnCancel of the Task somehow stopping further fetches, sending a Stop signal upstream, but upstream from the processing part, as that needs to happen.
    Observable
      .interval(1.seconds)
      .flatMap(a => Observable.fromTask(Task.delay(println(a)).as(a))) // Fetching something from API, once fetched we need to ensure it is processed
      .mapEval(a => Task.sleep(2.seconds).as(a))
      .uncancelable
      .consumeWith(Consumer.foreachParallelTask[Long](1)(a => Task.delay(println(s"arrived at the end $a")))) // This step always HAS to be invoked
      .as(ExitCode.Success)
      .doOnCancel(Task.sleep(10.seconds) *> Task.delay(println("CIAO")))
elyphas
@elyphas

Hi, could someone please tell me
why this code isn't working properly?

I want to send requests to different links when the Observable emits an event.

val eventComparative = PublishSubject[String]()
val eventComparativePublish = eventComparative.publish
val cancelEventComparativePublish = eventComparativePublish.connect()

val getMinimosRetensionISR = eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getProcess =  eventComparativePublish.dump("0").mapEval { case idComparative => ...
val getComparativo =  eventComparativePublish.dump("0").mapEval { case idComparative => ...
They are combined this way:
val observableZip3 = Observable.zip3(getMinimosRetensionISR, getProcess, getComparativo)
Pau Alarcón
@paualarco
Perhaps you could share a bit more about your use case? In that snippet I don't see events being emitted
elyphas
@elyphas
@paualarco ; sorry, I will
  val cmdPrint = button("Imprimir", cls := "myButton",
                      SyncIO ( onClick.transformLifted { event: Observable[MouseEvent] =>
                          event
                              .withLatestFrom(getIdComparative) {
                                case (event@_, idComparative: String) =>
                                    idComparative
                          }} --> repoCompara.eventComparative
                      )
                  )
and thank you very much ! :-)
this is the full code where the PublishSubject live: Repository
Pau Alarcón
@paualarco
The repository you've shared is not complete as it has external dependencies, also looks complex to understand. Also don't know what's the problem hehe
See a similar small example to the first you shared, but in this case it shows a bit more how to run the program and emit events to the publisher:
  // Keep a reference to the subject so events can be pushed into it
  val eventComparative = PublishSubject[String]()
  val eventComparativePublish = eventComparative.publish
  val cancelEventComparativePublish = eventComparativePublish.connect()
  val o1 = eventComparativePublish.dump("Event received!")
  val publishTask = Observable.repeatEvalF(Task(eventComparative.onNext("Hi")))
  Observable.zip2(o1, publishTask).completedL.runSyncUnsafe()
elyphas
@elyphas
@paualarco , sorry,
Pau Alarcón
@paualarco
Hope this is helpful
elyphas
@elyphas
@paualarco; ah thank you, I will try
elyphas
@elyphas
@paualarco , it works thank you, :-)
Trần Tiến Đức
@trantienduchn
Has anyone had a problem with missing/mixed/incorrect stack traces? In my application, I use chains of Observables mixed with Tasks, with a lot of parMap2, 3, 4, 5. Usually, if any error happens, the stack trace doesn't fully report where the error happened, or it points to the wrong lines in another source file. Those lines usually point to Observable.firstL or consumeWith... so weird