    Ross A. Baker
    @rossabaker
    I'll fix it in a couple hours.
    Ross A. Baker
    @rossabaker
    Jeez, this is annoying.
    It's one module, and I don't see what makes it different. And I ran all those RCs specifically because this is fragile.
    If I flag codegen as an sbt plugin, it seems to work. But it's not. It's related to the cross build matrix somehow.
    Ross A. Baker
    @rossabaker
    Oh, it's only codegen, because the default version is Scala 3. It shows up on all the published modules on ++2.13.5.
    Because I retagged the same commit, locally it thinks it was 1.1.0-RC3 and not 1.1.0. And then I think the logic is falsely assuming 1.1.0-RC3 > 1.1.0.
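    An aside on why a retag can invert the ordering: under a naive lexicographic string comparison (an illustration only; the versioning plugin's actual ordering logic may differ), a pre-release tag sorts after the final tag because the final tag is its strict prefix:

    ```scala
    // "1.1.0" is a strict prefix of "1.1.0-RC3", so plain string ordering
    // puts the RC *after* the final release -- the opposite of semantic order.
    val rc    = "1.1.0-RC3"
    val fin   = "1.1.0"
    val wrong = rc > fin // true under string ordering, though 1.1.0 is the later release
    ```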
    Alex Henning Johannessen
    @ahjohannessen
    Thank you for fixing this Ross :) Now 1.1.2 is on scala index :+1:
    Ross A. Baker
    @rossabaker
    :tada: Sorry about the burnt tags. I thought we were clean.
    As I say after all my other release tire fires, we won't run out of numbers.
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker Seems like something strange is going on with mima and protoc. I have a PR that tries to fix it, but somehow coursier is told to resolve the protoc-gen project. Any idea what might cause it to do that?
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker I think I have fixed it. Removed protoc-gen project from root aggregation and explicitly defined mimaFailOnNoPrevious := false, mimaPreviousArtifacts := Set() for protoc-gen. However, I have not tried to publish anything as there is nothing new to publish.
    Ross A. Baker
    @rossabaker
    Oh, the NoPublishPlugin is good for this. It sets publish / skip := true and exempts it from mima checks.
    I'm not sure why aggregation would matter.
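    A sketch of the two approaches in build.sbt (assuming sbt-typelevel's NoPublishPlugin and the MiMa settings named above; project names are illustrative):

    ```scala
    // build.sbt (sketch): keeping an aggregation-only project out of
    // publishing and MiMa checks.

    // Manual settings, as in the fix described above:
    lazy val protocGen = project
      .settings(
        mimaFailOnNoPrevious  := false,
        mimaPreviousArtifacts := Set.empty
      )

    // Or via NoPublishPlugin, which sets publish / skip := true and
    // exempts the project from MiMa:
    lazy val protocGenAlt = project
      .enablePlugins(NoPublishPlugin)
    ```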
    Alex Henning Johannessen
    @ahjohannessen
    protoc-gen is not a normal project; it is an aggregation project that does not have enablePlugins on it
    I am not sure either, but removing it from the root aggregate apparently solved that weird resolve error
    Ross A. Baker
    @rossabaker
    The aggregate is the only thing even referencing it.
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker I have two open PRs with a critical bug fix for 1.x and 0.10.x regarding back-pressure when pulling streams from a server.
    @rossabaker I never noticed this bug because, until now, I had not used the client where the server has hundreds of thousands of messages. We cannot use the client in production because the heap fills up. I would be grateful if you could do a v0.10.3 with this included.
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker I can see potential for optimizing the code a bit by buffering up to, say, 1000-10000 messages in the queue, using some smart strategy for when to request more messages; I'm not entirely sure what that should be right now. A client consumer can always prefetch by using an fs2.Stream combinator like prefetchN or groupWithin. However, currently in the PR it effectively means that the queue only contains zero or one element.
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker Currently the queue just fills up when the server delivers messages, as the client requests more messages as soon as a message is enqueued. However, my change might decrease throughput considerably in cases where the server has only a few thousand messages that fit into memory.
    @rossabaker Do you have some idea what we could do? I am thinking of perhaps allowing, say, 10000 messages in the queue, draining that down to a lower bound, and then requesting messages again.
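    The watermark idea can be sketched as a pure decision function (all names here are hypothetical; the real logic would live inside the listener):

    ```scala
    // Hypothetical high/low watermarks, e.g. 10000 and 1000: refill the
    // queue only once it has drained down to the lower bound.
    final case class Watermarks(high: Int, low: Int)

    def messagesToRequest(queued: Int, wm: Watermarks): Int =
      if (queued <= wm.low) wm.high - queued // drained: request a full refill
      else 0                                 // still draining: request nothing
    ```

    For example, with Watermarks(10000, 1000) a queue of 500 would request 9500 more messages, while a queue of 5000 would request none.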
    Alex Henning Johannessen
    @ahjohannessen

    @rossabaker I was thinking, perhaps something like this?

    class Fs2StreamClientCallListener[F[_], Response] private (
        request: Int => F[Unit],
        queue: Queue[F, Either[GrpcStatus, Response]],
        dispatcher: Dispatcher[F],
        maxBuffer: Int
    )(implicit F: MonadThrow[F])
        extends ClientCall.Listener[Response] {
    
      private val requestOnPush: F[Boolean] = queue.size.map(s => s < maxBuffer)
      private val requestOnPull: F[Boolean] = queue.size.map(_ == 0)
      private val requestOneIfNeeded: F[Boolean] => F[Unit] = _ >>= request(1).whenA
    
      override def onMessage(message: Response): Unit =
        dispatcher.unsafeRunSync(requestOneIfNeeded(requestOnPush) *> queue.offer(message.asRight))
    
      override def onClose(status: Status, trailers: Metadata): Unit =
        dispatcher.unsafeRunSync(queue.offer(GrpcStatus(status, trailers).asLeft))
    
      def stream: Stream[F, Response] = {
    
        val run: F[Option[Response]] =
          queue.take.flatMap {
            case Right(v) => v.some.pure[F] <* requestOneIfNeeded(requestOnPull)
            case Left(GrpcStatus(status, trailers)) =>
              if (!status.isOk) F.raiseError(status.asRuntimeException(trailers))
              else none[Response].pure[F]
          }
    
        Stream.repeatEval(run).unNoneTerminate
      }
    }
    
    object Fs2StreamClientCallListener {
    
      private[client] def apply[F[_]: Concurrent, Response](
          request: Int => F[Unit],
          dispatcher: Dispatcher[F],
          maxBuffer: Int
      ): F[Fs2StreamClientCallListener[F, Response]] =
        Queue.unbounded[F, Either[GrpcStatus, Response]].map { q =>
          new Fs2StreamClientCallListener[F, Response](request, q, dispatcher, maxBuffer)
        }
    }

    So it goes something like this:

    • starting the listener entails request(1)
    • onMessage sees that we can request more, up to the max buffer
    • onMessage stops requesting more messages when it hits the max buffer
    • pulling from the queue keeps serving downstream until the buffer is drained, then requests one message

    There are probably some edge cases around when the initial call is made and having a slow server, i.e. requesting new messages in onMessage and on pull, but in the grander scheme I think that is a reasonable approach compared to the overload that happens today, which takes no account of pulls.

    WDYT?

    @rossabaker I know there are issues with the server stuff wrt. back-pressure. I know what can be done, but it needs some baking. This issue is more manageable as it is just a matter of when we request more messages from a well-behaved server that does not flood us unless told.
    Alex Henning Johannessen
    @ahjohannessen
    However, all things considered, I think if we merge those two PRs we can state that one can optimize with fs2.Stream combinators like groupWithin or prefetchN (until we get some sort of optimization in place)
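    For reference, a sketch of that combinator approach (Response is a hypothetical stand-in for a generated message type, and responses for the result of a server-streaming call):

    ```scala
    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.Stream

    type Response = String // hypothetical stand-in for a generated message type

    // Keep up to 1000 chunks in flight ahead of the consumer:
    def prefetched(responses: Stream[IO, Response]): Stream[IO, Response] =
      responses.prefetchN(1000)

    // Or batch messages into groups of up to 1000, flushing every 100 ms:
    def batched(responses: Stream[IO, Response]): Stream[IO, List[Response]] =
      responses.groupWithin(1000, 100.millis).map(_.toList)
    ```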
    Ross A. Baker
    @rossabaker
    v0.10.3 is released
    Alex Henning Johannessen
    @ahjohannessen
    Thanks @rossabaker :) I will look into getting some kind of buffering back into the queue.
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker I put v0.10.3 into production with groupWithin as a way to always have ready data. Seems to work great :)
    Ross A. Baker
    @rossabaker
    :tada:
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker What do you think about adding a YourKit sponsorship acknowledgement to the readme? typelevel/fs2-grpc#365
    Alex Henning Johannessen
    @ahjohannessen
    @rossabaker I’ll go ahead and merge it as I see the same thing on http4s and cats-effect, so I assume it is an accepted thing to do.
    Ross A. Baker
    @rossabaker
    Yeah, :+1:
    Alex Henning Johannessen
    @ahjohannessen
    Great :)
    Alex Henning Johannessen
    @ahjohannessen

    @rossabaker Does this look reasonable?

    class Fs2StreamClientCallListener[F[_], Response] private (
        messages: Messages[F, Response],
        dispatcher: Dispatcher[F]
    ) extends ClientCall.Listener[Response] {
    
      override def onMessage(message: Response): Unit =
        dispatcher.unsafeRunSync(messages.onMessage(message))
    
      override def onClose(status: Status, trailers: Metadata): Unit =
        dispatcher.unsafeRunSync(messages.onClose(GrpcStatus(status, trailers)))
    
      def stream: Stream[F, Response] =
        messages.stream
    }
    
    object Fs2StreamClientCallListener {
    
      @deprecated("Internal API. Will be removed from public API.", "1.1.4")
      def apply[F[_]: Concurrent, Response](
          request: Int => Unit,
          dispatcher: Dispatcher[F]
      ): F[Fs2StreamClientCallListener[F, Response]] =
        create(request.andThen(Applicative[F].pure), dispatcher)
    
      private[client] def create[F[_]: Concurrent, Response](
          request: Int => F[Unit],
          dispatcher: Dispatcher[F]
      ): F[Fs2StreamClientCallListener[F, Response]] =
        Messages[F, Response](request)
          .map(new Fs2StreamClientCallListener[F, Response](_, dispatcher))
    
      /// Server Messages
    
      private[client] trait Messages[F[_], T] {
        def onMessage(msg: T): F[Unit]
        def onClose(status: GrpcStatus): F[Unit]
        def stream: Stream[F, T]
      }
    
      private[client] object Messages {
    
        def apply[F[_]: Concurrent, T](
            request: Int => F[Unit],
            prefetchSize: Int = 1
        ): F[Messages[F, T]] = (Concurrent[F].ref(1), Queue.unbounded[F, Either[GrpcStatus, T]])
          .mapN((d, q) => create[F, T](request, prefetchSize, d, q))
    
        def create[F[_], T](
            request: Int => F[Unit],
            prefetchSize: Int,
            demand: Ref[F, Int],
            queue: Queue[F, Either[GrpcStatus, T]]
        )(implicit F: Concurrent[F]): Messages[F, T] = new Messages[F, T] {
    
          def onMessage(msg: T): F[Unit] = {
    
            val ensureMessages: F[Unit] = (currentDemand, queue.size).mapN { (cd, qs) =>
              val nextAmount = math.max(prefetchSize, 1) - (qs + cd)
              fetch(nextAmount).unlessA(nextAmount < 1)
            }.flatten
    
            decreaseDemandBy(1) *> ensureMessages *> queue.offer(msg.asRight)
          }
    
          def onClose(status: GrpcStatus): F[Unit] =
            queue.offer(status.asLeft)
    
      def currentDemand: F[Int] =
        demand.get
    
          def decreaseDemandBy(n: Int): F[Unit] =
            demand.update(d => math.max(d - n, 0))
    
          def increaseDemandBy(n: Int): F[Unit] =
            demand.update(_ + n)
    
          def fetch(n: Int): F[Unit] =
            request(n) *> increaseDemandBy(n)
    
          val stream: Stream[F, T] = {
    
            val ensureMessages: F[Unit] = (currentDemand, queue.size)
              .mapN((cd, qs) => fetch(1).whenA((cd + qs) < 1))
              .flatten
    
            val run: F[Option[T]] =
              queue.take.flatMap {
                case Right(v) => v.some.pure[F] <* ensureMessages
                case Left(GrpcStatus(status, trailers)) =>
                  if (!status.isOk) F.raiseError(status.asRuntimeException(trailers))
                  else none[T].pure[F]
              }
    
            Stream.repeatEval(run).unNoneTerminate
    
          }
    
        }
    
      }
    
    }

    I have not tested this, but the idea is that we try to prefetch when messages arrive, but not more than the client can take.

    And on pull, we check that we keep messages flowing.
    Ross A. Baker
    @rossabaker
    That seems fairly reasonable.
    Greg Dorrell
    @Grogs

    Hey. fs2-grpc is pretty awesome, thank you for working on it! I'm wondering how people are handling metrics and/or access logs?

    My team's adopting this library, and we want logs and metrics for observability. I'm guessing a grpc-java interceptor is the way to go? Is there a library anyone's using or do you roll your own?

    Alex Henning Johannessen
    @ahjohannessen
    Hi @Grogs As of now there is nothing in fs2-grpc that does that, so it is more or less roll your own. I have thought about natchez integration, but never got around to looking at it more closely.
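    For anyone rolling their own, a sketch of a grpc-java client interceptor for access logging (a minimal illustration using the plain io.grpc API; a metrics variant would record timings in the same hooks):

    ```scala
    import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor, Metadata, MethodDescriptor}
    import io.grpc.ForwardingClientCall.SimpleForwardingClientCall

    // Logs each call's full method name before it starts; wire in real
    // logging or metrics where the println is.
    final class AccessLogInterceptor extends ClientInterceptor {
      override def interceptCall[Req, Resp](
          method: MethodDescriptor[Req, Resp],
          options: CallOptions,
          next: Channel
      ): ClientCall[Req, Resp] =
        new SimpleForwardingClientCall[Req, Resp](next.newCall(method, options)) {
          override def start(listener: ClientCall.Listener[Resp], headers: Metadata): Unit = {
            println(s"calling ${method.getFullMethodName}") // replace with logger/metrics
            super.start(listener, headers)
          }
        }
    }

    // Attached when building the channel, e.g.:
    // NettyChannelBuilder.forAddress("localhost", 8080).intercept(new AccessLogInterceptor)
    ```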
    João Ferreira
    @jtjeferreira
    FYI: created a benchmark using fs2-grpc in LesnyRumcajs/grpc_bench#143
    Alex Henning Johannessen
    @ahjohannessen
    @jtjeferreira The latest version is 2.0.1. We use Discord chat as well these days. How slow is it?
    João Ferreira
    @jtjeferreira
    @ahjohannessen I noticed that I was looking at the 2.x README but was already using 1.x code... I shared some numbers in https://discuss.lightbend.com/t/akka-grpc-performance-in-benchmarks/8236/8?u=jtjeferreira
    Walter Chang
    @weihsiu
    the following client code uses fs2-grpc 2.0.1 with Scala 3, pretty much straight out of the README:
    import fs2.grpc.syntax.all.*
    val managedChannelResource: Resource[IO, ManagedChannel] = ManagedChannelBuilder
        .forAddress("localhost", 8080)
        .resource[IO]
    and the compile error:
    [error] -- [E008] Not Found Error: .../fs2-grpc-sandbox/client/src/main/scala/fs2grpcsandbox/client/HelloClient.scala:13:5 
    [error] 11 |  val managedChannelResource: Resource[IO, ManagedChannel] = ManagedChannelBuilder
    [error] 12 |    .forAddress("localhost", 8080)
    [error] 13 |    .resource[IO]
    [error]    |                                                             ^
    [error]    |value resource is not a member of io.grpc.ManagedChannelBuilder[?2.CAP].
    [error]    |An extension method was tried, but could not be fully constructed:
    [error]    |
    [error]    |    fs2.grpc.syntax.all.fs2GrpcSyntaxManagedChannelBuilder[MCB](
[error]    |      io.grpc.ManagedChannelBuilder.forAddress("localhost", 8080).$asInstanceOf$[
    [error]    |        io.grpc.ManagedChannelBuilder[?1.CAP]
    [error]    |      ]
    [error]    |    )
    [error]    |
    [error]    |where:    ?2 is an unknown value of type scala.runtime.TypeBox[Nothing, io.grpc.ManagedChannelBuilder[LazyRef(Object)]]
    Walter Chang
    @weihsiu
    it seems Scala 3 doesn't like the following F-bounded polymorphism too much:
    implicit final def fs2GrpcSyntaxManagedChannelBuilder[MCB <: ManagedChannelBuilder[MCB]](builder: MCB): ManagedChannelBuilderOps[MCB]
    what can I do?
    Alex Henning Johannessen
    @ahjohannessen
    @weihsiu ManagedChannelBuilder - I think this is a mistake in the docs. If you use NettyChannelBuilder, does it work?
    Walter Chang
    @weihsiu
    @ahjohannessen yes, it worked! thanks!
    Alex Henning Johannessen
    @ahjohannessen
    @weihsiu I'll fix the docs to use NettyChannelBuilder as an example.
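    For reference, the working variant with the concrete builder (assuming grpc-netty is on the classpath):

    ```scala
    import cats.effect.{IO, Resource}
    import fs2.grpc.syntax.all.*
    import io.grpc.ManagedChannel
    import io.grpc.netty.NettyChannelBuilder

    // NettyChannelBuilder is a concrete subtype, so the F-bounded
    // extension method resolves where the abstract ManagedChannelBuilder did not.
    val managedChannelResource: Resource[IO, ManagedChannel] =
      NettyChannelBuilder
        .forAddress("localhost", 8080)
        .resource[IO]
    ```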
    Walter Chang
    @weihsiu
    @ahjohannessen cool, thanks! btw, do you know why fs2-grpc is so much slower than the pure java solution per @jtjeferreira ?
    Alex Henning Johannessen
    @ahjohannessen
    The same goes for the zio and akka solutions. Probably something about how the benchmark is set up.
    João Ferreira
    @jtjeferreira
    @weihsiu indeed there were some problems with the setup of the benchmark. See https://discuss.lightbend.com/t/akka-grpc-performance-in-benchmarks/8236/14