AdamChlupacek on v0.4.0-M4
AdamChlupacek on 0.4-spinoco-compat
Setting version to 0.4.0-M4 Setting version to 0.4.0-SNAPSH… (compare)
mraulim on 0.4-spinoco-compat
Add timeout for publishing in k… Added trusty as a dist for runn… Fixed timeout on leader query. and 2 more (compare)
AdamChlupacek on publish-connection-timeout
Add timeout for leaders stream. (compare)
AdamChlupacek on publish-connection-timeout
Fixed timeout on leader query. (compare)
AdamChlupacek on publish-connection-timeout
Added trusty as a dist for runn… (compare)
/**
  * Publishes each string in `bodies` as a Kafka message through the client `t`.
  *
  * Every body is UTF-8 encoded and paired with an empty key. The messages are
  * then grouped with `chunkLimit(1024)` and each group is handed to
  * `publishN` for `topic(top)` / `partition(0)`, with `requireQuorum = true`,
  * a server acknowledgement timeout of 10 seconds and no compression.
  *
  * @param t      Kafka client used for publishing
  * @param top    topic name, converted with `topic(top)`
  * @param bodies message payloads, published in list order
  * @return an `IO` that, once all groups are published, logs the collected
  *         `publishN` results and yields a valid `Unit`; the invalid side of
  *         the `ValidatedNel` is never produced by this method
  */
override def send(t: KafkaClient[IO])(top: String)(bodies: List[String]): IO[ValidatedNel[String, Unit]] = {
  // Encodes a single body as UTF-8 bytes and pairs it with an empty key.
  def keyedMessage(body: String) =
    Stream.emit(body).covary[IO]
      .through(text.utf8Encode[IO])
      .compile
      .toVector
      .map(bytes => (ByteVector.empty, ByteVector(bytes)))

  // Messages regrouped so that a single publishN call receives one chunk.
  val batches =
    Stream.emits(bodies).covary[IO]
      .evalMap(keyedMessage)
      .chunkLimit(1024)

  val published = batches.evalMap { batch =>
    t.publishN(
      topic(top),
      partition(0),
      requireQuorum = true,
      serverAckTimeout = 10.seconds,
      compress = None
    )(batch)
  }

  published.compile.toVector.map { results =>
    // NOTE(review): this completion message is logged at error level — confirm that is intended.
    logger.error(s"Finished publishing kafka messages, result was $results")
    ().validNel[String]
  }
}
import scala.concurrent.ExecutionContext.Implicits.global
// Scheduled thread pool with 8 threads; it backs both the fs2 scheduler and the
// asynchronous channel group below.
// NOTE(review): nothing in this snippet shuts the pool down — confirm the owner does so elsewhere.
val pool: java.util.concurrent.ScheduledExecutorService = Executors.newScheduledThreadPool(8)

// Implicit values carry explicit types so that implicit resolution does not depend
// on the inferred type of the right-hand side.
implicit val scheduler: fs2.Scheduler = fs2.Scheduler.fromScheduledExecutorService(pool)
implicit val ag: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(pool)

// Kafka client for a single broker at localhost:9092, using the Kafka 0.10.2 protocol
// and the client name "testclient".
kafka.client[IO](
  ensemble = Set(broker("localhost", port = 9092)),
  protocol = ProtocolVersion.Kafka_0_10_2,
  clientName = "testclient"
)