Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
  • Feb 01 10:11
    @SystemFw banned @Hudsone_gitlab
  • Jan 31 2019 04:19
    404- forked
    404-/fs2
  • Jan 31 2019 03:01
    SethTisue commented #1232
  • Jan 30 2019 17:22
  • Jan 30 2019 13:45
  • Jan 30 2019 10:48
    pchlupacek commented #1406
  • Jan 30 2019 10:47
    pchlupacek commented #1406
  • Jan 30 2019 10:39
    pchlupacek commented #1407
  • Jan 30 2019 09:58
    lJoublanc commented #870
  • Jan 30 2019 09:42
    vladimir-popov commented #1407
  • Jan 30 2019 08:10
    vladimir-popov closed #1407
  • Jan 30 2019 08:10
    vladimir-popov commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 19:20
    SystemFw commented #1407
  • Jan 29 2019 18:57
    SystemFw commented #1406
  • Jan 29 2019 17:47
    pchlupacek commented #1406
  • Jan 29 2019 17:42
    pchlupacek commented #1406
  • Jan 29 2019 17:39
    pchlupacek commented #1407
  • Jan 29 2019 17:39
    vladimir-popov edited #1407
  • Jan 29 2019 17:38
    vladimir-popov commented #1406
Matt Hughes
@matthughes
Yes but am I the only one that finds that error message circular?
Fabio Labella
@SystemFw
it makes sense to me actually
FlatMap expects things that take 1 type parameter
in one case you give it something that takes 0, in the other one that takes 2
so you fail the target of 1 both times
more precisely, you give it something of kind * one time, and of kind (* -> *) -> * -> * the other time
you need * -> *
David Flynn
@WickedUk_gitlab
@mpilquist That looks like what I'm after, thanks. I'll try to implement it.
Fabio Labella
@SystemFw
or in the ugly scala syntax, you need F[_], and you give it A one time, and G[_[_], _] the other time
Matt Hughes
@matthughes
So … FlatMap[fs2.Stream[Pure, _]].flatten(???)
Fabio Labella
@SystemFw
does that make any sense? I agree that kind errors in scala aren't the clearest, but the two above are coherent (if a bit cryptic at first)
well, that's conceptually right but that syntax unfortunately means something else in scala
Dotty fixes it, and what you have is how you'd write it in dotty
in scala 2 you need FlatMap[Stream[Pure, *]], where * comes from kind projector
you also see it as FlatMap[Stream[Pure, ?]] in older versions of kind projector
your intuition is correct in that you need to partially apply that type, but in scala 2 an underscore in that position means a type of kind * (so shape A), with an existential parameter (a wildcard type _), which is a massive inconsistency syntax-wise, since at the value level _ does indeed mean "partially apply"
but as I said, that's fixed in dotty
Lucas Satabin
@satabin
Alternatively, you can name the type as in type PureStream[T] = Stream[Pure, T] and then use FlatMap[PureStream] if kind-projector is not an option for you
Matt Hughes
@matthughes
Right. Error message throws me off because it doesn’t mention FlatMap and the “expected” seems to refer back to Stream. Should be: "fs2.Stream[fs2.Pure,Int] takes no type parameters, FlatMap expects: 1"
Fabio Labella
@SystemFw
heh, FlatMap does not expect 1
it expects something that expects 1
Matt Hughes
@matthughes
It expects a type of arity 1, something with 1 type parameter.
right?
Was trying to see if I could even find where this error is constructed in the compiler but can’t.
Fabio Labella
@SystemFw
the definition in terms of arity unfortunately is not precise enough
I don't normally say this, but in this case learning the haskell notation is very useful
thankfully that's pretty straightforward
* (pronounced star or type) is the kind of types that require no extra info to be instantiated, so A, Int, Stream[IO, Int], Stream[F, Int] etc
something of shape F[_] requires a type of kind * (an A), and produces another type of kind * (F[A])
so F[_] has kind * -> *
Either requires two types of kind star and produces types of kind star, so the kind of Either is * -> * -> * (it's curried)
comparing Either with Stream is where a simple description in terms of arity breaks
Stream also requires two type params, but it does not have kind * -> * -> *
since that fits Stream[Int, Int], which is incorrect
instead, it takes two types, where the first has kind * -> *, and the second kind *
so overall Stream has kind (* -> *) -> * -> *
Fabio Labella
@SystemFw
similarly FlatMap expects something of kind * -> * and produces something of kind * (the instance of the FlatMap trait)
so the kind of FlatMap is (* -> *) -> *
so if you say FlatMap expects one, it's ambiguous with * -> *, i.e. trait Foo[A]
when the compiler matches the two types you are passing, it sees Stream[Pure, Int], of kind *, and the F[_] parameter, of kind * -> *
so the kind error is that you expect * -> *, but Stream[Pure, Int] has kind *
or, as the error phrases it, Stream[Pure, Int] takes zero type params, but I expect it to take 1
I agree the phrasing could be improved btw
Matt Hughes
@matthughes
Definitely helps but I’m still seeing stars ;)
casian
@casian
Hi I am currently developing a program that reads messages from a Stream containing a userId and dispatches them to worker streams according to their userId.
Fabio Labella
@SystemFw
ahahah sure, I can try converting some of them to scala syntax, although I find that quite confusing to work with when you go past F[_] (Stream is [_[_], _], for example)
casian
@casian
To do this I'm using queues, but for some reason when I enqueue a message, it does not get dequeued
Fabio Labella
@SystemFw
@casian the most likely scenario is that you are using two different queues when you think you're sharing one
casian
@casian
This is a snippet of my code:
private def messageStreamProcessor(queues: Map[Int, Queue[F, PlatformMessageStreamedDelivery[F]]]): fs2.Pipe[F, PlatformMessageStreamedDelivery[F], Unit] = {
    val numberOfProcesses = queues.values.size
    in => in
      .evalMap((streamedDelivery: PlatformMessageStreamedDelivery[F]) => streamedDelivery.delivery match {
        case d:Delivery.Ok[PlatformMessageTo] =>
          queues((d.body.userId % numberOfProcesses).toInt).enqueue1(streamedDelivery)
        case msg: Delivery.MalformedContent =>
          helper.rejectInvalidMessage(streamedDelivery, s"Malformed MESSAGE RECEIVED: $msg.") *> ().pure[F]
      })
  }

private def mergeAllStreams(streams:List[fs2.Stream[F, StreamedResult]], mergedSteam: fs2.Stream[F, StreamedResult]) : fs2.Stream[F, StreamedResult] =
    streams match {
      case Nil => mergedSteam
      case _ => mergeAllStreams(streams.tail, streams.head.merge(mergedSteam))
    }

  def mergedStream(numberOfProcesses:Int): F[fs2.Stream[F, Any]] = {
    val queues: F[List[Queue[F, PlatformMessageStreamedDelivery[F]]]] = (0 until numberOfProcesses).map(_ => Queue.unbounded[F,PlatformMessageStreamedDelivery[F]]).toList.sequence
    for {
      queueMap <- queues.map(_.mapWithIndex((q,n) => n -> q).toMap)
      streams <- queues.map(_.mapWithIndex((q,n) => q.dequeue.through(queueProcessor(n))))
      platform <- platformClient.messageConsumer.deliveryStream.through(messageStreamProcessor(queueMap)).pure[F]
      merged <- platform.merge(mergeAllStreams(streams,fs2.Stream.empty)).pure[F]
    } yield merged
  }