Paul Snively
@paul-snively
@sbuzzard: Um, OK. I would like to go on record as being shocked that "little used" was considered sufficient reason to remove it without replacing it.
That makes it instantly extremely difficult to recommend fs2 3.x for many use-cases I actually have.
Swoorup Joshi
@Swoorup
How do you get an implicit Handle here?
import fs2.*
import cats.mtl.*
import cats.mtl.implicits.*
import cats.data.*

def testStreamAE[F[_]](using AE: Handle[F, String]): Stream[F, Int] = Stream.force { AE.raise("sdsd") }
type Bla[A] = EitherT[IO, String, A]
val testErrors = testStreamAE[Bla]
testErrors.compile.toList.value.unsafeRunSync() // fine
testErrors.handle[String](x => 99999) // missing implicit `Handle[([O] =>> fs2.Stream[App.this.Bla, O]), String] `
I am a bit confused how to achieve it
Swoorup Joshi
@Swoorup
How to complete this?
given [F[_]: Monad, E, O]: Handle[[O] =>> Stream[F, O], E] with
  type StreamF[O] = Stream[F, O]

  val applicative: Applicative[StreamF] = 
    ???
  def raise[E2 <: E, A](e: E2): Stream[F, A] = 
    ???
  def handleWith[A](fa: Stream[F, A])(f: E => Stream[F, A]): Stream[F, A] =
    ???
Swoorup Joshi
@Swoorup
trying to have an alternate Error Channel
Swoorup Joshi
@Swoorup
this is the furthest I have been.
given [F[_]: Monad, E, O](using AE: Handle[F, E]): Handle[[O] =>> Stream[F, O], E] with
  type StreamF[O] = Stream[F, O]

  val applicative: Applicative[StreamF] = fs2.Stream.monadInstance

  def raise[E2 <: E, A](e: E2): Stream[F, A] = 
    Stream.eval(AE.raise[E2, A](e))

  def handleWith[A](fa: Stream[F, A])(f: E => Stream[F, A]): Stream[F, A] =
    // fa.handleErrorWith
    // fa.pull.handle(f).as
    // (Pull.scope(fa.pull).handleErrorWith(e => f(e).pull)).
    // fa.pull.handleErrorWith()
    // ???
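One possible way to finish this instance (a sketch, not the canonical answer): if the base effect's `Monad` constraint is strengthened to `MonadThrow`, the custom error channel `E` can be tunnelled through the stream's `Throwable` channel and recovered with `handleErrorWith`, where the handler is allowed to continue with another `Stream`. `WrappedE` is a hypothetical carrier class introduced only for this sketch.

```scala
import cats.{Applicative, MonadThrow, ~>}
import cats.mtl.Handle
import fs2.Stream

// hypothetical carrier that smuggles E through the stream's Throwable channel
final case class WrappedE[E](e: E) extends Exception

given [F[_]: MonadThrow, E](using AE: Handle[F, E]): Handle[[O] =>> Stream[F, O], E] with
  val applicative: Applicative[[O] =>> Stream[F, O]] =
    Stream.monadErrorInstance[F]

  def raise[E2 <: E, A](e: E2): Stream[F, A] =
    Stream.eval(AE.raise[E2, A](e))

  def handleWith[A](fa: Stream[F, A])(f: E => Stream[F, A]): Stream[F, A] =
    fa
      // rethrow E-channel failures of every evaluated effect as a Throwable...
      .translate(new (F ~> F) {
        def apply[X](fx: F[X]): F[X] =
          AE.handleWith(fx)(e => MonadThrow[F].raiseError(WrappedE(e)))
      })
      // ...and recover them at the stream level, where the handler can
      // continue with another Stream
      .handleErrorWith {
        case WrappedE(e) => f(e.asInstanceOf[E]) // cast is safe by construction
        case other       => Stream.raiseError[F](other)
      }
```

The cost of the trick is the stronger `MonadThrow[F]` requirement; the upside is that no stream-compilation or `Pull` machinery is needed.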
Pyry-Samuli Lahti
@Pyppe

Hi!

Asking for tips about fs2 performance with http4s here as well (https://gitter.im/http4s/http4s?at=610bb2182453386b6c35ec21).

So we've got a Publisher[String] (reactivestream backed by monix, reading data from Postgres DB) having about 200k items. If I consume it eagerly on my laptop, it takes about 20s.

However, if I try to consume it through an http4s server via curl (the Publisher converted to an FS2 Stream via fs2.interop.reactivestreams.fromPublisher), it takes almost 10x longer (~160s). Any ideas why it becomes so slow? Is there something to be done to improve the speed? We are currently using fs2 version 2.5.9. Thanks!

Andrew Roberts
@aroberts

I’m hoping someone can point out what I’m doing wrong here: I’m using file.watch in FS2 2.5.3 to consume filesystem events. I’m also generating the events, via a shell script that writes to files via >> in a bash script, which itself is going through layers of docker bind mounting and other filesystem stuff. I found when I actually deployed to production, I was getting Created and Modified events pointing to the same file (didn’t see on my dev machine), which isn’t shocking- I know it’s JNI or whatever under the covers, and it’s totally reasonable to have FS-dependent behavior in that context

What’s tripping me up is that my input stream is the file.watch stream, mapped to just the path, and then passed through changesBy(_.toString). I have verified that the paths have the same toString representation. Why aren’t these duplicate events being filtered out? Am I misunderstanding the API?

Joseph Denman
@JosephDenman

I'm creating a client for an application I'm writing using FS2 IO and I'm trying to nail down the behavior of Socket[F].reads with signature:

def reads(maxBytes: Int, timeout: Option[FiniteDuration] = None): Stream[F, Byte]

Does reads produce a Stream of length maxBytes, or does it just read maxBytes bytes repeatedly?

Adam Rosien
@arosien
@JosephDenman i believe that is the maxBytes read from the socket at a time, usually put into a single Chunk of the Stream
Joseph Denman
@JosephDenman
@arosien Thanks. It would make sense that you'd just receive one continuous stream of bytes from the socket. If that weren't the case and you wanted to decode the bytes into some sort of message (which is what I'm trying to do), then you'd have to manually store the bytes after each read until you could construct the message, which is very tedious. Can anyone else corroborate what Adam said?
Adam Rosien
@arosien

most socket APIs are pull-based: you have to ask for bytes in order to read. fs2 Stream is also pull-based: to decode a message that spans more than one element of the stream, you need to maintain state.

libraries like https://github.com/scodec/scodec-stream allow you to define codecs that can be built from streams

they use tricks to avoid copying, like ByteVector, etc.
Adam Rosien
@arosien
you build a function that goes from Stream[F, Byte] to Stream[F, Message]
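A minimal illustration of that shape (a toy, not what a real protocol would use; see the scodec-stream link above for that): framing a byte stream into newline-delimited string messages with fs2's built-in text pipes, which carry the partial-line state across chunks internally.

```scala
import fs2.{Stream, text}

// toy Stream[F, Byte] => Stream[F, String] "decoder": utf8Decode and lines
// maintain the cross-chunk state (partial code points, partial lines) for you
def messages[F[_]](bytes: Stream[F, Byte]): Stream[F, String] =
  bytes.through(text.utf8Decode).through(text.lines)
```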
Joseph Denman
@JosephDenman
Ok. Thanks.
Andrew Roberts
@aroberts
for the record, refactoring my stream from a for-comprehension to direct .flatMap etc. calls fixed my issue; not clear why, but it’s possible I was somehow calling changesBy on the wrong stream instance or something like that
Joseph Denman
@JosephDenman

So I'm writing a client that submits a request to a server, then receives a bunch of additional queries from the server until it finally receives a result, which it returns. I've collected the server queries into a stream, but now I need a way to pull the queries one by one and process them until I receive the result. I gather I'm supposed to use Pull, but I've hit a roadblock here:

def loop(queryStream: Stream[F, Query]): Pull[Task, Result, Unit] = 
    queryStream.pull.uncons1 >>= {
        case Some((query, tail)) =>
            Pull.eval(process(query)) >>= {
                case Some(result) => Pull.output1(result)
                case None => loop(tail)
           }
        case None => ??? // error stream ended before result was reached
    }

def process(query: Query): Task[Option[Result]] = ???

The above code doesn't compile. There's an error at the second >>= because >>= takes a Pull[F, A, B] to a Pull[F, A, C] given a B => Pull[F, A, C]. Does anyone know of a more natural way to express this scenario?

Or in general how to convert a Pull[F, INothing, A] into a Pull[F, A, Unit]?
Adam Rosien
@arosien
@JosephDenman unfoldLoopEval looks like a combinator that can do what you want (you can look at the impl also)
def unfoldLoopEval[F[_], S, O](s: S)(f: S => F[(O, Option[S])]): Stream[F, O] =
    Pull
      .loop[F, O, S](s =>
        Pull.eval(f(s)).flatMap { case (o, sOpt) =>
          Pull.output1(o) >> Pull.pure(sOpt)
        }
      )(s)
      .stream
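For instance (using SyncIO only to keep the sketch easy to run), a countdown where the state doubles as the output and a None continuation stops the loop:

```scala
import cats.effect.SyncIO
import fs2.Stream

// emit the current state, then continue with n - 1 until it reaches 0;
// Option.when(n > 0)(...) becomes None at 0, which ends the loop
val countdown: Stream[SyncIO, Int] =
  Stream.unfoldLoopEval(3)(n => SyncIO.pure((n, Option.when(n > 0)(n - 1))))

// countdown.compile.toList.unsafeRunSync() == List(3, 2, 1, 0)
```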
Alex
@CodeCombiner
  def withTopicPar[F[_]: Concurrent]: Stream[F, String] = for {
    topic   <- Stream.eval(fs2.async.topic("Topic start!"))
    publish = Stream.emit("1").repeat.covary[F].to(topic.publish)
    subscribe = topic.subscribe(maxQueued = 10)
    element <- subscribe concurrently publish
  } yield element

This works as expected because I'm running two streams in parallel and using the topic as a point of integration between them.

Hi, please advise where I can find an example of topic.subscribe for fs2 3.x. The example above doesn't seem to work anymore because .to(topic.publish) requires LiftIO now. My main goal is to make a published stream accessible to multiple subscribers.
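For reference, a sketch of the example above ported to fs2 3.x: Topic moved to fs2.concurrent, no longer takes an initial value, and publish is a Pipe consumed with .through rather than the removed .to.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Topic

// fs2 3.x port of the fs2 2 topic example above
def withTopicPar: Stream[IO, String] =
  for {
    topic    <- Stream.eval(Topic[IO, String])              // constructed effectfully, no initial value
    publish   = Stream.emit("1").repeat.covary[IO].through(topic.publish)
    subscribe = topic.subscribe(maxQueued = 10)
    element  <- subscribe.concurrently(publish)
  } yield element
```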

Sam Desborough
@desbo
hi all, if I have an infinite Stream[F, A] and I want to periodically access the latest value in the stream as an F[A], what would you suggest?
It seems I could have the stream update a Ref[A] and run this in the background, accessing the Ref when I need a value but I'm curious to know if there's a simpler way. thanks!
martijnhoekstra
@martijnhoekstra:matrix.org [m]
Going through a Ref seems the way to go. myStream.evalTap(myRef.set)?
most conversation moved to discord btw, see topic
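A minimal sketch of that Ref approach (run sequentially here for determinism; in a real app the updating stream would run in the background via .concurrently or .background):

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.Stream

// evalTap(ref.set) keeps the Ref current as the stream runs;
// any other fiber can then read the latest value with ref.get
val latest: IO[Int] =
  Ref.of[IO, Int](0).flatMap { ref =>
    val updater = Stream.emits(1 to 100).covary[IO].evalTap(ref.set)
    updater.compile.drain *> ref.get
  }
```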
Sam Desborough
@desbo
oops I missed that, thanks!
Dawid Furman
@dfurmans
Hi!
We are struggling with an IOException provoked by sun.nio.ch.WindowsAsynchronousSocketChannelImpl; it comes from write0, a native method on Windows. We are using java.net.InetSocketAddress for constructing our TCPSocket as a Resource[IO, Stream[IO, Unit]].
This error can be reproduced only on a Windows machine, hence the question: is there any special configuration needed, or something specific to the Windows env? Thanks in advance!
Stephen Judkins
@stephenjudkins
I'm looking for an FS2-compatible websocket client. it appears that sttp may be my only up-to-date choice? any other suggestions?
braginxv
@braginxv

Hi guys! Earlier it was discussed here how to merge sorted streams. I've attempted to implement my own solution to that issue and ran into some mysterious obstacle. Let's look at the snippet:

// inside in IOApp

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val dataRowOrder: Ordering[DataRow] = Ordering.by[DataRow, Long](_.groupId)

    val s1: Stream[IO, DataRow] = dataStream[IO](1, genData1)
    val s2: Stream[IO, DataRow] = dataStream[IO](2, genData2)
    val s3: Stream[IO, DataRow] = dataStream[IO](3, genData3)

    // Works fine using all combinations (s1, s2), (s2, s3), (s3, s1)
    // mergeOrderedStreams(s3, s2).foreach(Console[IO].println).compile.drain.as(ExitCode.Success)

    // only traverses the last stream s3 and takes the first element emitted
    // by the result of merging the previous streams s1 and s2, as though the merged stream had already been traversed
    mergeOrderedStreams(s1, s2, s3).foreach(Console[IO].println).compile.drain.as(ExitCode.Success)
  }

  def mergeOrderedStreams[F[_], A](streams: Stream[F, A]*)(implicit ordering: Ordering[A]): Stream[F, A] = {
    streams.reduce(mergePairStreams(_, _))
  }

  def mergePairStreams[F[_], A](leftStream: Stream[F, A], rightStream: Stream[F, A])
    (implicit ordering: Ordering[A]): Stream[F, A] = {
    def go(left: StepLeg[F, A], right: StepLeg[F, A]): Pull[F, A, Unit] = {
      val leftHeadChunk = left.head
      val rightHeadChunk = right.head

      val traversePull = Pull.loop[F, A, StepLeg[F, A]](leg => Pull.output(leg.head).flatMap(_ => leg.stepLeg))

      (leftHeadChunk.head, rightHeadChunk.head) match {
        case (Some(leftHead), Some(rightHead)) =>
          if (ordering.lteq(leftHead, rightHead)) {
          Pull.output1(leftHead) >> go(left.setHead(leftHeadChunk.drop(1)), right)
        } else {
          Pull.output1(rightHead) >> go(left, right.setHead(rightHeadChunk.drop(1)))
        }

        case (Some(_), None) => traversePull(left)

        case (None, Some(_)) => traversePull(right)

        case (None, None) =>
          Pull.done
      }
    }

    leftStream.pull.stepLeg.flatMap { maybeLeftSource =>
      rightStream.pull.stepLeg.flatMap { maybeRightSource =>
        (maybeLeftSource, maybeRightSource) match {
          case (Some(leftSource), Some(rightSource)) => go(leftSource, rightSource)

          case (Some(leftSource), None) => Pull.output(leftSource.head) >> leftSource.stepLeg

          case (None, Some(rightSource)) => Pull.output(rightSource.head) >> rightSource.stepLeg

          case (None, None) => Pull.done
        }
      }
    }.void.stream
  }

My solution works correctly when a pair of streams is merged, but not when a stream is merged with the result of a previous merge, e.g. mergePairStreams(s1, mergePairStreams(s2, s3)). What is the difference between a stream and a Pull turned into a stream? Does the stream Scope matter? The result of the previous merge is somehow reduced to a single element.

Is there a more detailed article or docs about merging streams? I've found nothing in the official documentation and found out about it only in the source code. What, for instance, is a Stream Scope?

Adam Rosien
@arosien

@braginxv all streams are implemented as pulls.

i suspect if merging 3 streams doesn't work but 2 does, then there is some fundamental problem with the algorithm used.

i've used streams for a while, but also don't understand what scopes are.

most discussions have moved to discord. maybe we can get some insight there?

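For reference, a deliberately simple element-at-a-time merge of two sorted streams, written with uncons1/cons1 instead of StepLeg (slower, but easy to convince yourself of its correctness, and it composes when a stream is merged with the result of a previous merge):

```scala
import fs2.{Pull, Stream}

def mergeSorted[F[_], A](l: Stream[F, A], r: Stream[F, A])(implicit ord: Ordering[A]): Stream[F, A] = {
  def go(left: Stream[F, A], right: Stream[F, A]): Pull[F, A, Unit] =
    left.pull.uncons1.flatMap {
      case None => right.pull.echo                        // left exhausted: emit the rest of right
      case Some((lh, lt)) =>
        right.pull.uncons1.flatMap {
          case None => Pull.output1(lh) >> lt.pull.echo   // right exhausted: emit lh and the rest of left
          case Some((rh, rt)) =>
            if (ord.lteq(lh, rh)) Pull.output1(lh) >> go(lt, rt.cons1(rh))
            else Pull.output1(rh) >> go(lt.cons1(lh), rt)
        }
    }
  go(l, r).stream
}
```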
braginxv
@braginxv
@arosien thanks, I've joined the discord now
braginxv
@braginxv
@arosien yes, it's my fault, i found an error
Joseph Denman
@JosephDenman

I have a program that connects to a server, receives queries and responds to them until it receives a final message.

socket.receive.evalMap {
    case Query(query) => socket.send(response).as(none)
    case Result(value) => value.some.pure
}
.compile
.lastOrError
.map {
    case Some(result) => result
    case None => throw new IllegalStateException("Client never received result")
}

I can see that the final result is received, but this computation never completes. I think it's because socket.receive is still waiting for another message. How do I indicate that socket.receive should end when Result is received?

Joseph Denman
@JosephDenman
But I do want to call socket.receive again when the client receives the next request from the application.
Adam Rosien
@arosien
@JosephDenman those goals seem contradictory
socket.receive is a stream, so it invokes your evalMap for everything it receives, until there is nothing left (or upstream closes the connection)
there are methods like endOfInput to signal that receiving should stop, or endOfOutput to signal that no more writes will occur
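One way to express "end after the final message" on a decoded stream (Msg, Query, and Result are hypothetical names for illustration): takeThrough keeps elements while the predicate holds and also emits the first failing element, so the Result itself still comes through before the stream ends.

```scala
import fs2.Stream

// hypothetical message ADT for illustration
sealed trait Msg
final case class Query(q: String) extends Msg
final case class Result(v: Int) extends Msg

// emit messages up to and including the first Result, then end the stream
def untilResult[F[_]](msgs: Stream[F, Msg]): Stream[F, Msg] =
  msgs.takeThrough {
    case _: Result => false // stop here, but takeThrough still emits this element
    case _         => true
  }
```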
Joseph Denman
@JosephDenman
@arosien That seems to imply it doesn't make sense to call socket.receive twice in the same program. My need to do so stems from the fact that we're mixing pure and impure code in our code base while it's being migrated. The point is that the above code snippet is a method that may be called multiple times using the same socket. Are you saying this isn't expressible?
Adam Rosien
@arosien
@JosephDenman oh, i'm confused what socket.receive is. it doesn't appear to be a method in the fs2-io Socket type. but i assumed it returned an fs2 Stream, so i assumed it is the stream of data received.
Adam Rosien
@arosien
you wouldn't call something that produces a stream of responses more than once. in an imperative world, you might invoke some receive that produces one piece of data, and you then "loop" to get more. in a stream scenario, you would invoke receive and get a stream which describes producing all the data; you then describe what you want to do with it when it arrives
Joseph Denman
@JosephDenman
Oh sorry. It should be reads, which as you say returns a stream. I found a solution that uses concurrent queues to marshal the data in and out of the socket.
Adam Rosien
@arosien
:thumbsup:
https://github.com/typelevel/fs2-chat has code that i often copy
pascalsAger
@pascals-ager
I have this use case that I have been trying to solve with fs2. I have a potentially infinite stream of alphanumeric characters. When I encounter a numeral n, I want to append the numeral to the following n letters:
Stream("3", "a", "b", "c", "4", "a", "b", "c", "d") 

transformed to: 

Stream("3a", "3b", "3c", "4a", "4b", "4c", "4d")
any pointers on what combinators to use?
Adam Rosien
@arosien
@pascals-ager it's going to be some kind of fold where you have state that is the current number with a counter initialized to the same value. if the next element is a letter then you concat it with the letter and decrement the counter. if the next element is a number you reinitialize the state.
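A sketch of that fold with mapAccumulate, where the state is the current numeral together with its remaining counter (assuming, as in the example, that every element is a single token and numerals parse with toInt):

```scala
import fs2.{Pure, Stream}

val input: Stream[Pure, String] =
  Stream("3", "a", "b", "c", "4", "a", "b", "c", "d")

// state = the numeral currently in force plus how many letters it still covers
val output: Stream[Pure, String] = input
  .mapAccumulate(Option.empty[(String, Int)]) { (state, elem) =>
    if (elem.forall(_.isDigit))
      (Some((elem, elem.toInt)), None)  // numeral: reinitialize prefix and counter
    else
      state match {
        case Some((num, n)) if n > 0 => (Some((num, n - 1)), Some(num + elem))
        case other                   => (other, None) // no active numeral, or counter exhausted
      }
  }
  .collect { case (_, Some(s)) => s }

// output.toList == List("3a", "3b", "3c", "4a", "4b", "4c", "4d")
```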
Anil Singh
@asjadoun
Our application converts chunks to lists via toList, and this was working fine prior to upgrading from fs2 2.5.9 to 3.1.2. After the upgrade to 3.1.2, the Chunk#toList conversion is nearly 1000 times slower than on fs2 2.5.9. Here is the link to the code: fs2-vs-fs3-chunks-to-list
Adam Rosien
@arosien
@asjadoun strange. Chunk#toList hasn't changed in many years, so i'm not sure what can explain that.
Adam Rosien
@arosien
@asjadoun if you call compact on each Chunk, the times go back down to 2-5ms:
...
.chunkN(10000)
.map(_.compact)
...
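A self-contained toy version of that suggestion (sizes shrunk): compact copies each chunk into a single array-backed Chunk, so the subsequent toList presumably no longer has to traverse a composite chunk structure.

```scala
import fs2.Stream

// group a pure stream into chunks of 10, compact each chunk into an
// array-backed Chunk.ArraySlice, then convert each to a List
val lists: List[List[Int]] = Stream.emits(1 to 100)
  .chunkN(10)
  .map(_.compact)   // single contiguous array; cheap toList
  .map(_.toList)
  .toList
```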