Loïc Descotte
@loicdescotte
but I don't know how to do the same thing with Monix
Loïc Descotte
@loicdescotte
I think it would work with Observable.flatMap and a split on \n to create several observables (1 for each line), but is there a more direct way?
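The flatMap-and-split idea can be sketched as follows. This is only an illustration: `chunks` and `lines` are made-up names, and it assumes every incoming string ends on a line boundary (a line spanning two chunks would be split in two).

```scala
import monix.reactive.Observable

object LineSplitting {
  // Hypothetical input: each element is a chunk of text holding whole lines.
  val chunks: Observable[String] = Observable("a\nb", "c\nd\n")

  // One inner observable per chunk, emitting that chunk's lines in order.
  // Observable#flatMap concatenates the inner observables, so order is kept.
  val lines: Observable[String] =
    chunks.flatMap(chunk => Observable.fromIterable(chunk.split("\n").toList))
}
```

Handling lines that cross chunk boundaries would need some buffering of the trailing partial line, which this sketch leaves out.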
sken
@sken77
if someone wanted to write a bitcoin node in monix, is that even a good idea or possible? is monix good for bidirectional comm?
moritz bust
@busti
How would I create an observable collection from another collection, whose elements only change when the corresponding element in the source collection changes?
moritz bust
@busti
So let's say, given an observable of users, I would like to turn it into a stream of Vector(UserInfoComponent), but I would like to efficiently update the internal vector, such that, when a new user is emitted by the observable, only that user is added, without all other vector entries being regenerated.
case class User(firstName: String, lastName: String)

val users = userListEvents.collect { case AddUser(user) => user }.scan(Vector.empty[User])(_ :+ _)

val userInfoComponents = users.map(_.map(new UserInfoComponent(_)))
moritz bust
@busti
I'd probably have to diff the contents of the source array, but I was wondering if there already was a solution for that...
Oleg Pyzhcov
@oleg-py
@Busti not in Monix
moritz bust
@busti
@oleg-py Thank you for the answer. As a workaround I am now wrapping the User object in a class that generates the instance of the component on construction.
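For the add-only case, one possible sketch is to construct each component before accumulating, so that a new user only appends one freshly built component and earlier instances are reused. The event types and `UserInfoComponent` below are hypothetical stand-ins for the ones in the discussion, and removals or updates are not handled.

```scala
import monix.reactive.Observable

object IncrementalComponents {
  final case class User(firstName: String, lastName: String)

  // Hypothetical event types; only AddUser appears in the discussion.
  sealed trait UserListEvent
  final case class AddUser(user: User) extends UserListEvent
  final case class RemoveUser(user: User) extends UserListEvent

  // Hypothetical stand-in for the UI component.
  final class UserInfoComponent(val user: User)

  // Build the component once per AddUser, then append it to the running vector.
  // Each emitted vector shares the component instances of the previous one.
  def components(events: Observable[UserListEvent]): Observable[Vector[UserInfoComponent]] =
    events
      .collect { case AddUser(user) => new UserInfoComponent(user) }
      .scan(Vector.empty[UserInfoComponent])(_ :+ _)
}
```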
Piotr Gawryś
@Avasil
@floreslorca Monix doesn't have good support for bidirectional communication out of the box, although you can cover many use cases quite easily if you add MVar and/or Ref + Deferred. I don't know much about bitcoin nodes so I can't tell if that's enough
Piotr Gawryś
@Avasil
@loicdescotte I don't think we have an equivalent function. Maybe there is something in monix-nio, but IIRC it's not production ready
Evan Chan
@velvia
@alexandru and others: what is the status of monix-nio? Is it stable? Is it tested at all in any kind of environment?
Alexandru Nedelcu
@alexandru

@velvia It isn't stable. And I don't like the state it is in.

I'm actually considering archiving it and starting it from scratch, but I don't have time for that.

Piotr Gawryś
@Avasil
I think either this or other connectors are a decent idea for something to do after a successful release, especially if we plan it well and make it contributor-friendly with lots of tagged issues
Evan Chan
@velvia
hm ok. Right now I’m looking for a reactive streams like connector for TCP sockets.
  • Akka IO is too low level
  • akka streams is fine, though I’d rather have a Monix one :-p
  • and I don’t want to have to learn Akka streams just to do a TCP socket :-p
Max Lanin
@sirmax

@phongngtuan sorry for not replying for so long, I’m on vacation right now.

your implementation using Observable only works with Either, right?

No, you can use it for any A. The Either part is for handling errors-or-results produced by .attempt.
In your Task[Option[A]] case you’d have something like this.

Observable.fromTask(t.attempt).collect { case Right(Some(a)) => a }

This observable will emit a for Some(a) and will complete empty on None or an error.
Now, when you combine these observables with merge, you’d have your tasks running in parallel, emitting results as soon as they arrive.
Finally, taking firstOptionL on the merged observable will give you a Task[Option[A]] which will emit the first result if it’s available, or None.

To summarise, try this

val tasks: List[Task[Option[A]]] = ???
val observables = tasks map { t =>
  Observable.fromTask(t.attempt).collect { case Right(Some(a)) => a }
}
Observable.fromIterable(observables).merge.firstOptionL
Phong Nguyen
@phongngtuan
@sirmax no problem :) I think I know where I got it wrong. I assumed that Observable.fromIterable(observables).merge.firstOptionL wouldn't give any result if the observables all return None; I didn't know that it also returns None once all the underlying Observables are "finished". Am I right?
Max Lanin
@sirmax

@phongngtuan yup. Quoting the scaladoc

Returns an Option because the source can be empty.

FYI, there is also .firstL: Task[A] that fails with NoSuchElementException on empty source.
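A small sketch of the difference on an empty source, assuming Monix 3.x. The object and value names are made up for illustration.

```scala
import monix.eval.Task
import monix.reactive.Observable

object FirstOnEmpty {
  val source: Observable[Int] = Observable.empty[Int]

  // Completes with None because the source emits nothing.
  val maybeFirst: Task[Option[Int]] = source.firstOptionL

  // Fails because there is no first element; attempt exposes the error as a Left.
  val firstOrError: Task[Either[Throwable, Int]] = source.firstL.attempt
}
```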

Phong Nguyen
@phongngtuan
@sirmax got it, many thanks :)
Mateusz Górski
@goral09
is there an option that would shift at every flatMap/map call ?
Oleg Pyzhcov
@oleg-py
@goral09 task.executeWithModel(ExecutionModel.AlwaysAsyncExecution)
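In context, that call might look like the sketch below (assuming Monix 3.x; `pipeline` and `alwaysAsync` are illustrative names). The execution model is applied once to the whole task instead of inserting a shift after each step.

```scala
import monix.eval.Task
import monix.execution.ExecutionModel

object AlwaysAsyncExample {
  // A small chain of map/flatMap steps.
  val pipeline: Task[Int] =
    Task.eval(1).map(_ + 1).flatMap(x => Task.now(x * 2))

  // Same computation, but run under the always-async execution model.
  val alwaysAsync: Task[Int] =
    pipeline.executeWithModel(ExecutionModel.AlwaysAsyncExecution)
}
```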
Mateusz Górski
@goral09
Thanks @oleg-py , this works great :)
it's much better than shift-ing after every call xD
Ilya Murzinov
@ilya-murzinov

hi guys, I started working on #690 and can't figure out why my test is not starting :(
here is the code
monix/monix@b827aaf

reactiveJVM/testOnly *TypeClassLawsForObservableSuite <- works as expected
reactiveJVM/testOnly *TypeClassLawsForSubjectSuite <- doesn't do anything, as if there were no tests

Piotr Gawryś
@Avasil
I don't have access to my computer right now so I don't have ideas other than reloading sbt :D what if you put this test with the rest?
Ilya Murzinov
@ilya-murzinov
lol, if I just put checkAllAsync("Profunctor[Subject]") { implicit ec => ??? } in TypeClassLawsForObservableSuite it stops running at all
Oleg Pyzhcov
@oleg-py
maybe that's because ??? throws
Ilya Murzinov
@ilya-murzinov
not only my test, it doesn't run the whole suite anymore
??? is just a placeholder
Martijn Hoekstra
@martijnhoekstra
??? is just def ??? : Nothing = throw new NotImplementedError("an implementation is missing") -- bringing that into an implicit scope does sound like it could seriously mess something up
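A plain-Scala sketch of why a `???` placeholder can take down more than one test. The `register` helper is hypothetical; it only assumes that the body is evaluated while the suite is being built, which is one plausible mechanism here, not a confirmed diagnosis.

```scala
object PlaceholderThrows {
  // Hypothetical registration helper that evaluates its body right away,
  // i.e. while the suite is being constructed rather than when a test runs.
  def register(name: String)(body: => Unit): String = {
    body
    s"registered $name"
  }

  // ??? throws NotImplementedError as soon as it is evaluated, so the
  // registration itself fails instead of a single test failing later.
  val outcome: String =
    try register("Profunctor[Subject]") { ??? }
    catch { case e: NotImplementedError => s"failed: ${e.getMessage}" }
}
```

Note that NotImplementedError is an Error rather than an Exception, so handlers that only catch Exception will not see it.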
mark lister
@marklister
Is it possible to restart an observable?
Mateusz Górski
@goral09
@marklister what do you want to achieve?
mark lister
@marklister
I have one observable performing an expensive operation every twenty minutes and another filling in once a second. I'd like to restart the second one from zero whenever the first one successfully emits
Maybe if I use onErrorRestartUnlimited and induce an error?
Oleg Pyzhcov
@oleg-py

@marklister I'd do

val ticker = expensive.switchMap(_ => mk1SecondObservable)

(if expensive is a hot observable)

mark lister
@marklister
@oleg-py Ok thanks! Will give that a go!
Ilya Murzinov
@ilya-murzinov
@martijnhoekstra yep, that was the case, I removed all ??? from every place and now everything works
mark lister
@marklister
@oleg-py switchmap worked perfectly thank you!
Itamar Ravid
@iravid
Is there a published hash version that works with cats-effect 1.0.0?
Alexandru Nedelcu
@alexandru
None that I can recommend. But today I'm wrapping up the auto-cancelable PR and will make a hash version available tonight.
Will ping you when available.
Oleg Pyzhcov
@oleg-py
Guess it's finally gonna be time to include Monix in my pet project :)
Itamar Ravid
@iravid
Thanks @alexandru!
Felix Dietze
@fdietze
ScalaJS: I just found that window.setTimeout(..., 0) has a small delay. Would it be possible to use setImmediate using a polyfill instead? I'd like to know if it makes a difference for my application. Is it correct that I have to reimplement AsyncScheduler using setImmediate?
Jules Ivanic
@guizmaii

Hi everyone,

Is it possible to cancel a pure computation task?
Example of what I’m trying to do:

val heuristic: Task[Option[A]] = ???
val bruteForce: Task[Option[A]] = ???

Task
  .racePair(heuristic, bruteForce)
  .flatMap {
    case Left((Some(a), slower))  => slower.cancel.map(_ => Some(a))
    case Left((None, slower))     => slower.join
    case Right((slower, Some(b))) => slower.cancel.map(_ => Some(b))
    case Right((slower, None))    => slower.join
  }
We observe strange results in our benchmarks.
Oleg Pyzhcov
@oleg-py
@guizmaii yes, it is possible, but Tasks are only cancelable on async boundaries (e.g. when doing Task.shift) and if executed with autoCancelable (IIRC that's the name), or when doing Task.cancelBoundary
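As a rough sketch of the Task.cancelBoundary option (assuming Monix 3.x; `sumTo` is a made-up example of a pure computation), a boundary can be inserted at every step of a loop so that a cancel request is noticed between iterations:

```scala
import monix.eval.Task

object CancelBoundaryExample {
  // Pure, CPU-only loop expressed as a Task. Each iteration first passes
  // through Task.cancelBoundary, giving cancellation a point to take effect.
  def sumTo(n: Long, acc: Long = 0L): Task[Long] =
    if (n <= 0) Task.now(acc)
    else Task.cancelBoundary.flatMap(_ => sumTo(n - 1, acc + n))
}
```

Checking on every iteration adds overhead, so a real computation would probably check only every so many steps.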