mergify[bot] on scribe-3.11.0
mergify[bot] on master
Update scribe to 3.11.0 Merge pull request #1729 from s… (compare)
mergify[bot] on fs2-core-3.6.0
mergify[bot] on master
Update fs2-core, fs2-reactive-s… Merge pull request #1728 from s… (compare)
softwaremill-ci on scribe-3.11.0
Update scribe to 3.11.0 (compare)
softwaremill-ci on fs2-core-3.6.0
Update fs2-core, fs2-reactive-s… (compare)
softwaremill-ci on zio-2.0.7
Update zio, zio-streams to 2.0.7 (compare)
adamw on v3.8.11
adamw on master
Release 3.8.11 (compare)
mergify[bot] on zio-nio-2.0.1
threadLocalMap of io.netty.util.concurrent.FastThreadLocalThread #3009 [Thread, Stack Local] "AsyncHttpClient-5-1" tid=196 [RUNNABLE] Native ID: 16561879 14856 384
is holding on to PoolChunk
s containing the byte buffers of responses. Any suggestions?
Hello everyone : )
Apologies in advance if the question does not follow guidelines as I am new here. I am also new to sttp and Monix, and I am trying to learn more about these libraries. My goal is to fetch data (client-side) from a given API via HTTP GET requests -> parse JSON responses -> write this information to a database. My question pertains to the first part only. My objective is to run get requests in an asynchronous (hopefully fast) way while having a way to either avoid or handle rate limits.
Below is a snippet of what I have already tried, and seems to work for a single request:
package com.github.me.client
import io.circe.{Decoder, HCursor}
import sttp.client._
import sttp.client.circe._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
case class Bla(paging: Int)
implicit val dataDecoder: Decoder[Bla] = (hCursor: HCursor) => {
for {
next_page <- hCursor.downField("foo").downArray.downField("bar").as[Int]
} yield Bla(next_page)
}
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r = basicRequest
.get(uri"https://foo.bar.io/v1/baz")
.header("accept", "application/json")
.header("Authorization", "hushh!")
.response(asJson[Bla])
r.send() // How can I instead of operating on a single request, operate on multiple
.flatMap { response =>
Task(response.body)
}
.guarantee(backend.close())
}
import monix.execution.Scheduler.Implicits.global
postTask.runSyncUnsafe() match {
case Left(error) => println(s"Error when executing request: $error")
case Right(data) => println(data)
}
}
My questions:
On a side note, I am not particularly fixated on using Monix, and I am open to any other back-end or implementation
Hi! This is a very small thing, but would you be open to a PR that adds a type alias for a zio SttpBackend (the part within the Has
)
https://github.com/softwaremill/sttp/blob/1e34bff7ef8bb0200d456bc8f268b3b25fdfa6ce/async-http-client-backend/zio/src/main/scala/sttp/client/asynchttpclient/zio/package.scala#L13
I find myself redefining it in several codebases to make it a bit shorter when I'm passing it around.
Hello everyone : )
I am currently playing around with STTP using the Monix backend. I am mainly stuck with closing the backend after all my requests (each request is a task) have been processed.
I have created sample code to resemble my issue (in case that helps answering my question):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can't make generate requests after
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
And my fetch (api call maker) function looks like:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
As my main task contains other tasks, which I later need to process, I need to find an alternative way to close the Monix backend from outside. Is there a clean way to do close the backend after I consume the requests in List[Task[Response[Either[String, String]]]]
?
Hello!
Is the OpenAPI scala-sttp generator usable? I ask because It's status is listed in beta (https://openapi-generator.tech/docs/generators) and I'm getting serialization errors on simple string body responses.
def getHealth(): ApiRequestT[String] =
basicRequest
.method(Method.GET, uri"$baseUrl/health")
.contentType("application/json")
.response(asJson[String])
import org.openapitools.client.api.DefaultApi
import org.openapitools.client.core.ApiInvoker._
import org.openapitools.client.core.SttpSerializer
import sttp.client.HttpURLConnectionBackend
implicit val s = new SttpSerializer()
implicit val b = HttpURLConnectionBackend()
val api = DefaultApi(<my string url>)
api.getHealth().result
results in
org.openapitools.client.core.HttpException: [200] OK: Running...
The body is correct but a serialization error is thrown.
Hello ! I'm trying to use sttp with ZIO backend (2.2.4)but I end up having a runtime exception
─An unchecked error was produced.
║ java.lang.IncompatibleClassChangeError: Found interface zio.ZQueue, but class was expected
║ at zio.interop.reactivestreams.Adapters$.process(Adapters.scala:71)
║ at zio.interop.reactivestreams.Adapters$.$anonfun$publisherToStream$6(Adapters.scala:49)
║ at zio.ZManaged.$anonfun$flatMap$6(ZManaged.scala:315)
║ at zio.internal.FiberContext.evaluateNow(FiberContext.scala:815)
║ at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
║ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
║ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
║ at java.lang.Thread.run(Thread.java:748)
I can't figure out why this is happening, does someone have any clue ? I unfortunately can't share the code but it's a really simple POST request I'm trying to make, followed by a Runtime.default.unsafeRunToFuture
Hello, everyone!
Got this error
value mapK is not a member of sttp.client.SttpBackend
snippet:
https://scastie.scala-lang.org/OgnPGIorTfGep6rj3wfl6A
I've imported import sttp.client.impl.cats.implicits._
but it doesn't help.
Hello. I've seen the logging backend in sttp v2. It's always nice to be able to remove some boilerplate in my code.
How can I log also the body of a response ?
The simplest way I found is to extend the Slf4jLoggingListener
:
(I need to be in this package because Logger is private)
package sttp.client.logging.slf4j
import sttp.client.listener.ListenerBackend
import sttp.client.logging.LogMessages.requestToString
import sttp.client.{Identity, Request, Response, SttpBackend}
object CustomSlf4jLoggingListener {
private val logger = new Logger("sttp.client.logging.slf4j.Slf4jLoggingBackend")
def apply[F[_], S, WS_HANDLER[_]](
delegate: SttpBackend[F, S, WS_HANDLER]
): SttpBackend[F, S, WS_HANDLER] =
ListenerBackend.lift(delegate, new CustomSlf4jLoggingListener(logger))
}
class CustomSlf4jLoggingListener(logger: Logger) extends Slf4jLoggingListener(logger) {
override def requestSuccessful(
request: Request[_, _],
response: Response[_],
tag: Unit
): Identity[Unit] =
if (response.isSuccess) {
logger.debug(response_(request, response))
} else {
logger.warn(response_(request, response))
}
def response_(request: Request[_, _], response: Response[_]): String =
s"For request: ${requestToString(request)}, got response: ${responseToString(response)}"
def responseToString(response: Response[_]): String =
s"${response.code}, body: ${response.body}"
}
Hi All, I am new to sttp and trying to learn few basics of it.
In one of the issues I am facing I have not been getting right json response for this code
val baseRequest: RequestT[Empty, Either[String, String], Nothing] = basicRequest.header("accept", "application/vnd.github.v3+json")
val url: Uri = uri"https://api.github.com/events"
sttpBackend
.send {
baseRequest
.readTimeout(config.http.timeout)
.get(url)
.response(asJson[Either[GitErrorResponse, List[CommitEvent]]])
}
.map[Either[Error, List[CommitEvent]]] {
response: Response[Either[ResponseError[CirceError], Either[GitErrorResponse, List[CommitEvent]]]] =>
response.translate
}
.handleError { throwable: Throwable =>
throwable.translate
}
compare that to this basic scala code its working as expected
val url = "https://api.github.com/events"
val result = scala.io.Source.fromURL(url).mkString
println(result)
Can someone help ?
fromMetadata
to the new signature. Can you provide an example for the new signature?def fromMetadata[T, S](f: ResponseMetadata => ResponseAs[T, S]): ResponseAs[T, S]
def fromMetadata[T, R](default: ResponseAs[T, R], conditions: ConditionalResponseAs[T, R]*)
17:58:15.191 | ERROR | scala-execution-context-global-14 | a8.hermes.ServerSentEventsFactory. | error with stream will restart @ Some(30)
java.lang.IllegalStateException: failed to create a child event loop
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:88)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:86)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:81)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:68)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:32)
at org.asynchttpclient.netty.channel.NioTransportFactory.newEventLoopGroup(NioTransportFactory.java:21)
at org.asynchttpclient.netty.channel.ChannelManager.<init>(ChannelManager.java:133)
at org.asynchttpclient.DefaultAsyncHttpClient.<init>(DefaultAsyncHttpClient.java:92)
at sttp.client.asynchttpclient.AsyncHttpClientBackend$.defaultClient(AsyncHttpClientBackend.scala:380)
at sttp.client.asynchttpclient.monix.AsyncHttpClientMonixBackend$.$anonfun$apply$1(AsyncHttpClientMonixBackend.scala:76)
at monix.eval.internal.TaskRunLoop$.startLight(TaskRunLoop.scala:271)
at monix.eval.Task.runAsyncOptF(Task.scala:812)
at monix.eval.Task.runAsyncOpt(Task.scala:710)
at monix.eval.Task.runAsync(Task.scala:660)
at monix.reactive.internal.builders.TaskAsObservable.unsafeSubscribeFn(TaskAsObservable.scala:30)
at monix.reactive.internal.operators.ConcatMapObservable.unsafeSubscribeFn(ConcatMapObservable.scala:76)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable.unsafeSubscribeFn(OnErrorRecoverWithObservable.scala:35)
at monix.reactive.internal.operators.LiftByOperatorObservable.unsafeSubscribeFn(LiftByOperatorObservable.scala:30)
at monix.reactive.observables.ChainedObservable$.$anonfun$subscribe$1(ChainedObservable.scala:74)
at monix.execution.internal.Trampoline.monix$execution$internal$Trampoline$$immediateLoop(Trampoline.scala:66)
at monix.execution.internal.Trampoline.startLoop(Trampoline.scala:32)
at monix.execution.schedulers.TrampolineExecutionContext$JVMOptimalTrampoline.startLoop(TrampolineExecutionContext.scala:132)
at monix.execution.internal.Trampoline.execute(Trampoline.scala:40)
at monix.execution.schedulers.TrampolineExecutionContext.execute(TrampolineExecutionContext.scala:57)
at monix.execution.schedulers.BatchingScheduler.execute(BatchingScheduler.scala:50)
at monix.execution.schedulers.BatchingScheduler.execute$(BatchingScheduler.scala:47)
at monix.execution.schedulers.AsyncScheduler.execute(AsyncScheduler.scala:31)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined(ExecuteExtensions.scala:86)
at monix.execution.schedulers.ExecuteExtensions.executeTrampolined$(ExecuteExtensions.scala:85)
at monix.execution.Scheduler$Extensions.executeTrampolined(Scheduler.scala:305)
at monix.reactive.observables.ChainedObservable$.subscribe(ChainedObservable.scala:69)
at monix.reactive.internal.operators.ConcatObservable.unsafeSubscribeFn(ConcatObservable.scala:37)
at monix.reactive.observables.ChainedObservable.unsafeSubscribeFn(ChainedObservable.scala:55)
at monix.reactive.internal.operators.OnErrorRecoverWithObservable$$anon$1.$anonfun$onError$1(OnErrorRecoverWithObservable.scala:57)
at monix.execution.Ack$$anon$1.run(Ack.scala:54)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.netty.channel.ChannelException: failed to open a new selector
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:175)
at io.netty.channel.nio.NioEventLoop.<init>(NioEventLoop.java:142)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:146)
at io.netty.channel.nio.NioEventLoopGroup.newChild(NioEventLoopGroup.java:37)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
... 42 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.KQueueSelectorImpl.<init>(KQueueSelectorImpl.java:84)
at sun.nio.ch.KQueueSelectorProvider.openSelector(KQueueSelectorProvider.java:42)
at io.netty.channel.nio.NioEventLoop.openSelector(NioEventLoop.java:173)
... 46 more
val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] =