Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Activity
  • Dec 07 07:19
    mergify[bot] closed #787
  • Dec 07 07:19

    mergify[bot] on master

    Update cats-effect to 3.4.2 Merge pull request #787 from sc… (compare)

  • Dec 07 07:14
    scala-steward opened #787
  • Dec 07 07:14
    scala-steward closed #782
  • Dec 07 07:14
    scala-steward commented #782
  • Dec 07 07:14
    scala-steward opened #786
  • Nov 28 19:39

    mergify[bot] on master

    Update scala-collection-compat … Merge pull request #785 from sc… (compare)

  • Nov 28 19:39
    mergify[bot] closed #785
  • Nov 28 19:32
    scala-steward opened #785
  • Nov 22 21:09

    mergify[bot] on master

    Update fs2-core to 3.4.0 Merge pull request #784 from sc… (compare)

  • Nov 22 21:09
    mergify[bot] closed #784
  • Nov 22 21:04
    scala-steward opened #784
  • Nov 20 18:57

    mergify[bot] on master

    Update metrics-core, metrics-jm… Merge pull request #783 from sc… (compare)

  • Nov 20 18:57
    mergify[bot] closed #783
  • Nov 20 18:55

    mergify[bot] on master

    Update logback-classic to 1.4.5 Merge pull request #781 from sc… (compare)

  • Nov 20 18:55
    mergify[bot] closed #781
  • Nov 20 18:51
    scala-steward opened #783
  • Nov 20 18:51
    scala-steward closed #771
  • Nov 20 18:51
    scala-steward commented #771
  • Nov 20 18:51
    scala-steward opened #782
Gavin Bisesi
@Daenyth
yeah nothing jumps out at me
What I'm afraid of is that this is a difference in fs2's scope handling somehow between current and 1.x
Gavin Bisesi
@Daenyth
no it's something else
Gavin Bisesi
@Daenyth
oh I found it out btw
My .take wasn't on the queue envelopes, it was on the stream providing me the (acker, msg) tuple
:finnadie:
Gavin Bisesi
@Daenyth
huh, the amqp spec has an xml :thought_balloon: :suspect:
makes me curious to write a code generator for the main shapes

The protocol definition conforms to a formal grammar that is
published seperately in several technologies.

How nice.

What's the formal grammar called, and what places publish it? :anger:

Gavin Bisesi
@Daenyth
so uh, I made the java library throw a TimeoutException
so that's fun
do you happen to know if one channel is able to consume from one queue multiple times and have multiple qos on that?
Gavin Bisesi
@Daenyth
So more learning: qos is a per-channel or per-connection setting depending on global. If you construct multiple consumers on the same channel, the last qos you set "wins"
for rabbitmq this means each consumer has its own independent prefetch buffer.
For amqp per the spec I believe it's one buffer period on the channel for all consumers
The createAckerConsumer api is very misleading the way it's typed
we should consider how to appropriately change that
but if people want to use prefetching, they're best to create a channel per consumer
Gavin Bisesi
@Daenyth

@gvolpe Do you know why in ConsumingProgram for createConsumer we read from the InternalQueue using Stream.repeatEval(queue.dequeue1.....) instead of using queue.dequeue?

Using dequeue1 wrecks the chunk structure and processing many small envelopes gets less efficient

Gavin Bisesi
@Daenyth
I can make a PR to that effect, I don't think it should change any behaviors
Gabriel Volpe
@gvolpe
Hey @Daenyth , not sure to be honest, that's as old as the library itself I guess
If I had a reason when I wrote that code I should have added a comment cause I don't recall now :smile:
Gavin Bisesi
@Daenyth
K, I'll PR it
Gabriel Volpe
@gvolpe
:+1:
Gavin Bisesi
@Daenyth
Gavin Bisesi
@Daenyth
yeah wow I think fs2-rabbit is very slow when you have high prefetch values
if I look in my docker rabbit's console I can see "unacked = 131,072" and "ready = 0" but I'm having to use Agitation to set >4second timeouts on every single message in order to not be timing out
I wonder if this is because of the dequeue1
or if it's something I'm doing myself
Gavin Bisesi
@Daenyth
Running a single test case, in 464,085ms according to logback, I dequeued 47 envelopes. That's not right, we definitely go faster in prod. Something I'm doing is being very slow, maybe it's the Agitation?
lemme try throwing in a groupWithin
maybe bounded by internalQueueSize?
Gavin Bisesi
@Daenyth
Thanks for the quick merge btw. Github actions looks neat. Is there any way to have it also show the html-ized test report?
Gabriel Volpe
@gvolpe
No worries, is there such a thing?
Gavin Bisesi
@Daenyth
sort of. There's the standard junit-style xml that you can have scalatest emit, and many things allow you to navigate through it interactively
even pytest in python emits it, so I wouldn't be surprised if GH can read it
Gabriel Volpe
@gvolpe
no idea tbh, we used to have something like that when we had test coverage metrics
I don't find so much value in it, though
Gavin Bisesi
@Daenyth
more useful for coverage than tests, but I've found them useful especially in projects with thousands of tests - grepping sbt output is a bear
looks like nothing out of the box
oh well
Gavin Bisesi
@Daenyth
ok part of it was slow because my docker rabbit was running out of memory, because I wasn't declaring my test queues with TTLs or as AutoDelete
Adriani Furtado
@Adriani277
Hi all, I was looking at the Publishing Algebra and noticed that most return types
are F[A => F[Unit]]. Is it possible to have a SimplePublish that takes a payload as a parameter and simply publishes the message?
Something like
def simplePublish[A](
      payload: A,
      channel: AMQPChannel,
      exchangeName: ExchangeName,
      routingKey: RoutingKey
  )(implicit encoder: MessageEncoder[F, A]): F[Unit] =
FlatMap[F].flatMap(client.createPublisher(exchangeName, routingKey))(_.apply(payload))
Gavin Bisesi
@Daenyth
I mean it won't be broken
I don't recall off hand what effect is being captured in the "create publisher" method
better to reuse it if you can, but wouldn't be broken if you didn't, I think
Adriani Furtado
@Adriani277
Alright I see, I suspected there would be some performance hit by re-creating the publisher
Sebastian Voss
@sebastianvoss

I have an application which creates a publisher and consumer to interact with an AMQP server.

When the app receives an SIGINT it needs to send a logout message and wait for the response before
shutting down. I got some hint from @Daenyth to use Resource.makeCase but I'm failing to see where
to inject this. Maybe you can help me out again.

val resources = for {
  conn    <- R.createConnection
  resChan <- R.createChannel(conn)
  reqChan <- R.createChannel(conn)
} yield (resChan, reqChan)

resources.use {
  case (responseChannel, requestChannel) =>
    for {
      publisher <- createPublisher(R)(requestChannel)
      consumer  <- createConsumer(R)(responseChannel)
      _         <- program(consumer, publisher).compile.drain
    } yield ()
}
Gavin Bisesi
@Daenyth
below reqChan in resources
oh well no..
actually, onFinalize on program should be able to get you done