softwaremill-ci on opentelemetry-api-1.17.0
Update opentelemetry-api, ... t… (compare)
Pask423 on fs2-core-3.2.12
Update fs2-core, fs2-io, ... to… (compare)
softwaremill-ci on scalafmt-core-3.5.9
Update scalafmt-core to 3.5.9 Reformat with scalafmt 3.5.9 E… Add 'Reformat with scalafmt 3.5… (compare)
adamw on v3.7.4
adamw on master
Release 3.7.4 (compare)
adamw on master
Split js tests into 2.11+2.12 a… (compare)
adamw on master
Increase memory for js tests (compare)
adamw on master
Increase memory for js tests (compare)
17:58:15.191 | ERROR | scala-execution-context-global-14 | a8.hermes.ServerSentEventsFactory. | error with stream will restart @ Some(30)
java.lang.IllegalStateException: failed to create a child event loop
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:86)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:81)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:68)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:32)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:21)
at org.asynchttpclient.netty.channel.ChannelManager.<init>(ChannelManager.java:133)
at org.asynchttpclient.DefaultAsyncHttpClient.<init>(DefaultAsyncHttpClient.java:92)
at sttp.client.asynchttpclient.AsyncHttpClientBackend$.defaultClient(AsyncHttpClientBackend.scala:380)
at sttp.client.asynchttpclient.monix.AsyncHttpClientMonixBackend$.$anonfun$apply$1(AsyncHttpClientMonixBackend.scala:76)
at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:271)
at monix.eval.Task.runAsyncOptF(Task.scala:812)
at monix.eval.Task.runAsyncOpt(Task.scala:710)
at monix.eval.Task.runAsync(Task.scala:660)
at monix.reactive.internal.builders.TaskAsObservable.unsafeSubscribeFn(TaskAsObservable.scala:30)
at monix.reactive.internal.operators.ConcatMapObservable.unsafeSubscribeFn(ConcatMapObservable.scala:76)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable.unsafeSubscribeFn(OnErrorRecoverWithObservable.scala:35)
at monix.reactive.internal.operators.LiftByOperatorObservable.unsafeSubscribeFn(LiftByOperatorObservable.scala:30)
at monix.reactive.observables.ChainedObservable$.$anonfun$subscribe$1(ChainedObservable.scala:74)
at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:66)
at monix.execution.internal.Trampoline.startLoop(Trampoline.scala:32)
at monix.execution.schedulers.TrampolineExecutionContext$JVMOptimalTrampoline.startLoop(TrampolineExecutionContext.scala:132)
at monix.execution.internal.Trampoline.execute(Trampoline.scala:40)
at monix.execution.schedulers.TrampolineExecutionContext.execute(TrampolineExecutionContext.scala:57)
at monix.execution.schedulers.BatchingScheduler.execute(BatchingScheduler.scala:50)
at monix.execution.schedulers.BatchingScheduler.execute$(BatchingScheduler.scala:47)
at monix.execution.schedulers.AsyncScheduler.execute(AsyncScheduler.scala:31)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined(ExecuteExtensions.scala:86)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined$(ExecuteExtensions.scala:85)
at monix.execution.Scheduler$Extensions.executeTrampolined(Scheduler.scala:305)
at monix.reactive.observables.ChainedObservable$.subscribe(ChainedObservable.scala:69)
at monix.reactive.internal.operators.ConcatObservable.unsafeSubscribeFn(ConcatObservable.scala:37)
at monix.reactive.observables.ChainedObservable.unsafeSubscribeFn(ChainedObservable.scala:55)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable$$anon$1.$anonfun$onError$1(OnErrorRecoverWithObservable.scala:57)
at monix.execution.Ack$$anon$1.run(Ack.scala:54)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:142)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:146)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:37)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
... 42 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:84)
at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
... 46 more
// Backend construction expressed as a Task; `memoize` is applied to that Task.
// NOTE(review): presumably this is meant to make every use of `instance` share a single
// backend (and its underlying HTTP client) instead of constructing a new one on each
// evaluation — confirm against the Monix `Task.memoize` documentation.
val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] =
// Incoming WebSocket frames exposed as an Observable, built by hand with `unsafeCreate`.
// Each frame is pulled with `webSocket.receive` and pushed to the subscriber; the next
// receive is started from the callbacks attached to the Ack of the previous `onNext`.
// NOTE(review): `webSocketT` and `logger` are defined outside this snippet; `webSocketT`
// is evaluated again on every receive, so it presumably needs to be memoized — confirm.
lazy val frames: Observable[WebSocketFrame.Incoming] = {
Observable.unsafeCreate[WebSocketFrame.Incoming] { subscriber =>
// Re-assigned on every `feedOnNext()` call with the handle of the receive task started
// there; this is also the value handed back to `unsafeCreate` at the end of the block.
val cancelable = AssignableCancelable.multi()
// Starts one receive and routes its outcome to the subscriber.
def feedOnNext(): Unit = {
// Resolve the socket, receive one event, and keep the socket next to the event so the
// stop branch below can close it.
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
cancelable :=
receiveT
.runAsync {
// The receive task itself failed: forward the error to the subscriber.
case Left(error) =>
subscriber.onError(error)
// A close event was received: complete the stream. Only the trace call is guarded by
// the `if`; `onComplete()` always runs.
case Right((Left(close), _)) =>
if ( logger.isTraceEnabled )
logger.trace(s"received close ${close}")
subscriber.onComplete()
// A frame was received: emit it and decide what to do next based on the Ack.
case Right((Right(frame), webSocket)) =>
subscriber
.onNext(frame)
// Subscriber asked to continue: receive the next frame.
.syncOnContinue(feedOnNext())
.syncOnStopOrFailure {
// Processing the frame failed: log the error and keep receiving anyway.
case Some(th) =>
logger.error(s"error processing ${frame}", th)
feedOnNext()
// Subscriber asked to stop: close the socket without waiting for the result. Only
// the trace call is guarded by the `if`; the close always runs.
case None =>
if ( logger.isTraceEnabled)
logger.trace(s"stop received closing WebSocket")
webSocket.close.runAsyncAndForget
}
}
}
// Kick off the first receive as soon as a subscriber attaches.
feedOnNext()
cancelable
}
}
/** Incoming WebSocket frames as an Observable.
  *
  * Each element is produced by one `receive` on the socket obtained from `webSocketT`.
  * The stream stops at the first close event; only the frames before it are emitted.
  */
lazy val frames: Observable[WebSocketFrame.Incoming] = {
  type Received = Either[WebSocketEvent.Close, WebSocketFrame.Incoming]

  // One receive step: resolve the socket, read a single event, pair it with the socket.
  val receiveWithSocket: Task[(Received, WebSocket[Task])] =
    for {
      socket <- webSocketT
      event  <- socket.receive
    } yield (event, socket)

  // fromAsyncStateAction expects (element, nextState); no state is carried, so it is Unit.
  val step: Task[(Received, Unit)] =
    receiveWithSocket.map { case (event, _) => (event, ()) }

  Observable
    .fromAsyncStateAction[Unit, Received](_ => step)(())
    .takeWhile(_.isRight)
    .collect { case Right(frame) => frame }
}
Map[String, String]
for each case class?
Hello, I am trying client3 with ZIO (Scala 2.13.3, ZIO 1.0.3, sttp 3.0.0-RC5), following the example here: https://sttp.softwaremill.com/en/latest/examples.html (GET and parse JSON using the ZIO async-http-client backend and circe). At runtime I get:
[info] Fiber failed.
[info] An unchecked error was produced.
[info] java.lang.Error: Defect in zio.Has: Set(SttpBackend[=λ %2:0 → ZIO[-Any,+Throwable,+2:0],+{ZioStreams & package::WebSockets}]) statically known to be contained within the environment are missing
[info] at zio.Has$HasSyntax$.prune$extension(Has.scala:197)
[info] at zio.Has$HasSyntax$.union$extension(Has.scala:209)
[info] at zio.Has$Union$$anon$2.union(Has.scala:91)
[info] at zio.Has$Union$$anon$2.union(Has.scala:89)
Am I missing anything? When I don't use the layer and just use the backend directly, it works properly.
statusCode
as one of the labels for the metric. But I noticed that if a request fails with DeserializationError, all metadata of the response (headers, status code and others) will be lost.
effectBlocking
. I think not as otherwise Blocking
would simply be required by the sttp-zio integration. I guess STTP has its own execution context, but maybe it's still good to tell ZIO this is a blocking operation?
asStreamUnsafe
? Obviously this is backend/stream-specific, but some hints would help.
httpclient-backend-fs2
, get IO[Stream[IO, T]]
from sttp, add some in-flight transformations, stream results in http response via http4s
. I think this scenario cannot be expressed with safe asStream
.
/** Flow stage that prints every incoming WebSocket data frame and emits it downstream unchanged.
  *
  * Text frames are printed with their string payload; any other data frame is printed as-is.
  *
  * @return a `Flow` from incoming data frames to the frames emitted downstream
  */
def webSocketFramePipe: Flow[WebSocketFrame.Data[_], WebSocketFrame, NotUsed] =
  Flow[WebSocketFrame.Data[_]].map {
    case o @ WebSocketFrame.Text(str, _, _) =>
      println(s"received string: $str")
      // Emit the frame itself. This branch used to end with println, so it produced Unit
      // instead of a WebSocketFrame and the bound `o` was never used.
      o
    case other =>
      println(s"received other: $other")
      other
  }
val inputSrc: Source[ByteString, Any] = ... // ie Seq("test1", "test")
basicRequest
.get(uri)
.streamBody(AkkaStreams)(inputSrc)
.response(asWebSocketStreamAlways(AkkaStreams)(webSocketFramePipe))
.send(backend)
.onComplete(_ => backend.close())
Hi… I am trying to use PrometheusBackend with the Cats backend, but it looks like it is not supported. It works fine with the Monix backend.
AsyncHttpClientMonixBackend().map(backend => PrometheusBackend(backend)) // OK
AsyncHttpClientCatsBackend[cats.effect.IO]().map(backend => PrometheusBackend(backend))
error: type mismatch;
found : sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler]
required: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]
Error occurred in an application involving default arguments.
sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler] <: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]?
false
Am I missing something?
private def apply(
client: HttpClient,
closeClient: Boolean,
customizeRequest: HttpRequest => HttpRequest,
customEncodingHandler: ZioEncodingHandler
): SttpBackend[Task, ZioStreams with WebSockets] =
private def make(
actorSystem: ActorSystem,
ec: ExecutionContext,
terminateActorSystemOnClose: Boolean,
options: SttpBackendOptions,
customConnectionPoolSettings: Option[ConnectionPoolSettings],
http: AkkaHttpClient,
customizeRequest: HttpRequest => HttpRequest,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customEncodingHandler: EncodingHandler = PartialFunction.empty
Sec-WebSocket-Protocol
results in an Illegal Header
exception
val wsBuilder = client.newWebSocketBuilder()
client.connectTimeout().map[java.net.http.WebSocket.Builder](wsBuilder.connectTimeout(_))
request.headers.foreach(h => wsBuilder.header(h.name, h.value))
val cf = wsBuilder
.buildAsync(request.uri.toJavaUri, listener)
.thenApply[Unit](_ => ())
.exceptionally(t => cb(Left(t)))
Canceler(() => cf.cancel(true))
})