Glen Marchesani
@fizzy33
and the amount of code to do that is almost nil
I just get it for free
so concatWithLast is a very mild hack for the gains it promises
this is a TON of things listening to a large pulsar cluster
Oleg Pyzhcov
@oleg-py
not sure I even understand what it does
calls a function on a last element and then emits the results concatenated with a current one?
Glen Marchesani
@fizzy33
yes that is what it does
Oleg Pyzhcov
@oleg-py
does it not work without unsafe stuff?
def concatWithLast[A](s: Observable[A])(f: A => Observable[A]) = {
  s.publishSelector { hot =>
    Observable.from(hot.lastL.start).flatMap { fa =>
      hot ++ Observable.from(fa.join).flatMap(f)
    }
  }
}
Glen Marchesani
@fizzy33
I can try it :-)
my concatWithLast is incredibly and admittedly naive
Glen Marchesani
@fizzy33
that doesn't work; I haven't debugged enough to figure out why
Ilya
@squadgazzz

Can anyone please explain what's going on here?

  val obs = Observable.fromIterable(1 to 2)
  val result = obs.publishSelector { o =>
    o ++ Observable.fromIterable(11 to 12)
  }
  result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
  Thread.sleep(5.seconds.toMillis)

result: [1,2,11,12]

  val obs = Observable.fromIterable(1 to 2)
  val result = obs.publishSelector { o =>
    o ++ Observable.fromIterable(11 to 12)
  } ++ Observable.fromIterable(101 to 102)
  result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
  Thread.sleep(5.seconds.toMillis)

result: [11,12,101,102]

Where did obs's elements go?

Glen Marchesani
@fizzy33
fwiw @oleg-py in all my micro tests the concatWithLast you supplied works...
Glen Marchesani
@fizzy33
What I am seeing in the production use case is all the messages are pumped before the hot ++ Observable.from(fa.join).flatMap(f) happens
Alexandru Nedelcu
@alexandru
@squadgazzz that's a good question 🙂 from the looks of it, doesn't seem right; could be some sort of bug
I smell a subscription timing issue, since publishSelector works with a PublishSubject underneath, and it might lose initial events if the underlying implementation subscribes too early.
Ilya
@squadgazzz
@alexandru how can I handle this? I have to use publishSelector in real code. At the same time I need to append another observable later.
Glen Marchesani
@fizzy33
what about
val result0 = obs.publishSelector { o =>
    o ++ Observable.fromIterable(11 to 12)
  }
val result1 = obs.publishSelector { o => o ++ Observable.fromIterable(101 to 102) }
Alexandru Nedelcu
@alexandru
@squadgazzz I don't know why that happens, from the look of it you're not doing anything incorrectly. This needs to be investigated. Let me think.
Glen Marchesani
@fizzy33
fwiw this does have the behaviour you expect

  val obs = Observable.fromIterable(1 to 2)
  val result =
    obs
      .publishSelector { o =>
        o ++ Observable.fromIterable(11 to 12)
      }
      .publishSelector(_ ++ Observable.fromIterable(101 to 102))
  result.toListL.executeAsync.runToFuture.foreach(l => println(l.mkString("[", ",", "]")))
  Thread.sleep(5000)
but that becomes untenable: anywhere you use the obs, you have to use it inside of publishSelector
Alexandru Nedelcu
@alexandru
Yeah, well, if it's a bug, we'd have to fix it.
@squadgazzz can you please open an issue? I'll take a look.
Ilya
@squadgazzz
@alexandru monix/monix#1292 thank you
@fizzy33 yeah, this works if I know about previous publishSelector call.
Cyrill Sitnikov
@InfiniteDisorder
Hello guys, has anyone migrated custom Akka flow stages to Monix? Is it possible to do it without scheduler evidence? In the example below, is it possible to emit batches from the OutputStream to something observable-like?
val buf = new OutputStream {
     override def write(b: Int): Unit = {
        /* previously bytes were emitted to the next stage of  the flow */
    }
}
Alexandru Nedelcu
@alexandru
@InfiniteDisorder in that write, you could write those bytes into a ConcurrentSubject.
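A minimal sketch of that suggestion, under stated assumptions: `observableOutputStream` is a hypothetical helper name, and the replay strategy is chosen here only so that late subscribers still see earlier bytes (it buffers without bound). Note that building a ConcurrentSubject still needs an implicit Scheduler, so this does not remove the scheduler requirement asked about above.

```scala
import java.io.OutputStream
import monix.execution.Scheduler
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject

// Hypothetical helper (not from the discussion): pairs an OutputStream
// with the Observable that receives every byte written to it.
def observableOutputStream()(implicit s: Scheduler): (OutputStream, Observable[Byte]) = {
  // Assumption: replay, so subscribers that arrive late still get all bytes.
  val subject = ConcurrentSubject[Byte](MulticastStrategy.replay)
  val out = new OutputStream {
    override def write(b: Int): Unit = {
      subject.onNext(b.toByte) // the returned Ack is ignored in this sketch
      ()
    }
    // Closing the stream completes the Observable.
    override def close(): Unit = subject.onComplete()
  }
  (out, subject)
}
```

Batches could then be formed downstream, for example with `bufferTumbling(1024)` on the returned Observable.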
ClementWee
@ClementWee
Does Observable cancel itself on encountering an error?
Piotr Gawryś
@Avasil
@ClementWee it completes with an error if you subscribe in Task (L methods, consumeWith) and stops upstream. So it cancels in some way and it's immediate but it's not the same as canceling a subscription
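A small sketch of the behaviour described above, assuming the global scheduler and an illustrative `seen` buffer that records which elements the upstream actually produced before the failure:

```scala
import scala.collection.mutable.ListBuffer
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Records every element that reaches the mapping function.
val seen = ListBuffer.empty[Int]

val failing: Observable[Int] =
  Observable.fromIterable(1 to 5).map { i =>
    seen += i
    if (i == 3) throw new IllegalStateException("boom") else i
  }

// toListL is one of the "L methods"; attempt exposes the error as a Left.
val outcome: Either[Throwable, List[Int]] =
  failing.toListL.attempt.runSyncUnsafe()
```

Here `outcome` is a `Left` holding the exception, and the elements after the failing one are never requested from the source.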
Alex Michael Berry
@almibe
Is there anything in Monix to help with nested Tasks that you want to run?
ClementWee
@ClementWee
@almibe what do you mean by "nested tasks"?
Alex Michael Berry
@almibe
I think I have it sorted out. I think I was just running into issues with mixing cats effect Resources, Monix Tasks, and Monix Observables in the same api w/ added intellij idea false negatives goodness.
Česlovas Lopan
@csk157

Hello, I'm using monix circuit breaker. All in all it works great, but I'd like to let some of the exceptions happen and still be thrown without counting towards failures. Does anyone have any hints on how to achieve that?

Currently it looks similar to:

circuitBreaker
  .protect { Task... }
  .toAsync(Async[M], eff)
  .recoverWith {
    case t: ErrIDontWantToCount => Effect[M].raiseError(t)
    case NonFatal(ex) if (n - 1) > 0 =>
      retry(n - 1)
    case t => Effect[M].raiseError(FailedLastRetry(t))
  }

and ErrIDontWantToCount is handled correctly, but counts towards failures

Glen Marchesani
@fizzy33
is there a way to have an Observable that gets completed / closed after a set period of time ?
Pau Alarcón
@paualarco
@fizzy33 you could use ob.lastL.timeout(finiteDuration) to wait either for the last emitted event or the end of the timeout
Glen Marchesani
@fizzy33
The use case is. I want to have the observable work for 30 seconds and then stop wherever it is at.
Pau Alarcón
@paualarco
yep so I think that would fit in your use case
Glen Marchesani
@fizzy33
if the source observable never finishes lastL would never happen
hang on I need to read the docs for timeout :-)
ah I see
nm
tx
Piotr Gawryś
@Avasil
There is takeByTimespan
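For the "run for a fixed time, then stop wherever it is" use case, a minimal sketch with `takeByTimespan`; the interval source and the durations are illustrative assumptions, and the global scheduler is used for brevity:

```scala
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// An endless source that emits 0, 1, 2, ... every 100 milliseconds.
val ticks: Observable[Long] = Observable.intervalAtFixedRate(100.millis)

// Completes after roughly one second even though the source never ends.
val firstSecond: List[Long] =
  ticks.takeByTimespan(1.second).toListL.runSyncUnsafe()
```

Unlike `lastL.timeout`, this completes normally with whatever was emitted so far instead of failing with a timeout error.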
Piotr Gawryś
@Avasil
@csk157 First idea that comes to my mind is exposing these failures in Either, for instance:
import cats.syntax.monadError._

circuitBreaker
  .protect { Task().map(Right(_)).onErrorRecover { case e: YourError => Left(e) } }
  .rethrow
Česlovas Lopan
@csk157
Thanks! Let me give this a go
Česlovas Lopan
@csk157
@Avasil it seems to work. Thank you!
Glen Marchesani
@fizzy33
oh snap thanks @Avasil that is exactly it. I stared at the api docs a few times now how come I never found it :-/
Glen Marchesani
@fizzy33
so my next use case (takeByTimespan worked great for the previous one) is to close / complete the Observable if it emits no items for a period of time.
Glen Marchesani
@fizzy33
val messages: Observable[Message] = ...
I want that to close if no messages are seen for some FiniteDuration
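One possible approach, sketched under assumptions: `timeoutOnSlowUpstreamTo` switches to a backup observable when the source stays silent for the given duration, so an empty backup ends the stream quietly. `closeWhenIdle` is a hypothetical helper name, not part of Monix.

```scala
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

// Hypothetical helper: completes the stream once `idle` passes with no items.
def closeWhenIdle[A](source: Observable[A], idle: FiniteDuration): Observable[A] =
  source.timeoutOnSlowUpstreamTo(idle, Observable.empty)

// A source that emits three items and then stays silent forever.
val silentAfterThree: Observable[Int] = Observable(1, 2, 3) ++ Observable.never

val received: List[Int] =
  closeWhenIdle(silentAfterThree, 200.millis).toListL.runSyncUnsafe(5.seconds)
```

The idle timer also applies to the very first item, so a source that is slow to start would be closed too.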