Paulius Imbrasas
@CremboC

yeah I'm trying a mapAccumulate that returns a Some when it's done for that "batch", and basically I do a

.collect {
  case (_, Some(v)) => v
}

afterwards

hmm, issue is I still need to output even if my condition is never satisfied...
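
A minimal sketch of that pattern (the batching condition here is made up: fixed batches of three). mapAccumulate threads the partial batch through as state and emits Some only when a batch completes; collect then keeps just the completed batches. Note that anything still sitting in the state when the stream ends is dropped, which is exactly the issue raised above.

import fs2.{Pure, Stream}

val batches: Stream[Pure, List[Int]] =
  Stream(1, 2, 3, 4, 5, 6, 7)
    .mapAccumulate(List.empty[Int]) { (acc, i) =>
      val acc2 = i :: acc
      if (acc2.size == 3) (Nil, Some(acc2.reverse)) // batch complete: emit it
      else (acc2, None)                             // still accumulating
    }
    .collect { case (_, Some(batch)) => batch }
// batches.toList == List(List(1, 2, 3), List(4, 5, 6)); the trailing 7 is lost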
Fabio Labella
@SystemFw
output what and when?
Napas (Tian) Udomsak
@tian000

In this example:

import fs2._
import fs2.concurrent._
import cats.effect.{ConcurrentEffect, ContextShift, IO}

type Row = List[String]

trait CSVHandle {
  def withRows(cb: Either[Throwable,Row] => Unit): Unit
}

def rows[F[_]](h: CSVHandle)(implicit F: ConcurrentEffect[F], cs: ContextShift[F]): Stream[F,Row] =
  for {
    q <- Stream.eval(Queue.unbounded[F,Either[Throwable,Row]])
    _ <- Stream.eval { F.delay(h.withRows(e => F.runAsync(q.enqueue1(e))(_ => IO.unit).unsafeRunSync)) }
    row <- q.dequeue.rethrow
  } yield row

Why do we do F.runAsync(q.enqueue1(e))(_ => IO.unit).unsafeRunSync vs just doing q.enqueue1(e)

This is taken from the guide
Jakub Kozłowski
@kubukoz
I believe that will enqueue but not wait in case the queue is full
to be honest I'm not sure that's an ideal solution... it depends on whether the callback in withRows can be triggered before the previous one finishes processing
if it can - I'd actually block the thread, so q.enqueue1(e).toIO.unsafeRunSync()
if it can't - I think it's fine to runAsync
it boils down to having up to one background wait vs an unlimited amount of background waiting
Christopher Davenport
@ChristopherDavenport
Cormorant lets you stream your CSVs directly, which may be easier to work with than doing all that yourself.
Napas (Tian) Udomsak
@tian000
@kubukoz Does it make sense to do q.enqueue1(e).unsafeToFuture?
If I want an unlimited amount of background waiting when the queue is full
the unsafeRunSync at the end of F.runAsync(q.enqueue1(e))(_ => IO.unit).unsafeRunSync would still block the thread wouldn't it?
Fabio Labella
@SystemFw
ok wait
first question, why not just q.enqueue1: because that's pure and doesn't do anything, and the shape of withRows expects a side-effecting => Unit
so you have to use an unsafe operation
then question number 2: yes it does make sense to use unsafeToFuture if you prefer, that example predates the existence of unsafeToFuture

F.runAsync(q.enqueue1(e))(_ => IO.unit).unsafeRunSync would still block the thread wouldn't it?

only until the first async boundary. But again, when this example was written, toIO and unsafeToFuture didn't exist; that's why it's written with runAsync
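
For reference, a sketch of how the guide's callback bridge might look with those newer operations, specialised to IO for simplicity (CSVHandle and Row as defined above):

import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue

def rowsIO(h: CSVHandle)(implicit cs: ContextShift[IO]): Stream[IO, Row] =
  for {
    q <- Stream.eval(Queue.unbounded[IO, Either[Throwable, Row]])
    _ <- Stream.eval {
      IO.delay {
        h.withRows { e =>
          // enqueue1 on its own is a pure value; the callback must actually
          // side-effect, so it has to be run unsafely. Two options:
          // q.enqueue1(e).unsafeRunSync()    // block until the queue accepts e
          q.enqueue1(e).unsafeToFuture()      // don't block; wait in the background
          ()
        }
      }
    }
    row <- q.dequeue.rethrow
  } yield row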

Napas (Tian) Udomsak
@tian000
got it, thanks!
Jakub Kozłowski
@kubukoz
I suppose we could update that example (and also mention that sometimes it might be better to block the thread)
Bob Glamm
@glammr1
noob question: I want to pull from a queue. If the queue is empty, wait (e.g.) 20 seconds for the next queue polling operation; otherwise, pull as many elements from the queue as are available, and once all elements have been consumed, wait 20 seconds for the next polling operation again. I could model this as a Stream of Streams, with the outer one using fixedDelay and the inner one constructed from the available queue items, or is this better modeled as a single-level Stream via flatMap?
Gavin Bisesi
@Daenyth
I think it would be pretty clean as a Pull
You can build it on top of tryDequeue1
I was planning a very similar approach for prioritized merge of multiple queues
Bob Glamm
@glammr1
ahh, I have some reading to do then, thanks
Gavin Bisesi
@Daenyth
maybe unfoldEval
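
One way such a sketch could look on top of tryDequeue1, without dropping down to a raw Pull (the names and the 20-second delay are assumptions): drain until the queue reports empty, then sleep and start over.

import scala.concurrent.duration._
import cats.effect.Timer
import fs2.Stream
import fs2.concurrent.Queue

def pollDraining[F[_]: Timer, A](q: Queue[F, A]): Stream[F, A] = {
  // tryDequeue1 yields None when the queue is empty, ending one drain pass
  val drainOnce = Stream.repeatEval(q.tryDequeue1).unNoneTerminate
  (drainOnce ++ Stream.sleep_[F](20.seconds)).repeat
}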
Fabio Labella
@SystemFw
I'm not sure you need a queue. A Ref of Vector should be enough. A large part of having a queue is about being notified of new elements, which you don't need here (just get stuff every 20 secs)
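
A sketch of that Ref-based variant (names assumed): every 20 seconds, atomically swap the buffer out and emit whatever has accumulated.

import scala.concurrent.duration._
import cats.effect.Timer
import cats.effect.concurrent.Ref
import fs2.Stream

def drainEvery20s[F[_]: Timer, A](buffer: Ref[F, Vector[A]]): Stream[F, A] =
  Stream
    .fixedDelay[F](20.seconds)
    .evalMap(_ => buffer.getAndSet(Vector.empty)) // take everything, reset buffer
    .flatMap(Stream.emits)                        // flatten each batch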
Bob Glamm
@glammr1
Well, conceptually I would like to present to the consumer of this system "here is an indefinite stream of elements", but mechanically within the system I'd like the behavior I described above. (Also, the source queue that feeds into this stream == SQS in this instance.)
Jakub Kozłowski
@kubukoz
OTOH if you use a bounded fs2 queue, you'll get backpressure on enqueues
Christopher Davenport
@ChristopherDavenport
Is there any way to activate stream finalizers without evaluating the entire body?

Like in the DefaultHead middleware, we return the body as resp.copy(body = resp.body.drain), but if that response was several gigabytes in size, we would pull all that data through the app just for those finalizers. It would be nice to get all the finalizers onto some other, empty body, so that we don't process information we know we don't care about just for the finalizers.

Not sure if that makes sense, but I was thinking about it when looking at middlewares today.

Fabio Labella
@SystemFw
interrupt the stream?
Christopher Davenport
@ChristopherDavenport
private[this] def drainBody[G[_]: Concurrent](response: Response[G]): Response[G] =
  response.copy(body = response.body.interruptWhen[G](Stream(true)).drain)
Means I need a Concurrent constraint, but I think that is better than pulling a whole body we don't need
Bob Glamm
@glammr1
Hmm, I thought I could construct an infinite stream by recursion, but it appears the stream is constructed eagerly in that case. How do I make a stream pull lazily/on-demand?
For instance, the following will generate the stream but the consumer will never run on a call to receiveAsStream (with the usual call to evalMap(...).compile.drain.unsafeRunSync):
  def receive(): ReaderT[F, SqsEnvironment, ReceiveMessageResponse] =
    AwsAsyncUtils.request(createReceiveRequest())(receive _)

  private def receiveStreamMapper(response: ReceiveMessageResponse): Stream[F, Message] =
    Stream.chunk(Chunk.buffer(response.messages().asScala)).covary[F]

  def receiveAsStream(): ReaderT[F, SqsEnvironment, Stream[F, Message]] =
    receive().map(receiveStreamMapper _)
      .flatMap { stream => receiveAsStream().map(stream ++ _) }
Ethan
@esuntag
I'm looking at fs2-aws; it seems that the S3 connector provides a Stream[F, Byte]. I'd like to get a newline-separated string stream. What's the best way to go about that?
My initial thought was to use Pull and glom the bytes until newline, then emit, but not sure if that's the best way
Fabio Labella
@SystemFw
@esuntag
.through(text.utf8Decode)
.through(text.lines)
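
Put together as a self-contained sketch (the byte stream here stands in for whatever fs2-aws gives you):

import fs2.{Stream, text}

def lines[F[_]](bytes: Stream[F, Byte]): Stream[F, String] =
  bytes
    .through(text.utf8Decode) // bytes -> UTF-8 strings
    .through(text.lines)      // split on newlines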
@glammr1 you are returning a Stream, not evaluating it
try building it this way first
def receiveAsStream(env: SqsEnvironment): Stream[F, Message]
especially if you're not sure about the differences between
  • ReaderT[F, Env, Stream[F, Message]]
  • ReaderT[Stream[F, *], Env, Message]
  • Stream[ReaderT[F, Env, *], Message]
Fabio Labella
@SystemFw
I would also (in general) see if you actually need ReaderT in every function (a la Haskell) vs just building a class with SqsEnvironment in its constructor
Bob Glamm
@glammr1
I think exploring the differences in those types will be a useful exercise to me. Thanks.
And yes, my SqsOps class needs SqsEnvironment; the methods do not need Kleisli or ReaderT
Fabio Labella
@SystemFw
that will probably fix your issue with Stream as well
also, for the logic itself
I think just Stream.repeatEval(receive).map(receiveStreamMapper) should do
(I also suspect you can use Stream.emits rather than chunk(buffer(...)), but it depends on what type you get from the Java API)
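
A sketch of that suggestion, assuming the AWS SDK v2 types and a plain receive effect with SqsEnvironment supplied via a constructor rather than ReaderT; the stream is lazy, so nothing runs until it is compiled:

import scala.collection.JavaConverters._
import fs2.Stream
import software.amazon.awssdk.services.sqs.model.{Message, ReceiveMessageResponse}

def receiveAsStream[F[_]](receive: F[ReceiveMessageResponse]): Stream[F, Message] =
  Stream
    .repeatEval(receive)                              // run receive over and over
    .flatMap(r => Stream.emits(r.messages().asScala)) // emit each page's messages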
Bob Glamm
@glammr1
I think I'll go through the exercise, regardless, but if I kind of understand the difference between returning and evaluating a stream then the last suggestion seems logical to me
And I think you're likely right about Stream.emits. I was pulling from the TCP socket code in fs2.io
Anthony Cerruti
@srnb_gitlab

So I have this architecture where I have a ton of streams of events
Those come in to:

  • 1 stream of diffs that apply to all connections
  • n streams of diffs that apply to n connections

Those diffs are composable; I want to merge those into n streams that output 1 diff every 20th of a second
The thing is, I don't want those final n streams to ever miss their 20th of a second, i.e. I want them to run on a thread isolated from all the others
So even if the original ton of streams doesn't run at the speed it was meant to, I never stop outputting diffs every 20th of a second
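
A sketch of one way that shape could be wired up (every name, and the combine/empty structure of the diffs, is an assumption): fold incoming diffs into a Ref as fast as they arrive, and emit the accumulated diff on an independent 20 Hz tick via concurrently, so the output never waits on the sources.

import scala.concurrent.duration._
import cats.effect.{Concurrent, Timer}
import cats.effect.concurrent.Ref
import fs2.Stream

def ticked[F[_]: Concurrent: Timer, D](
    global: Stream[F, D],  // diffs applying to all connections
    perConn: Stream[F, D], // diffs applying to this connection
    combine: (D, D) => D,  // diffs are composable
    empty: D
): Stream[F, D] =
  Stream.eval(Ref.of[F, D](empty)).flatMap { acc =>
    val fill = global.merge(perConn).evalMap(d => acc.update(combine(_, d)))
    val tick = Stream.fixedRate[F](50.millis).evalMap(_ => acc.getAndSet(empty))
    tick.concurrently(fill) // ticking is not gated on the diff sources
  }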