Milan van der Meer
@milanvdm
Quick question, can you use foldMapM with fs2? Can't seem to get the imports right.
Fabio Labella
@SystemFw
@milanvdm Stream is not Foldable, although it has the logically equivalent operation
what types are you working with?
basically evalMap(...).foldMonoid or evalMap(...).compile.foldMonoid should give you what you need in the two cases
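A minimal sketch of the two cases mentioned above, assuming `SyncIO` as the effect type and a hypothetical `lengthOf` function standing in for the effectful function one would pass to `foldMapM`. Neither name comes from the conversation.

```scala
import cats.effect.SyncIO
import cats.implicits._
import fs2.Stream

object FoldExample {
  // Hypothetical effectful function (assumption, not from the chat).
  def lengthOf(s: String): SyncIO[Int] = SyncIO(s.length)

  val words: Stream[SyncIO, String] = Stream("ab", "cdef", "g").covary[SyncIO]

  // Case 1: stay inside the stream; the result is a stream with a single element.
  val inStream: Stream[SyncIO, Int] = words.evalMap(lengthOf).foldMonoid

  // Case 2: leave the stream; the result is a single effect.
  val asEffect: SyncIO[Int] = words.evalMap(lengthOf).compile.foldMonoid
}
```

Both variants combine the mapped values with the `Monoid` for `Int`, which is what `foldMapM` would do on a `Foldable`.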
Milan van der Meer
@milanvdm
I was trying to replace some Pull logic with a foldMapM that uses a Ref for some state, if that makes sense.
Fabio Labella
@SystemFw
I would say that Pull, Ref and foldMapM are three mutually exclusive approaches
like, either you do recursive Pull
or, a non-concurrent stateful operator (start with mapAccumulate, then see if you need something more powerful)
or, if all else fails, concurrency and/or Ref (sometimes Ref is easier for non-concurrent stuff as well, at which point it becomes a matter of style/preference)
what are you trying to do in terms of logic?

are three mutually exclusive approaches

as a guideline ofc, I'm sure there are plenty of cases where this statement is false

Milan van der Meer
@milanvdm
A Stream[F, String] needs to be split into parts based on a List[Int] that represents the length of each split.
So Stream(ab, cdef) and List(1,3,2) needs to become List(a, bcd, ef).
Fabio Labella
@SystemFw
ah, actually I think that Pull is the easiest way to do that, based off feeling
although I do have a vague memory that we have an op that does something similar somewhere
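One way the recursive Pull approach could look, as a sketch only. The name `splitByLengths` is made up for illustration, and the behaviour at the edges is an assumption: a final piece may be shorter than requested if the input runs out, and characters beyond the requested lengths are dropped.

```scala
import fs2.{Pull, Pure, Stream}

object SplitByLengths {
  // Hypothetical helper: splits the concatenated input into pieces of the given lengths.
  def splitByLengths[F[_]](in: Stream[F, String], lengths: List[Int]): Stream[F, String] = {
    def go(chars: Stream[F, Char], remaining: List[Int]): Pull[F, String, Unit] =
      remaining match {
        case Nil => Pull.done
        case n :: rest =>
          // Take up to n characters, possibly spanning several input strings.
          chars.pull.unconsN(n, allowFewer = true).flatMap {
            case Some((chunk, tail)) => Pull.output1(chunk.toList.mkString) >> go(tail, rest)
            case None                => Pull.done
          }
      }

    // Flatten the strings into characters before pulling.
    go(in.flatMap(s => Stream.emits(s.toList)), lengths).stream
  }

  val pieces: List[String] =
    splitByLengths[Pure](Stream("ab", "cdef"), List(1, 3, 2)).toList
}
```

With the input from the question, `pieces` is `List("a", "bcd", "ef")`.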
Milan van der Meer
@milanvdm
I was looking at bufferBy but didn't really see an easy way to use that.
ybasket
@ybasket
@milanvdm a quick-and-dirty solution using mapAccumulate: https://scastie.scala-lang.org/CBzo7dS0QGCzvTnbwUGJGQ
It's by far not optimized (String concatenation and similar), but it should give an idea on how it could be done.
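The linked snippet is not reproduced here. As an independent sketch of what a mapAccumulate-based version might look like: the state is the list of remaining lengths plus a string buffer, and a piece is emitted whenever the buffer reaches the current length. The name `split` is hypothetical, all lengths are assumed to be positive, and an incomplete trailing piece is dropped.

```scala
import fs2.{Pure, Stream}

object SplitWithMapAccumulate {
  def split(in: Stream[Pure, String], lengths: List[Int]): Stream[Pure, String] =
    in.flatMap(s => Stream.emits(s.toList))
      .mapAccumulate((lengths, "")) { case ((remaining, buffer), c) =>
        val grown = buffer + c
        remaining match {
          // Buffer is full: emit it and move on to the next length.
          case n :: rest if grown.length == n => ((rest, ""), Option(grown))
          // Otherwise keep buffering and emit nothing for this character.
          case _ => ((remaining, grown), Option.empty[String])
        }
      }
      .collect { case (_, Some(piece)) => piece }

  val pieces: List[String] = split(Stream("ab", "cdef"), List(1, 3, 2)).toList
}
```

Like the original, this concatenates strings character by character, so it is meant to show the shape of the state rather than to be efficient.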
Milan van der Meer
@milanvdm
Thanks a bunch, gave me some nice ideas :)
Daniel Robert
@drobert

I have a use case where streamA publishes to some internal (bounded) queue while streamB consumes from said queue and publishes data to some external system. My current approach is something like this:

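import cats.effect.Concurrent
import fs2.Stream
import fs2.concurrent.Queue

trait MyPublisher[F[_], A] {
  def publish(in: A): F[Unit]
}

// publisher should complete external publishing of all elements
// internally published before the stream terminates
// (maxSize and publishToKafka are assumed to be defined elsewhere)
def makeSafePublisher[F[_]: Concurrent, A]: Stream[F, MyPublisher[F, A]] =
  Stream.eval(Queue.boundedNoneTerminated[F, A](maxSize)).flatMap { queue =>
    val internalPublisher = new MyPublisher[F, A] {
      // elements are wrapped in Some; None marks the end of the queue
      override def publish(in: A): F[Unit] = queue.enqueue1(Some(in))
    }

    val internalPublish = Stream.emit(internalPublisher).onComplete(Stream.eval_(queue.enqueue1(None)))
    val externalPublish = queue.dequeue.evalMap(publishToKafka)

    internalPublish concurrently externalPublish
  }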

However, it seems the system can shut down while elements remain in the internal queue. What is the safest way to implement makeSafePublisher that will ensure the internal queue is fully drained?

Or is there a reasonable alternative approach?
Gavin Bisesi
@Daenyth
hm, that reminds me of this conversation the other day: https://gitter.im/functional-streams-for-scala/fs2?at=5e849fd48bb4966bc49bf03b
Daniel Robert
@drobert
Thanks @Daenyth
Daniel Robert
@drobert
@hudclark in your working example, what is stop and what manages its lifecycle?
Daniel Robert
@drobert
actually, I think the suggestion of merge rather than concurrently is what I'm looking for
Ukonn Ra
@UkonnRa
Hi all. I'm new to fs2 and have a question. Why does fs2.Stream not implement Traverse like other collection classes? Say I have the same situation as in https://stackoverflow.com/questions/53806418/chaining-a-number-of-transitions-with-the-state-monad
But the question is, fs2.Stream has no traverse, so the answer does not work in my situation
Gavin Bisesi
@Daenyth
Traverse requires the ability to go from F[A] to A, and Stream can't do that in general
You'll notice that most effect types lack such an instance, any IO monad can't because it represents a non-referentially transparent computation
Stream has /analogous/ operations to traverse, like evalMap and others, but the "leave stream" part needs to go through the .compile step
It's possible that Stream for F=Pure could have such an instance
but any general F couldn't
Ukonn Ra
@UkonnRa
So the only solution is to compile the Stream to a list if I want to "traverse" a Stream?
Michael Pilquist
@mpilquist
There's likely a better way to address the use case that stays within the world of streams. Could you describe more of your use case?
Fabio Labella
@SystemFw
@UkonnRa look into evalMap as well
Gavin Bisesi
@Daenyth
yeah, s.compile.toList is the appropriate way to get a list from a stream
Fabio Labella
@SystemFw

It's possible that Stream for F=Pure could have such an instance

Btw no, it couldn't. The issue is not effects (only), it's that a stream can be infinite
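A small illustration of the point about infinite streams, assuming nothing beyond the core fs2 API. The names `naturals` and `firstFive` are chosen for this sketch.

```scala
import fs2.{Pure, Stream}

object InfiniteExample {
  // A pure stream with no effects that never ends: 0, 1, 2, ...
  val naturals: Stream[Pure, Int] = Stream.iterate(0)(_ + 1)

  // Bounding the stream first makes it safe to materialize.
  val firstFive: List[Int] = naturals.take(5).toList

  // naturals.toList would never return a list, which is why an operation
  // that must consume the whole structure cannot be total for Stream,
  // even when F = Pure.
}
```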

Ukonn Ra
@UkonnRa
My situation is just like the A code example part from the link. I have a StateT Stream and I want to chain them up
Fabio Labella
@SystemFw
evalMap should work
btw with Stream you can have stateful computations that don't require the use of StateT, but there is nothing wrong with doing what you're doing at first, so definitely keep exploring :)
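As a sketch of a stateful computation that does not involve StateT, here is a running total using mapAccumulate on a pure stream. The values and names are made up for illustration.

```scala
import fs2.{Pure, Stream}

object RunningTotal {
  val amounts: Stream[Pure, Int] = Stream(5, 10, 20)

  val totals: List[Int] =
    amounts
      .mapAccumulate(0) { (sum, amount) =>
        val next = sum + amount
        (next, next) // (new state, value to emit)
      }
      .map { case (_, total) => total } // drop the state, keep the emitted value
      .toList
}
```

Here `totals` is `List(5, 15, 35)`; the state is threaded by the operator itself rather than by a monad transformer.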
Ukonn Ra
@UkonnRa
Yes, evalMap works, but in the StateT situation, the evalMap is a little ugly
Fabio Labella
@SystemFw
why?
genuine question
Gavin Bisesi
@Daenyth
@SystemFw aha I forgot that
Fabio Labella
@SystemFw
it should literally be the difference between
yourList.traverse(stateComputation) and yourStream.evalMap(stateComputation)
@Daenyth in fairness, the same can be said of compile.toList :P
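The traverse/evalMap comparison above, sketched side by side. `SyncIO` and the `label` function are assumptions standing in for the StateT computation from the question; whether a StateT-based stream can be compiled depends on which instances are available for it, so that part is left out.

```scala
import cats.effect.SyncIO
import cats.implicits._
import fs2.Stream

object TraverseVsEvalMap {
  // Hypothetical effectful step (assumption, not from the chat).
  def label(i: Int): SyncIO[String] = SyncIO(s"item-$i")

  // On a List, traverse runs the step for each element and collects the results.
  val viaTraverse: SyncIO[List[String]] = List(1, 2, 3).traverse(label)

  // On a Stream, evalMap does the per-element part; compile.toList leaves the stream.
  val viaEvalMap: SyncIO[List[String]] =
    Stream(1, 2, 3).covary[SyncIO].evalMap(label).compile.toList
}
```

Both values produce the same list when run; the difference is that the stream version only materializes the results at the explicit compile step.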
Gavin Bisesi
@Daenyth
yeah
but it's at least used in a monomorphic context
the traverse instance means that code could be calling traverse not knowing F=Stream and assuming finite-ness
and it kind of depends on your philosophical stance whether returning Nothing for a method impl satisfies the contract or not
Ukonn Ra
@UkonnRa
Well, let me try evalMap first, maybe I misunderstood something