Fabio Labella
@SystemFw
@nikiforo I would advise not to think about Pipe too much
a pipe is just a type synonym for a function, and it's only there for brevity
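For reference, the alias Fabio is referring to, plus a tiny illustrative usage (fs2 2.x; doubler is a made-up example):

    import fs2.{Pipe, Pure, Stream}

    // fs2 defines Pipe as nothing more than a function alias:
    //   type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]

    // So any plain stream-to-stream function is a Pipe, usable via `through`:
    val doubler: Pipe[Pure, Int, Int] = _.map(_ * 2)
    val doubled: List[Int] = Stream(1, 2, 3).through(doubler).toList // List(2, 4, 6)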
Gabriel Claramunt
@gclaramunt
anyone here managed to convert from grpc's StreamObserver to fs2 ?
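One possible shape for such an adapter, sketched under cats-effect 2 / fs2 2.x assumptions (observedStream and register are illustrative names, not an fs2 or grpc-java API): back the StreamObserver with a none-terminated queue and expose the queue's dequeue side as a stream.

    import cats.effect.{ConcurrentEffect, IO}
    import fs2.Stream
    import fs2.concurrent.Queue
    import io.grpc.stub.StreamObserver

    // `register` is the caller-supplied hook that hands the observer to the
    // async gRPC stub, e.g. stub.someServerStreamingCall(request, observer).
    def observedStream[F[_], A](register: StreamObserver[A] => Unit)(
        implicit F: ConcurrentEffect[F]): Stream[F, A] =
      for {
        // unbounded for simplicity; real code may want a bounded queue
        q <- Stream.eval(Queue.noneTerminated[F, Either[Throwable, A]])
        _ <- Stream.eval(F.delay {
          register(new StreamObserver[A] {
            private def enqueue(e: Option[Either[Throwable, A]]): Unit =
              F.runAsync(q.enqueue1(e))(_ => IO.unit).unsafeRunSync()
            override def onNext(a: A): Unit = enqueue(Some(Right(a)))
            override def onError(t: Throwable): Unit = enqueue(Some(Left(t)))
            override def onCompleted(): Unit = enqueue(None)
          })
        })
        a <- q.dequeue.rethrow // None ends the stream, Left re-raises the error
      } yield a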
mn98
@mn98
@Daenyth @SystemFw I dug further into my 'deadlock' and it turns out I'm running out of heap. I increased the heap and it took longer to grind to a halt. I then tested Stream(streams: _*).parJoinUnbounded vs SortMerge.sortMerge(streams). The former works fine while the latter gradually runs out of heap. @johnynek helpfully shared the SortMerge gist on this forum a while back and it works great apart from the heap issue.
Is this behaviour expected? Any thoughts on why the sortMerge gradually eats up the heap?
Fabio Labella
@SystemFw
@mn98 mm no, I'd say that is a potential memory leak
mn98
@mn98
@SystemFw Ok, I have a quick repro now, running with a 15MB heap. I'm attempting to make sense of the heap dump in VisualVM, but this is not exactly my forte...
mn98
@mn98
[two heap-dump screenshots from VisualVM]
Could it be related to cats.collection.Heap holding on to the data? Or at least the way it's being used in the sortMerge somehow?
mn98
@mn98
Here's a repro which can be run locally (not in Scastie, because it will time out).
Any tips/thoughts/hints/links/whatever are very welcome to help me out of this hole!
mn98
@mn98
Here is the 'dominator' of those objects in the heap dump (I'm learning as I go here...)
[screenshot of the heap dump's dominator view]
rnd4222
@rnd4222_gitlab
Must be pretty expensive, considering the amount of $$$ in the heap
Artem Nikiforov
@nikiforo
Hi,
typelevel/fs2#2158
This PR is about the lines pipe. It doesn't break the current property-based tests, retains the throughput, and I think it also simplifies the implementation.
Michael Pilquist
@mpilquist
cool, added a couple comments
Matthew Dornfeld
@mdornfe1_gitlab
Is there a way to get a stream to execute inside a blocking thread pool using Blocker? I have a data stream that writes elements to a map with a max size and blocks until that map has space available. I have something working with plain IO, but this seems like a situation where Blocker is appropriate: val blockingStream: Stream[IO, Unit] = dataStream.evalMap(item => IO { populateBlockingMap(item) })
Is there a way to modify this to use Blocker?
Fabio Labella
@SystemFw
blocker.delay { populateBlockingMap(item) }
Matthew Dornfeld
@mdornfe1_gitlab
@SystemFw So something like this? val blockingStream: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker => dataStream.evalMap(item => blocker.delay { populateBlockingMap(item) }) }
Fabio Labella
@SystemFw
yeah, although you need to decide whether you want to share the blocker with other things (in which case pass it as an argument), or create one just for this stream (what your code does)
Matthew Dornfeld
@mdornfe1_gitlab
Let's say dataStream consists of 10 concurrently run merged streams. I want each one to run concurrently using the blocking thread pool. Will the above accomplish that?
What's the difference between declaring the Blocker using Stream.resource and doing it like this? val blockingStream: Stream[IO, Unit] = dataStream.evalMap(item => Blocker[IO].use { blocker => blocker.delay { populateBlockingMap(item) } })
Fabio Labella
@SystemFw
yeah, that creates a blocker per item rather than a blocker per stream
ideally you don't want to do either: create the blocker once, flatMapping the same blocker into all the streams
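A minimal sketch of that last suggestion, assuming cats-effect 2 and fs2 2.x (SharedBlockerExample, dataStreams, and populateBlockingMap are illustrative names):

    import cats.effect.{Blocker, ExitCode, IO, IOApp}
    import fs2.Stream

    object SharedBlockerExample extends IOApp {
      // Stand-in for the blocking side effect discussed above.
      def populateBlockingMap(item: Int): Unit = ()

      def run(args: List[String]): IO[ExitCode] = {
        val dataStreams: List[Stream[IO, Int]] =
          List.tabulate(10)(i => Stream.range(i * 100, i * 100 + 10).covary[IO])

        Stream
          .resource(Blocker[IO]) // one Blocker (one cached pool) for the whole app
          .flatMap { blocker =>
            // flatMap the same blocker into all the streams, then run them together
            Stream
              .emits(dataStreams)
              .map(_.evalMap(item => blocker.delay[IO, Unit](populateBlockingMap(item))))
              .parJoinUnbounded
          }
          .compile
          .drain
          .as(ExitCode.Success)
      }
    }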
Matthew Dornfeld
@mdornfe1_gitlab
[link to an external question about interrupting a stream with a switch]
Michael Pilquist
@mpilquist
Hi! The comment on that question has it right: the switch value creates a new Ref each time you evaluate it, i.e., each time you call unsafeRunSync. To fix it, you need to evaluate switch once and use the result in both the call to interruptWhen and the call to .set(true)
Fabio Labella
@SystemFw
You are creating two different signals
Create only one, and flatmap it to both places that need it
Michael Pilquist
@mpilquist
Also, all those unsafeRunSync calls in program should be removed
Fabio Labella
@SystemFw
And don't call unsafe* anywhere
I have a talk called Shared State in pure fp that explains this at length, but the gist is the above
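A minimal sketch of the pattern Michael and Fabio describe, assuming cats-effect 2 and fs2 2.x (InterruptExample and the timings are illustrative):

    import scala.concurrent.duration._
    import cats.effect.{ExitCode, IO, IOApp}
    import fs2.Stream
    import fs2.concurrent.SignallingRef

    object InterruptExample extends IOApp {
      def run(args: List[String]): IO[ExitCode] =
        // Create the signal exactly once, then flatMap it to every place that
        // needs it: the stream being interrupted and the code flipping the switch.
        Stream
          .eval(SignallingRef[IO, Boolean](false))
          .flatMap { switch =>
            val worker  = Stream.awakeEvery[IO](100.millis).interruptWhen(switch)
            val stopper = Stream.sleep[IO](1.second) ++ Stream.eval(switch.set(true))
            worker.concurrently(stopper)
          }
          .compile
          .drain
          .as(ExitCode.Success) // no unsafeRunSync anywhere: IOApp runs the program
    }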
Ferdy Moon Soo Beekmans
@fmsbeekmans
Hi, is there a reason a topic can't be created empty?
Gavin Bisesi
@Daenyth
Would be good to also note the answer here: typelevel/fs2#1330
mn98
@mn98

@SystemFw @Daenyth I fixed the memory issue with sortMerge. I pulled the method apart and, using typelevel/cats-effect#401 as a clue, I replaced the for comprehension with >> and it works fine now. I replaced this:

            for {
              _ <- Pull.output1(sl.head(0))
              nextSl = sl.setHead(sl.head.drop(1))
              nextHeap = rest.add(nextSl)
              _ <- go(nextHeap)
            } yield ()

with this:

            Pull.output1(sl.head(0)) >>
              go(rest.add(sl.setHead(sl.head.drop(1))))

@johnynek, perhaps you'd like to update your gist similarly, or I could leave a comment if you prefer?

Fabio Labella
@SystemFw
oh, that's actually very common, but it didn't occur to me
basically don't use for with Pull, or use better-monadic-for
mn98
@mn98
I'm just very pleased to have that resolved and I've learnt a little along the way! With sortMerge followed by groupAdjacentBy I now have the "merge N streams" I was rambling on about several weeks back.
If there's any material you'd recommend that would help me better understand the issues with for in this context, it would be great if you could point me to it.
Fabio Labella
@SystemFw
yield () adds a map(_ => ()) on every iteration of the recursive function
that garbage accumulates
that's it really
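A toy illustration of that accumulation (not the sortMerge code; PullForExample is a made-up name):

    import fs2.{Pull, Pure}

    object PullForExample {
      // for version: `yield ()` desugars to a trailing `.map(_ => ())`, adding
      // one map node per recursive step; that is the garbage that accumulates
      // (better-monadic-for removes the redundant map at compile time).
      def goFor(i: Int, n: Int): Pull[Pure, Int, Unit] =
        if (i >= n) Pull.done
        else
          for {
            _ <- Pull.output1(i)
            _ <- goFor(i + 1, n)
          } yield ()

      // `>>` version: no trailing map, nothing accumulates.
      def goChain(i: Int, n: Int): Pull[Pure, Int, Unit] =
        if (i >= n) Pull.done
        else Pull.output1(i) >> goChain(i + 1, n)

      val first5: List[Int] = goChain(0, 5).stream.toList // List(0, 1, 2, 3, 4)
    }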
mn98
@mn98
Thanks!
And on the topic @mdornfe1_gitlab brought up, I appreciate that the received wisdom says use a single Blocker throughout a program, but what are the reasons behind that? What are the benefits of a single blocker or, conversely, what are the problems of having multiple blockers?
mn98
@mn98
For context, I have multiple blockers in my program currently and I don't perceive any issues. I'd like to understand the above to help me prioritise the work to "move to a single, shared blocker".
Fabio Labella
@SystemFw
The first bit of context is that Blocker is gone in CE3 anyway
There are no huge issues with having multiple blockers; it's just a bit of a waste to create multiple thread pools, given that a single blocking pool will create threads as needed
mn98
@mn98
I guess I'll kick that can down the road until I move to CE3 then! :-)
Clearly I have some reading to do, if you have any helpful links...
P. Oscar Boykin
@johnynek
@mn98 ahh I was using better monadic for with that code so I didn’t see any issue. I think it rewrites it to what you have.
mn98
@mn98
@johnynek well, that would make sense, thanks again for that method!
Yotam Hochman
@Yotamho

Hi, I'm trying to troubleshoot an error I get when using fs2.blobstore.azure.AzureStore. I've managed to recreate the error just by using fs2 and the reactor interop package. The code snippet:

  import fs2.interop.reactivestreams._ // provides fromPublisher

  fromPublisher[IO, java.nio.ByteBuffer](bac
    .getBlobContainerAsyncClient(cont)
    .getBlobAsyncClient(path)
    .download() // .download() returns reactor's Flux[ByteBuffer]
  ).map(println).compile.drain.unsafeRunSync()

the error message: [reactor-http-kqueue-1] ERROR r.n.channel.ChannelOperationsHandler - [id: 0xf5d1e42e, L:/192.168.1.28:61511 - R:$$$$$$$$$$$$] Error was received while reading the incoming data. The connection will be closed.
Any ideas why this is happening? (I am able to just print the data inside the Flux[ByteBuffer]; I get the error only when I try to use fs2.)