Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 2021 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Gavin Bisesi
@Daenyth
well, normally you'd use evalOn with Blocker and not any old EC
Anton Parkhomenko
@chuwy
yup, I just a bit stuck with cats-1.0
Gavin Bisesi
@Daenyth
Blocker is in 1.4.0
I recommend using it, it will make the upgrade easier
Anton Parkhomenko
@chuwy
Still trying to wrap my head around "allows callers to run them in parallel"
Gavin Bisesi
@Daenyth
getting to cats-effect 2.x isn't too bad - it's almost entirely binary compatible
Anton Parkhomenko
@chuwy

Blocker is in 1.4.0

Oh, cool. Completely missed that. Thanks

Gavin Bisesi
@Daenyth
fs2 is harder, especially if you use a lot of libraries dependent on it
but c-e is an easy upgrade for the most part
anyway - evalOn only changes what EC the IO runs on
it doesn't make it return immediately. Just that when it runs, it runs on that EC instead of the EC backing the ContextShift
This allows you to parallelize them much higher - suppose it's waiting on network calls but needs little cpu. You could do eg 64 of them in parallel even if you only have 8 cores, whereas if the thread-blocking call wasn't shifted with evalOn, it would fully consume your 8 cores to wait for the blocking call
Anton Parkhomenko
@chuwy
Alright, thanks a bunch. I’ll try to digest that. At least now I know I didn’t do it wrong
Gavin Bisesi
@Daenyth
Without par, they still run serially
concurrency is basically always explicit with c-e/fs2
Milan van der Meer
@milanvdm
Quick question, can you use foldMapM with fs2? Can't seem to get the imports right.
Fabio Labella
@SystemFw
@milanvdm Stream is not Foldable, although it has the logically equivalent operation
what types are you working with?
basically evalMap(...).foldMonoid or evalMap(...).compile.foldMonoid should give you what you need in the two cases
Milan van der Meer
@milanvdm
Was trying to change some Pull logic by possibly trying out a foldMapM that uses a Ref for some state if that makes sense.
Fabio Labella
@SystemFw
I would say that Pull, Ref and foldMapM are three mutually exclusive approaches
like, either you do recursive Pull
or, a non-concurrent stateful operator (start with mapAccumulate, then see if you need more powerful)
or, if all fails, concurrency and/or Ref (sometimes Ref is easier for non-concurrent stuff as well, at which point it becomes a matter of style/preference)
what are trying to do in terms of logic?

are three mutually exclusive approaches

as a guideline ofc, I'm sure there are plenty of cases where this statement is false

Milan van der Meer
@milanvdm
A Stream[F, String] needs to be split into parts based on a List[Int] that represents the length of each split.
So Stream(ab, cdef) and List(1,3,2) needs to become List(a, bcd, ef).
Fabio Labella
@SystemFw
ah, actually I think that Pull is the easiest way to do that, based off feeling
although I do have a vague memory that we have an op that does something similar somewhere
Milan van der Meer
@milanvdm
I was looking at bufferBybut didn't really see an easy way to use that.
ybasket
@ybasket
@milanvdm a quick-and-dirty solution using mapAccumulate: https://scastie.scala-lang.org/CBzo7dS0QGCzvTnbwUGJGQ
It's by far not optimized (String concatenation and similar), but it should give an idea on how it could be done.
Fabio Labella
@SystemFw
Milan van der Meer
@milanvdm
Thanks a bunch, gave me some nice ideas :)
Daniel Robert
@drobert

I have a use case where streamA publishes to some internal (bounded) queue while streamB consumes from said queue and publishes data to some external system. My current approach is something like this:

trait MyPublisher[A] { 
  def publish(in: A): F[Unit]
}

// publisher should complete external publishing of all elements 
// internally published before the stream terminates
def makeSafePublisher[A]: Stream[F, MyPublisher] {
  val queue = Queue.boundedNoneTerminated[F, A](maxSize)
  val internalPublisher = new MyPublisher[A] {
    override def publish(in: A) = queue.enqueue1(in)
  }

  val internalPublish = Stream.emit(internalPublisher).onComplete(queue.enqueue1(None))
  val externalPublish = queue.dequeue.evalMap(publishToKafka)

  internalPublish concurrently externalPublish

However, it seems the system can shutdown while elements remain in the internal queue. What is the safest way to implement makeSafePublisher that will ensure the internal queue is fully drained?

Or, reasonable alternative approach
Gavin Bisesi
@Daenyth
hm, that reminds me of this conversation the other day: https://gitter.im/functional-streams-for-scala/fs2?at=5e849fd48bb4966bc49bf03b
Daniel Robert
@drobert
Thanks @Daenyth
Daniel Robert
@drobert
@hudclark in your working example, what is stop and what manages its lifecycle?
Daniel Robert
@drobert
actually, I think the suggestion of merge rather than concurrently is what I'm looking for
Ukonn Ra
@UkonnRa
Hi all. New to fs2 and having a question. Why fs2.Stream does not implement Traverse like other Collection classes? Say I have the same situation in https://stackoverflow.com/questions/53806418/chaining-a-number-of-transitions-with-the-state-monad
But the question is, fs2.Stream has no traverse, so the answer does not work in my situation
Gavin Bisesi
@Daenyth
Traverse requires the ability to go from F[A] to A, and Stream can't do that in general
You'll notice that most effect types lack such an instance, any IO monad can't because it represents a non-referentially transparent computation
Stream has /analogous/ operations to traverse, like evalMap and others, but the "leave stream" part needs to go through the .compile step
It's possible that Stream for F=Pure could have such an instance
but any general F couldn't
Ukonn Ra
@UkonnRa
So the only solution is to compile the Stream to a list if I want to "traverse" a Stream?
Michael Pilquist
@mpilquist
There's likely a better way to address the use case that stays within the world of streams. Could you describe more of your use case?
Fabio Labella
@SystemFw
@UkonnRa look into evalMap as well
Gavin Bisesi
@Daenyth
yeah, s.compile.toList is the appropriate way to get a list from a stream