Hello everyone : )
I am currently playing around with STTP using the Monix backend. I am mainly stuck with closing the backend after all my requests (each request is a task) have been processed.
I have created sample code that reproduces my issue (in case that helps answer my question):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can't make any requests afterwards
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
And my fetch (api call maker) function looks like:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
As my main task contains other tasks, which I need to process later, I need to find an alternative way to close the Monix backend from outside. Is there a clean way to close the backend after I consume the requests in List[Task[Response[Either[String, String]]]]?
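One possible approach, as a minimal sketch rather than a definitive answer: keep the throttled stream inside the flatMap that owns the backend, and attach backend.close() as a finalizer of the whole stream instead of returning unevaluated Tasks out of that scope. The names ThrottledFetches, MonixBackend and fetchAll are hypothetical; the sketch assumes sttp client 2.x with the async-http-client Monix backend and Monix 3.

```scala
import java.nio.ByteBuffer

import scala.concurrent.duration.FiniteDuration

import monix.eval.Task
import monix.reactive.Observable
import sttp.client._
import sttp.client.asynchttpclient.WebSocketHandler

// Hypothetical helper object, not part of sttp.
object ThrottledFetches {
  type MonixBackend = SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]

  // Describes one request; nothing is sent until the Task is run.
  def fetch(uri: Uri)(implicit backend: MonixBackend): Task[Response[Either[String, String]]] =
    basicRequest
      .get(uri)
      .header("accept", "application/json")
      .response(asString)
      .send()

  // Runs all requests while the backend is still open, at most one per `period`,
  // and closes the backend once the stream has completed or failed.
  def fetchAll(
      backendTask: Task[MonixBackend],
      uris: List[Uri],
      period: FiniteDuration
  ): Task[List[Response[Either[String, String]]]] =
    backendTask.flatMap { implicit backend =>
      Observable
        .fromIterable(uris)
        .throttle(period, 1)
        .mapEval(uri => fetch(uri))
        .toListL
        .guarantee(backend.close())
    }
}
```

With a real backend this would be called as ThrottledFetches.fetchAll(AsyncHttpClientMonixBackend(), uris, 3.seconds).runToFuture. The point of the design is that every request is evaluated before the finalizer runs, so nothing tries to use a closed backend.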
Hello!
Is the OpenAPI scala-sttp generator usable? I ask because its status is listed as beta (https://openapi-generator.tech/docs/generators) and I'm getting serialization errors on simple string body responses.
def getHealth(): ApiRequestT[String] =
basicRequest
.method(Method.GET, uri"$baseUrl/health")
.contentType("application/json")
.response(asJson[String])
import org.openapitools.client.api.DefaultApi
import org.openapitools.client.core.ApiInvoker._
import org.openapitools.client.core.SttpSerializer
import sttp.client.HttpURLConnectionBackend
implicit val s = new SttpSerializer()
implicit val b = HttpURLConnectionBackend()
val api = DefaultApi(<my string url>)
api.getHealth().result
results in
org.openapitools.client.core.HttpException: [200] OK: Running...
The body is correct but a serialization error is thrown.
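A possible explanation, sketched under assumptions: the plain-text body Running... is not valid JSON (a JSON string would have to be quoted), so asJson[String] fails to deserialize it even though the status is 200. The sketch below reproduces this offline with a stub backend and uses circe for JSON, which is an assumption and may differ from the JSON library the generated client uses. HealthCheck and the localhost URL are hypothetical.

```scala
import sttp.client._
import sttp.client.circe._
import sttp.client.testing.SttpBackendStub

// Hypothetical object for illustration only.
object HealthCheck {
  // Stub standing in for the server: replies 200 OK with the plain-text body "Running...".
  implicit val backend: SttpBackend[Identity, Nothing, NothingT] =
    SttpBackendStub.synchronous.whenAnyRequest.thenRespond("Running...")

  private val health = basicRequest.get(uri"http://localhost/health")

  // Parsing the body as JSON fails, so the body is a Left holding a deserialization error.
  val asJsonBody = health.response(asJson[String]).send().body

  // Reading the body as plain text succeeds.
  val asTextBody: Either[String, String] = health.response(asString).send().body
}
```

If the endpoint really returns plain text, reading it with asString (or declaring a text/plain response in the OpenAPI spec so the generator does not emit asJson) would avoid the error.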
Hello! I'm trying to use sttp with the ZIO backend (2.2.4), but I end up with a runtime exception:
─An unchecked error was produced.
║ java.lang.IncompatibleClassChangeError: Found interface zio.ZQueue, but class was expected
║ at zio.interop.reactivestreams.Adapters$.process(Adapters.scala:71)
║ at zio.interop.reactivestreams.Adapters$.$anonfun$publisherToStream$6(Adapters.scala:49)
║ at zio.ZManaged.$anonfun$flatMap$6(ZManaged.scala:315)
║ at zio.internal.FiberContext.evaluateNow(FiberContext.scala:815)
║ at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
║ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
║ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
║ at java.lang.Thread.run(Thread.java:748)
I can't figure out why this is happening. Does anyone have a clue? Unfortunately I can't share the code, but it's a really simple POST request followed by a Runtime.default.unsafeRunToFuture
Hello, everyone!
Got this error
value mapK is not a member of sttp.client.SttpBackend
snippet:
https://scastie.scala-lang.org/OgnPGIorTfGep6rj3wfl6A
I've imported sttp.client.impl.cats.implicits._
but it doesn't help.
Hello. I've seen the logging backend in sttp v2. It's always nice to be able to remove some boilerplate from my code.
How can I also log the body of a response?
The simplest way I found is to extend the Slf4jLoggingListener:
(I need to be in this package because Logger is private)
package sttp.client.logging.slf4j
import sttp.client.listener.ListenerBackend
import sttp.client.logging.LogMessages.requestToString
import sttp.client.{Identity, Request, Response, SttpBackend}
object CustomSlf4jLoggingListener {
private val logger = new Logger("sttp.client.logging.slf4j.Slf4jLoggingBackend")
def apply[F[_], S, WS_HANDLER[_]](
delegate: SttpBackend[F, S, WS_HANDLER]
): SttpBackend[F, S, WS_HANDLER] =
ListenerBackend.lift(delegate, new CustomSlf4jLoggingListener(logger))
}
class CustomSlf4jLoggingListener(logger: Logger) extends Slf4jLoggingListener(logger) {
override def requestSuccessful(
request: Request[_, _],
response: Response[_],
tag: Unit
): Identity[Unit] =
if (response.isSuccess) {
logger.debug(response_(request, response))
} else {
logger.warn(response_(request, response))
}
def response_(request: Request[_, _], response: Response[_]): String =
s"For request: ${requestToString(request)}, got response: ${responseToString(response)}"
def responseToString(response: Response[_]): String =
s"${response.code}, body: ${response.body}"
}
Hi all, I am new to sttp and trying to learn a few of its basics.
One issue I am facing is that I am not getting the right JSON response for this code:
val baseRequest: RequestT[Empty, Either[String, String], Nothing] = basicRequest.header("accept", "application/vnd.github.v3+json")
val url: Uri = uri"https://api.github.com/events"
sttpBackend
.send {
baseRequest
.readTimeout(config.http.timeout)
.get(url)
.response(asJson[Either[GitErrorResponse, List[CommitEvent]]])
}
.map[Either[Error, List[CommitEvent]]] {
response: Response[Either[ResponseError[CirceError], Either[GitErrorResponse, List[CommitEvent]]]] =>
response.translate
}
.handleError { throwable: Throwable =>
throwable.translate
}
Compare that to this basic Scala code, which works as expected:
val url = "https://api.github.com/events"
val result = scala.io.Source.fromURL(url).mkString
println(result)
Can someone help?
fromMetadata to the new signature. Can you provide an example for the new signature?
def fromMetadata[T, S](f: ResponseMetadata => ResponseAs[T, S]): ResponseAs[T, S]
def fromMetadata[T, R](default: ResponseAs[T, R], conditions: ConditionalResponseAs[T, R]*)
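A minimal sketch of how the second signature might be used, assuming sttp client 3.x (package sttp.client3): the first argument is the fallback response description, and each ConditionalResponseAs pairs a predicate on the response metadata with the description to use when it matches. FromMetadataExample, onOk, otherwise and okOrError are hypothetical names.

```scala
import sttp.client3._
import sttp.model.StatusCode

// Hypothetical object for illustration only.
object FromMetadataExample {
  // Used when the status code is 200: the body becomes a Right.
  val onOk: ResponseAs[Either[String, String], Any] = asStringAlways.map(Right(_))

  // Fallback for every other status code: the body becomes a Left.
  val otherwise: ResponseAs[Either[String, String], Any] = asStringAlways.map(Left(_))

  // Roughly what the function-based style would write as
  //   fromMetadata(meta => if (meta.code == StatusCode.Ok) onOk else otherwise)
  val okOrError: ResponseAs[Either[String, String], Any] =
    fromMetadata(otherwise, ConditionalResponseAs(_.code == StatusCode.Ok, onOk))
}
```

It would then be passed to a request as basicRequest.get(someUri).response(FromMetadataExample.okOrError), where someUri is whatever address is being called.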
17:58:15.191 | ERROR | scala-execution-context-global-14 | a8.hermes.ServerSentEventsFactory. | error with stream will restart @ Some(30)
java.lang.IllegalStateException: failed to create a child event loop
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:86)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:81)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:68)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:32)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:21)
at org.asynchttpclient.netty.channel.ChannelManager.<init>(ChannelManager.java:133)
at org.asynchttpclient.DefaultAsyncHttpClient.<init>(DefaultAsyncHttpClient.java:92)
at sttp.client.asynchttpclient.AsyncHttpClientBackend$.defaultClient(AsyncHttpClientBackend.scala:380)
at sttp.client.asynchttpclient.monix.AsyncHttpClientMonixBackend$.$anonfun$apply$1(AsyncHttpClientMonixBackend.scala:76)
at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:271)
at monix.eval.Task.runAsyncOptF(Task.scala:812)
at monix.eval.Task.runAsyncOpt(Task.scala:710)
at monix.eval.Task.runAsync(Task.scala:660)
at monix.reactive.internal.builders.TaskAsObservable.unsafeSubscribeFn(TaskAsObservable.scala:30)
at monix.reactive.internal.operators.ConcatMapObservable.unsafeSubscribeFn(ConcatMapObservable.scala:76)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable.unsafeSubscribeFn(OnErrorRecoverWithObservable.scala:35)
at monix.reactive.internal.operators.LiftByOperatorObservable.unsafeSubscribeFn(LiftByOperatorObservable.scala:30)
at monix.reactive.observables.ChainedObservable$.$anonfun$subscribe$1(ChainedObservable.scala:74)
at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:66)
at monix.execution.internal.Trampoline.startLoop(Trampoline.scala:32)
at monix.execution.schedulers.TrampolineExecutionContext$JVMOptimalTrampoline.startLoop(TrampolineExecutionContext.scala:132)
at monix.execution.internal.Trampoline.execute(Trampoline.scala:40)
at monix.execution.schedulers.TrampolineExecutionContext.execute(TrampolineExecutionContext.scala:57)
at monix.execution.schedulers.BatchingScheduler.execute(BatchingScheduler.scala:50)
at monix.execution.schedulers.BatchingScheduler.execute$(BatchingScheduler.scala:47)
at monix.execution.schedulers.AsyncScheduler.execute(AsyncScheduler.scala:31)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined(ExecuteExtensions.scala:86)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined$(ExecuteExtensions.scala:85)
at monix.execution.Scheduler$Extensions.executeTrampolined(Scheduler.scala:305)
at monix.reactive.observables.ChainedObservable$.subscribe(ChainedObservable.scala:69)
at monix.reactive.internal.operators.ConcatObservable.unsafeSubscribeFn(ConcatObservable.scala:37)
at monix.reactive.observables.ChainedObservable.unsafeSubscribeFn(ChainedObservable.scala:55)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable$$anon$1.$anonfun$onError$1(OnErrorRecoverWithObservable.scala:57)
at monix.execution.Ack$$anon$1.run(Ack.scala:54)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:142)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:146)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:37)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
... 42 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:84)
at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
... 46 more
val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] = {
Observable.unsafeCreate[WebSocketFrame.Incoming] { subscriber =>
val cancelable = AssignableCancelable.multi()
def feedOnNext(): Unit = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
cancelable :=
receiveT
.runAsync {
case Left(error) =>
subscriber.onError(error)
case Right((Left(close), _)) =>
if ( logger.isTraceEnabled )
logger.trace(s"received close ${close}")
subscriber.onComplete()
case Right((Right(frame), webSocket)) =>
subscriber
.onNext(frame)
.syncOnContinue(feedOnNext())
.syncOnStopOrFailure {
case Some(th) =>
logger.error(s"error processing ${frame}", th)
feedOnNext()
case None =>
if ( logger.isTraceEnabled)
logger.trace(s"stop received closing WebSocket")
webSocket.close.runAsyncAndForget
}
}
}
feedOnNext()
cancelable
}
}
lazy val frames: Observable[WebSocketFrame.Incoming] = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
val receiveForObservableT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], Unit)] = receiveT.map(t => (t._1, ()))
val observable: Observable[Either[WebSocketEvent.Close, WebSocketFrame.Incoming]] = Observable.fromAsyncStateAction[Unit,Either[WebSocketEvent.Close, WebSocketFrame.Incoming]](_ => receiveForObservableT)(())
observable
.takeWhile(_.isRight)
.collect {
case Right(f) =>
f
}
}