val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] =
lazy val frames: Observable[WebSocketFrame.Incoming] = {
  Observable.unsafeCreate[WebSocketFrame.Incoming] { subscriber =>
    val cancelable = AssignableCancelable.multi()
    def feedOnNext(): Unit = {
      val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
        webSocketT
          .flatMap { webSocket =>
            webSocket
              .receive
              .map(_ -> webSocket)
          }
      cancelable :=
        receiveT
          .runAsync {
            case Left(error) =>
              subscriber.onError(error)
            case Right((Left(close), _)) =>
              if (logger.isTraceEnabled)
                logger.trace(s"received close ${close}")
              subscriber.onComplete()
            case Right((Right(frame), webSocket)) =>
              subscriber
                .onNext(frame)
                .syncOnContinue(feedOnNext())
                .syncOnStopOrFailure {
                  case Some(th) =>
                    logger.error(s"error processing ${frame}", th)
                    feedOnNext()
                  case None =>
                    if (logger.isTraceEnabled)
                      logger.trace(s"stop received closing WebSocket")
                    webSocket.close.runAsyncAndForget
                }
          }
    }
    feedOnNext()
    cancelable
  }
}
lazy val frames: Observable[WebSocketFrame.Incoming] = {
  val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
    webSocketT
      .flatMap { webSocket =>
        webSocket
          .receive
          .map(_ -> webSocket)
      }
  val receiveForObservableT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], Unit)] =
    receiveT.map(t => (t._1, ()))
  val observable: Observable[Either[WebSocketEvent.Close, WebSocketFrame.Incoming]] =
    Observable.fromAsyncStateAction[Unit, Either[WebSocketEvent.Close, WebSocketFrame.Incoming]](_ => receiveForObservableT)(())
  observable
    .takeWhile(_.isRight)
    .collect {
      case Right(f) =>
        f
    }
}
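For anyone following along, here is a rough, synchronous sketch of the "repeat receive, stop at the first close, keep only the frames" shape used in the second version. It uses a plain Iterator instead of a Monix Observable, and Frame, Close and scriptedReceive are hypothetical stand-ins, not sttp or Monix types.

```scala
object FramesSketch {
  // Hypothetical stand-ins for WebSocketFrame.Incoming and WebSocketEvent.Close.
  final case class Frame(payload: String)
  case object Close

  // Hypothetical stand-in for webSocket.receive: replays a fixed script of events,
  // then keeps answering with Close once the script is exhausted.
  def scriptedReceive(events: List[Either[Close.type, Frame]]): () => Either[Close.type, Frame] = {
    val it = events.iterator
    () => if (it.hasNext) it.next() else Left(Close)
  }

  // Call receive repeatedly, stop at the first Left (close), and unwrap the Right values.
  def frames(receive: () => Either[Close.type, Frame]): Iterator[Frame] =
    Iterator
      .continually(receive())
      .takeWhile(_.isRight)
      .collect { case Right(frame) => frame }
}
```

With a script of two frames, a close and one more frame, frames yields only the first two frames; the frame after the close is never requested.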
Map[String, String] for each case class?
Hello, I am trying client3 with ZIO (Scala 2.13.3, ZIO 1.0.3, sttp 3.0.0-RC5), following the example here: https://sttp.softwaremill.com/en/latest/examples.html (GET and parse JSON using the ZIO async-http-client backend and circe). At runtime I get:
[info] Fiber failed.
[info] An unchecked error was produced.
[info] java.lang.Error: Defect in zio.Has: Set(SttpBackend[=λ %2:0 → ZIO[-Any,+Throwable,+2:0],+{ZioStreams & package::WebSockets}]) statically known to be contained within the environment are missing
[info] at zio.Has$HasSyntax$.prune$extension(Has.scala:197)
[info] at zio.Has$HasSyntax$.union$extension(Has.scala:209)
[info] at zio.Has$Union$$anon$2.union(Has.scala:91)
[info] at zio.Has$Union$$anon$2.union(Has.scala:89)
Am I missing anything? When I don't use the layer and use the backend directly, it works properly.
statusCode as one of the labels for a metric. But I noticed that if a request fails with DeserializationError, all metadata of the response (headers, status code and so on) is lost.
effectBlocking. I think not, as otherwise Blocking would simply be required by the sttp-zio integration. I guess sttp has its own execution context, but maybe it's still good to tell ZIO this is a blocking operation?
asStreamUnsafe? Obviously this is backend/stream-specific, but some hints would help.
httpclient-backend-fs2, get IO[Stream[IO, T]] from sttp, add some in-flight transformations, stream results in the HTTP response via http4s. I think this scenario cannot be expressed with the safe asStream.
def webSocketFramePipe: Flow[WebSocketFrame.Data[_], WebSocketFrame, NotUsed] = Flow[WebSocketFrame.Data[_]].map {
  case o @ WebSocketFrame.Text(str, _, _) =>
    println(s"received string: $str")
    o // return the frame so both branches yield a WebSocketFrame
  case other =>
    println(s"received other: $other")
    other
}
val inputSrc: Source[ByteString, Any] = ... // e.g. Seq("test1", "test")
basicRequest
  .get(uri)
  .streamBody(AkkaStreams)(inputSrc)
  .response(asWebSocketStreamAlways(AkkaStreams)(webSocketFramePipe))
  .send(backend)
  .onComplete(_ => backend.close())
Hi, I am trying to use PrometheusBackend with the Cats backend, but it looks like it is not supported. It works fine with the Monix backend.
AsyncHttpClientMonixBackend().map(backend => PrometheusBackend(backend)) // OK
AsyncHttpClientCatsBackend[cats.effect.IO]().map(backend => PrometheusBackend(backend))
error: type mismatch;
found : sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler]
required: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]
Error occurred in an application involving default arguments.
sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler] <: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]?
false
Am I missing something?
private def apply(
    client: HttpClient,
    closeClient: Boolean,
    customizeRequest: HttpRequest => HttpRequest,
    customEncodingHandler: ZioEncodingHandler
): SttpBackend[Task, ZioStreams with WebSockets] =
private def make(
    actorSystem: ActorSystem,
    ec: ExecutionContext,
    terminateActorSystemOnClose: Boolean,
    options: SttpBackendOptions,
    customConnectionPoolSettings: Option[ConnectionPoolSettings],
    http: AkkaHttpClient,
    customizeRequest: HttpRequest => HttpRequest,
    customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
    customEncodingHandler: EncodingHandler = PartialFunction.empty
Sec-WebSocket-Protocol results in an Illegal Header exception
val wsBuilder = client.newWebSocketBuilder()
client.connectTimeout().map[java.net.http.WebSocket.Builder](wsBuilder.connectTimeout(_))
request.headers.foreach(h => wsBuilder.header(h.name, h.value))
val cf = wsBuilder
.buildAsync(request.uri.toJavaUri, listener)
.thenApply[Unit](_ => ())
.exceptionally(t => cb(Left(t)))
Canceler(() => cf.cancel(true))
})
wsBuilder.subprotocols afaik
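A minimal sketch of that idea, assuming (as reported above) that the JDK builder rejects Sec-WebSocket-Protocol when it is passed through header(): pull that header out and hand its values to subprotocols instead. Header, splitSubprotocols and configure are hypothetical names used only for illustration, not sttp code; java.net.http requires JDK 11 or later.

```scala
import java.net.http.WebSocket

object SubprotocolSketch {
  // Hypothetical stand-in for an sttp header (name/value pair).
  final case class Header(name: String, value: String)

  // Separate the comma-separated Sec-WebSocket-Protocol values from all other headers.
  def splitSubprotocols(headers: Seq[Header]): (List[String], Seq[Header]) = {
    val (protocolHeaders, rest) =
      headers.partition(_.name.equalsIgnoreCase("Sec-WebSocket-Protocol"))
    val protocols =
      protocolHeaders.flatMap(_.value.split(",").toList).map(_.trim).filter(_.nonEmpty).toList
    (protocols, rest)
  }

  // Set ordinary headers via header(), and the subprotocols via subprotocols().
  def configure(builder: WebSocket.Builder, headers: Seq[Header]): WebSocket.Builder = {
    val (protocols, rest) = splitSubprotocols(headers)
    rest.foreach(h => builder.header(h.name, h.value))
    protocols match {
      case preferred :: others => builder.subprotocols(preferred, others: _*)
      case Nil                 => builder
    }
  }
}
```

In the snippet above, configure(wsBuilder, ...) would take the place of the request.headers.foreach line, after mapping the request headers to the stand-in type.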
SttpBackend trait, the documentation for send states "Type parameters: R – The capabilities required by the request. This must be a subset of the capabilities supported by the backend (which always includes Effect[F]).", which makes sense. However, when I'm looking at AkkaHttpBackend, the type returned is SttpBackend[Future, AkkaStreams with WebSockets], so Effect[Future] is not included. Shouldn't that be there?
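One possible reading, as a simplified self-contained model: this assumes send itself adds Effect[F] to the backend's capabilities through a lower bound on R, so the backend type does not have to list it. All names below (Backend, Request, Streams and so on) are hypothetical stand-ins, not sttp's actual definitions.

```scala
import scala.concurrent.Future

object CapabilitySketch {
  // Hypothetical capability markers.
  trait Effect[F[_]]
  trait Streams
  trait WebSockets

  // A request that requires capabilities R. Contravariant, so a request that
  // requires less can be used where more is available.
  final case class Request[T, -R](body: T)

  // P lists the backend's capabilities WITHOUT Effect[F];
  // the bound on R adds Effect[F] at the point where a request is sent.
  trait Backend[F[_], +P] {
    def send[T, R >: P with Effect[F]](request: Request[T, R]): F[T]
  }

  object FutureBackend extends Backend[Future, Streams with WebSockets] {
    def send[T, R >: Streams with WebSockets with Effect[Future]](request: Request[T, R]): Future[T] =
      Future.successful(request.body)
  }

  val plain: Request[String, Any] = Request("plain")
  val effectful: Request[String, Effect[Future]] = Request("effectful")
  val streaming: Request[String, Streams with Effect[Future]] = Request("streaming")
}
```

All three requests are accepted by FutureBackend.send, including the one that requires Effect[Future], even though the backend's type only mentions Streams with WebSockets. A request requiring a capability outside that set would be rejected at compile time.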
I'm currently idiot-testing 3.0.0's akka websockets, so I created a minimal build.sbt and copy pasted this example:
https://sttp.softwaremill.com/en/latest/examples.html#open-a-websocket-using-akka
It doesn't compile! It complains:
Symbol 'type akka.event.LoggingAdapter' is missing from the classpath.
This symbol is required by 'value sttp.client3.akkahttp.AkkaHttpBackend.customLog'.
Make sure that type LoggingAdapter is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
The akka documentation says for logging you need: "com.typesafe.akka" %% "akka-actor" % "2.6.10"
However, I saw that sttp3's akka-http-backend has this listed as provided, so I went with that instead: "com.typesafe.akka" %% "akka-stream" % "2.6.10"
It compiles, but I get 0 output when I run the example. No error, no nothing.
build.sbt:
name := "meep"
scalaVersion := "2.13.4"
libraryDependencies ++= Seq(
  "com.softwaremill.sttp.client3" %% "core" % "3.0.0",
  "com.softwaremill.sttp.client3" %% "akka-http-backend" % "3.0.0",
  "com.typesafe.akka" %% "akka-stream" % "2.6.10"
)
Playing this game with the monix example, I get this SSL exception when running:
runMain sttp.client3.examples.WebSocketMonix
[info] running sttp.client3.examples.WebSocketMonix
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[error] (run-main-4) sttp.client3.SttpClientException$ConnectException: Exception when sending request: GET wss://echo.websocket.org
[error] sttp.client3.SttpClientException$ConnectException: Exception when sending request: GET wss://echo.websocket.org
[error] Caused by: java.net.ConnectException: handshake timed out after 10000ms
[error] Caused by: io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms
(I've omitted the stack trace elements in between due to gitter's message limits)