softwaremill-ci on armeria-1.18.0
Update armeria to 1.18.0 (compare)
mergify[bot] on scribe-3.10.2
mergify[bot] on master
Update scribe to 3.10.2 Merge pull request #1518 from s… (compare)
softwaremill-ci on scribe-3.10.2
Update scribe to 3.10.2 (compare)
17:58:15.191 | ERROR | scala-execution-context-global-14 | a8.hermes.ServerSentEventsFactory. | error with stream will restart @ Some(30)
java.lang.IllegalStateException: failed to create a child event loop
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:86)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:81)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:68)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:32)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:21)
at org.asynchttpclient.netty.channel.ChannelManager.<init>(ChannelManager.java:133)
at org.asynchttpclient.DefaultAsyncHttpClient.<init>(DefaultAsyncHttpClient.java:92)
at sttp.client.asynchttpclient.AsyncHttpClientBackend$.defaultClient(AsyncHttpClientBackend.scala:380)
at sttp.client.asynchttpclient.monix.AsyncHttpClientMonixBackend$.$anonfun$apply$1(AsyncHttpClientMonixBackend.scala:76)
at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:271)
at monix.eval.Task.runAsyncOptF(Task.scala:812)
at monix.eval.Task.runAsyncOpt(Task.scala:710)
at monix.eval.Task.runAsync(Task.scala:660)
at monix.reactive.internal.builders.TaskAsObservable.unsafeSubscribeFn(TaskAsObservable.scala:30)
at monix.reactive.internal.operators.ConcatMapObservable.unsafeSubscribeFn(ConcatMapObservable.scala:76)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable.unsafeSubscribeFn(OnErrorRecoverWithObservable.scala:35)
at monix.reactive.internal.operators.LiftByOperatorObservable.unsafeSubscribeFn(LiftByOperatorObservable.scala:30)
at monix.reactive.observables.ChainedObservable$.$anonfun$subscribe$1(ChainedObservable.scala:74)
at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:66)
at monix.execution.internal.Trampoline.startLoop(Trampoline.scala:32)
at monix.execution.schedulers.TrampolineExecutionContext$JVMOptimalTrampoline.startLoop(TrampolineExecutionContext.scala:132)
at monix.execution.internal.Trampoline.execute(Trampoline.scala:40)
at monix.execution.schedulers.TrampolineExecutionContext.execute(TrampolineExecutionContext.scala:57)
at monix.execution.schedulers.BatchingScheduler.execute(BatchingScheduler.scala:50)
at monix.execution.schedulers.BatchingScheduler.execute$(BatchingScheduler.scala:47)
at monix.execution.schedulers.AsyncScheduler.execute(AsyncScheduler.scala:31)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined(ExecuteExtensions.scala:86)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined$(ExecuteExtensions.scala:85)
at monix.execution.Scheduler$Extensions.executeTrampolined(Scheduler.scala:305)
at monix.reactive.observables.ChainedObservable$.subscribe(ChainedObservable.scala:69)
at monix.reactive.internal.operators.ConcatObservable.unsafeSubscribeFn(ConcatObservable.scala:37)
at monix.reactive.observables.ChainedObservable.unsafeSubscribeFn(ChainedObservable.scala:55)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable$$anon$1.$anonfun$onError$1(OnErrorRecoverWithObservable.scala:57)
at monix.execution.Ack$$anon$1.run(Ack.scala:54)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:142)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:146)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:37)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
... 42 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:84)
at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
... 46 more
val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] =
lazy val frames: Observable[WebSocketFrame.Incoming] = {
Observable.unsafeCreate[WebSocketFrame.Incoming] { subscriber =>
val cancelable = AssignableCancelable.multi()
def feedOnNext(): Unit = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
cancelable :=
receiveT
.runAsync {
case Left(error) =>
subscriber.onError(error)
case Right((Left(close), _)) =>
if ( logger.isTraceEnabled )
logger.trace(s"received close ${close}")
subscriber.onComplete()
case Right((Right(frame), webSocket)) =>
subscriber
.onNext(frame)
.syncOnContinue(feedOnNext())
.syncOnStopOrFailure {
case Some(th) =>
logger.error(s"error processing ${frame}", th)
feedOnNext()
case None =>
if ( logger.isTraceEnabled)
logger.trace(s"stop received closing WebSocket")
webSocket.close.runAsyncAndForget
}
}
}
feedOnNext()
cancelable
}
}
lazy val frames: Observable[WebSocketFrame.Incoming] = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
val receiveForObservableT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], Unit)] = receiveT.map(t => (t._1, ()))
val observable: Observable[Either[WebSocketEvent.Close, WebSocketFrame.Incoming]] = Observable.fromAsyncStateAction[Unit,Either[WebSocketEvent.Close, WebSocketFrame.Incoming]](_ => receiveForObservableT)(())
observable
.takeWhile(_.isRight)
.collect {
case Right(f) =>
f
}
}
Map[String, String]
for each case class?
Hello, I am trying client3 with zio (scala 2.13.3 and zio 1.0.3, sttp 3.0.0-RC5), following the example here https://sttp.softwaremill.com/en/latest/examples.html (GET and parse JSON using the ZIO async-http-client backend and circe), I get at runtime
[info] Fiber failed.
[info] An unchecked error was produced.
[info] java.lang.Error: Defect in zio.Has: Set(SttpBackend[=λ %2:0 → ZIO[-Any,+Throwable,+2:0],+{ZioStreams & package::WebSockets}]) statically known to be contained within the environment are missing
[info] at zio.Has$HasSyntax$.prune$extension(Has.scala:197)
[info] at zio.Has$HasSyntax$.union$extension(Has.scala:209)
[info] at zio.Has$Union$$anon$2.union(Has.scala:91)
[info] at zio.Has$Union$$anon$2.union(Has.scala:89)
Anything I am missing ? when I am not using layer, and just directly the backend its working properly.
statusCode
as one of the label for metric. But I noticed that if request fails with DeserializationError, all metadata of response (headers, status code and other) will be lost.effectBlocking
. I think not as otherwise Blocking
would be simply required by sttp-zio integration. I guess STTP has it's own execution context, but maybe it's still good to tell ZIO this is a blocking operation?
asStreamUnsafe
? Obviously this is backend/stream-specific, but some hints would help.httpclient-backend-fs2
, get IO[Stream[IO, T]]
from sttp, add some in-flight transformations, stream results in http response via http4s
. I think this scenario cannot be expressed with safe asStream
.
def webSocketFramePipe: Flow[WebSocketFrame.Data[_], WebSocketFrame, NotUsed] = Flow[WebSocketFrame.Data[_]].map{
case o @ WebSocketFrame.Text(str, _, _) =>
println(s"received string: $str")
case other =>
println(s"received other: $other")
other
}
val inputSrc: Source[ByteString, Any] = ... // ie Seq("test1", "test")
basicRequest
.get(uri)
.streamBody(AkkaStreams)(inputSrc)
.response(asWebSocketStreamAlways(AkkaStreams)(webSocketFramePipe))
.send(backend)
.onComplete(_ => backend.close())
HI… I am trying to use PrometheusBackend with Cats backend but it looks like it is not supported. It works fine with Monix backend.
AsyncHttpClientMonixBackend().map(backend => PrometheusBackend(backend)) // OK
AsyncHttpClientCatsBackend[cats.effect.IO]().map(backend => PrometheusBackend(backend))
error: type mismatch;
found : sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler]
required: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]
Error occurred in an application involving default arguments.
sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler] <: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]?
false
Am I missing something?
private def apply(
client: HttpClient,
closeClient: Boolean,
customizeRequest: HttpRequest => HttpRequest,
customEncodingHandler: ZioEncodingHandler
): SttpBackend[Task, ZioStreams with WebSockets] =
private def make(
actorSystem: ActorSystem,
ec: ExecutionContext,
terminateActorSystemOnClose: Boolean,
options: SttpBackendOptions,
customConnectionPoolSettings: Option[ConnectionPoolSettings],
http: AkkaHttpClient,
customizeRequest: HttpRequest => HttpRequest,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customEncodingHandler: EncodingHandler = PartialFunction.empty
Sec-WebSocket-Protocol
results in an Illegal Header
exception
val wsBuilder = client.newWebSocketBuilder()
client.connectTimeout().map[java.net.http.WebSocket.Builder](wsBuilder.connectTimeout(_))
request.headers.foreach(h => wsBuilder.header(h.name, h.value))
val cf = wsBuilder
.buildAsync(request.uri.toJavaUri, listener)
.thenApply[Unit](_ => ())
.exceptionally(t => cb(Left(t)))
Canceler(() => cf.cancel(true))
})
wsBuilder.subprotocols
afaik
SttpBackend
trait, the documentation for send
states Type parameters: R – The capabilities required by the request. This must be a subset of the the capabilities supported by the backend (which always includes Effect[F]).
, which makes sense. However when I'm looking at AkkaHttpBackend
, the type returned is SttpBackend[Future, AkkaStreams with WebSockets]
, so Effect[Future]
is not included. Shouldn't that be there?