Pask423 on fs2-core-3.2.12
Update fs2-core, fs2-io, ... to… (compare)
softwaremill-ci on scalafmt-core-3.5.9
Update scalafmt-core to 3.5.9 Reformat with scalafmt 3.5.9 E… Add 'Reformat with scalafmt 3.5… (compare)
adamw on v3.7.4
adamw on master
Release 3.7.4 (compare)
adamw on master
Split js tests into 2.11+2.12 a… (compare)
adamw on master
Increase memory for js tests (compare)
adamw on master
Increase memory for js tests (compare)
softwaremill-ci on fs2-core-3.2.12
Update fs2-core, fs2-io, ... to… (compare)
adamw on v3.7.3
adamw on master
Release 3.7.3 (compare)
mergify[bot] on zio-web-sockets-public
val instance: Task[sttp.client.SttpBackend[Task, monix.reactive.Observable[java.nio.ByteBuffer], WebSocketHandler]] =
AsyncHttpClientMonixBackend().memoize
lazy val frames: Observable[WebSocketFrame.Incoming] =
lazy val frames: Observable[WebSocketFrame.Incoming] = {
Observable.unsafeCreate[WebSocketFrame.Incoming] { subscriber =>
val cancelable = AssignableCancelable.multi()
def feedOnNext(): Unit = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
cancelable :=
receiveT
.runAsync {
case Left(error) =>
subscriber.onError(error)
case Right((Left(close), _)) =>
if ( logger.isTraceEnabled )
logger.trace(s"received close ${close}")
subscriber.onComplete()
case Right((Right(frame), webSocket)) =>
subscriber
.onNext(frame)
.syncOnContinue(feedOnNext())
.syncOnStopOrFailure {
case Some(th) =>
logger.error(s"error processing ${frame}", th)
feedOnNext()
case None =>
if ( logger.isTraceEnabled)
logger.trace(s"stop received closing WebSocket")
webSocket.close.runAsyncAndForget
}
}
}
feedOnNext()
cancelable
}
}
lazy val frames: Observable[WebSocketFrame.Incoming] = {
val receiveT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], WebSocket[Task])] =
webSocketT
.flatMap { webSocket =>
webSocket
.receive
.map(_ -> webSocket)
}
val receiveForObservableT: Task[(Either[WebSocketEvent.Close, WebSocketFrame.Incoming], Unit)] = receiveT.map(t => (t._1, ()))
val observable: Observable[Either[WebSocketEvent.Close, WebSocketFrame.Incoming]] = Observable.fromAsyncStateAction[Unit,Either[WebSocketEvent.Close, WebSocketFrame.Incoming]](_ => receiveForObservableT)(())
observable
.takeWhile(_.isRight)
.collect {
case Right(f) =>
f
}
}
Map[String, String]
for each case class?
Hello, I am trying client3 with zio (scala 2.13.3 and zio 1.0.3, sttp 3.0.0-RC5), following the example here https://sttp.softwaremill.com/en/latest/examples.html (GET and parse JSON using the ZIO async-http-client backend and circe), I get at runtime
[info] Fiber failed.
[info] An unchecked error was produced.
[info] java.lang.Error: Defect in zio.Has: Set(SttpBackend[=λ %2:0 → ZIO[-Any,+Throwable,+2:0],+{ZioStreams & package::WebSockets}]) statically known to be contained within the environment are missing
[info] at zio.Has$HasSyntax$.prune$extension(Has.scala:197)
[info] at zio.Has$HasSyntax$.union$extension(Has.scala:209)
[info] at zio.Has$Union$$anon$2.union(Has.scala:91)
[info] at zio.Has$Union$$anon$2.union(Has.scala:89)
Anything I am missing ? when I am not using layer, and just directly the backend its working properly.
statusCode
as one of the label for metric. But I noticed that if request fails with DeserializationError, all metadata of response (headers, status code and other) will be lost.effectBlocking
. I think not as otherwise Blocking
would be simply required by sttp-zio integration. I guess STTP has it's own execution context, but maybe it's still good to tell ZIO this is a blocking operation?
asStreamUnsafe
? Obviously this is backend/stream-specific, but some hints would help.httpclient-backend-fs2
, get IO[Stream[IO, T]]
from sttp, add some in-flight transformations, stream results in http response via http4s
. I think this scenario cannot be expressed with safe asStream
.
def webSocketFramePipe: Flow[WebSocketFrame.Data[_], WebSocketFrame, NotUsed] = Flow[WebSocketFrame.Data[_]].map{
case o @ WebSocketFrame.Text(str, _, _) =>
println(s"received string: $str")
case other =>
println(s"received other: $other")
other
}
val inputSrc: Source[ByteString, Any] = ... // ie Seq("test1", "test")
basicRequest
.get(uri)
.streamBody(AkkaStreams)(inputSrc)
.response(asWebSocketStreamAlways(AkkaStreams)(webSocketFramePipe))
.send(backend)
.onComplete(_ => backend.close())
HI… I am trying to use PrometheusBackend with Cats backend but it looks like it is not supported. It works fine with Monix backend.
AsyncHttpClientMonixBackend().map(backend => PrometheusBackend(backend)) // OK
AsyncHttpClientCatsBackend[cats.effect.IO]().map(backend => PrometheusBackend(backend))
error: type mismatch;
found : sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler]
required: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]
Error occurred in an application involving default arguments.
sttp.client.SttpBackend[cats.effect.IO,Nothing,sttp.client.asynchttpclient.WebSocketHandler] <: sttp.client.SttpBackend[[+A]cats.effect.IO[A],S,[WS_RESULT]sttp.client.asynchttpclient.WebSocketHandler[WS_RESULT]]?
false
Am I missing something?
private def apply(
client: HttpClient,
closeClient: Boolean,
customizeRequest: HttpRequest => HttpRequest,
customEncodingHandler: ZioEncodingHandler
): SttpBackend[Task, ZioStreams with WebSockets] =
private def make(
actorSystem: ActorSystem,
ec: ExecutionContext,
terminateActorSystemOnClose: Boolean,
options: SttpBackendOptions,
customConnectionPoolSettings: Option[ConnectionPoolSettings],
http: AkkaHttpClient,
customizeRequest: HttpRequest => HttpRequest,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customEncodingHandler: EncodingHandler = PartialFunction.empty
Sec-WebSocket-Protocol
results in an Illegal Header
exception
val wsBuilder = client.newWebSocketBuilder()
client.connectTimeout().map[java.net.http.WebSocket.Builder](wsBuilder.connectTimeout(_))
request.headers.foreach(h => wsBuilder.header(h.name, h.value))
val cf = wsBuilder
.buildAsync(request.uri.toJavaUri, listener)
.thenApply[Unit](_ => ())
.exceptionally(t => cb(Left(t)))
Canceler(() => cf.cancel(true))
})
wsBuilder.subprotocols
afaik
SttpBackend
trait, the documentation for send
states Type parameters: R – The capabilities required by the request. This must be a subset of the the capabilities supported by the backend (which always includes Effect[F]).
, which makes sense. However when I'm looking at AkkaHttpBackend
, the type returned is SttpBackend[Future, AkkaStreams with WebSockets]
, so Effect[Future]
is not included. Shouldn't that be there?
I'm currently idiot-testing 3.0.0's akka websockets, so I created a minimal build.sbt and copy pasted this example:
https://sttp.softwaremill.com/en/latest/examples.html#open-a-websocket-using-akka
It doesn't compile! It complains:
Symbol 'type akka.event.LoggingAdapter' is missing from the classpath.
This symbol is required by 'value sttp.client3.akkahttp.AkkaHttpBackend.customLog'.
Make sure that type LoggingAdapter is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
The akka documentation says for logging you need: "com.typesafe.akka" %% "akka-actor" % "2.6.10"
However, I saw that sttp3's akka-http-backend has this listed as provided
so I went with that instead: "com.typesafe.akka" %% "akka-stream" % "2.6.10"
It compiles, but I get 0 output when I run the example. No error, no nothing.
build.sbt:
name := "meep"
scalaVersion := "2.13.4"
libraryDependencies ++= Seq(
"com.softwaremill.sttp.client3" %% "core" % "3.0.0",
"com.softwaremill.sttp.client3" %% "akka-http-backend" % "3.0.0",
"com.typesafe.akka" %% "akka-stream" % "2.6.10"
)