    DarkWingMcQuack
    @DarkWingMcQuack
    what also happens is that this code deadlocks from time to time
    or at least it does not continue to output stuff
    Paolo Boni
    @paoloboni
    Perhaps worth reviewing the recursion behind that function
    DarkWingMcQuack
    @DarkWingMcQuack
    yeah, i am currently checking whether the call to .last is the problem
    Paolo Boni
    @paoloboni
    In the loneElement case, the new query is based on closeTime, otherwise on openTime. Do you think that’s ok?
    DarkWingMcQuack
    @DarkWingMcQuack
    in the getKLines method?
    it's an aesthetic difference, but maybe it makes sense
    DarkWingMcQuack
    @DarkWingMcQuack
    my guess currently is that there is something wrong with my stream thing that i am doing there
    Paolo Boni
    @paoloboni
    actually I'm looking carefully at your snippet only now, what's the intent of your code exactly? couldn't you source the stream with a single call to the api?
    oh, I guess you want to keep moving ahead in the future
    DarkWingMcQuack
    @DarkWingMcQuack
    yeah :D
    here is an updated version which seems to work:
    Stream
        .eval(Clock[F].realTime)
        .map(_.toMillis)
        .map(currentTime =>
          KLines(
            market.binanceSymbol,
            Interval.`1m`,
            Instant.ofEpochMilli(currentTime - 60000),
            Instant.ofEpochMilli(currentTime + 1000),
            1
          )
        )
        .flatMap(x => client.getKLines(x))
        .repeat
        .map(_.toOpenClosedDatapoint)
        .zipWithNext
        .collect {
          case (current, Some(next)) if current._1 < next._1 => current
        }
        .meteredStartImmediately(55.seconds)
        .evalTap(x =>
          for {
            time <- Clock[F].realTime.map(_.toMillis)
            _    <- Console[F].println(s"${time} - ${x._2.time} - (${x._2.time - time})")
          } yield ()
        )
    Paolo Boni
    @paoloboni
    Yes, I thought about implementing the automatic reconnect in the client, but I thought it was easier to do in the calling code; having the reconnect in the client may hide other kinds of connectivity issues
    DarkWingMcQuack
    @DarkWingMcQuack
    couldn't we pass a parameter reconnectAfter, which is a duration after which the websocket reconnects itself?
    i read the binance doc about the websockets. are we doing this ping stuff every 10 minutes?
    Paolo Boni
    @paoloboni
    Yes, that’s handled by the client
    DarkWingMcQuack
    @DarkWingMcQuack
    after 24h will the stream just hang or will there some error?
    Paolo Boni
    @paoloboni
    The stream will be completed, so the consumer is notified about the completion and can handle it. Not sure whether that’s going to be a graceful one or not
    DarkWingMcQuack
    @DarkWingMcQuack
    so i could just call repeat?
    Paolo Boni
    @paoloboni
    It’s probably something you can reproduce locally, by changing one of the integration tests and having the server close the connection. I’m happy to try when I’m back at the keyboard
    Repeat would work only if the stream completes without failure
    DarkWingMcQuack
    @DarkWingMcQuack
    will test it tomorrow, my gf is getting mad at me for coding all day instead of doing something with her
    Paolo Boni
    @paoloboni
    I'm not sure if the disconnection will cause a failed effect. I think what you can do is (both things):
    • use repeat
    • use handleErrorWith and recursively re-create the stream if something goes wrong, but this may swallow other kinds of errors too
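    The "repeat + recover" combination above can be sketched in plain Scala (without fs2, so the shape is visible on its own). Everything here is hypothetical: `runOnce` stands in for draining the websocket stream once, and the retry budget is one way to avoid swallowing persistent errors forever.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of "repeat + recover": runOnce stands in for draining
// the websocket stream once. On failure the stream is re-created and re-run,
// bounded by maxRetries so persistent errors eventually surface instead of
// being swallowed forever.
def runWithReconnect[A](runOnce: () => Try[A], maxRetries: Int): Try[A] =
  runOnce() match {
    case Failure(_) if maxRetries > 0 => runWithReconnect(runOnce, maxRetries - 1)
    case other                        => other
  }
```

    With fs2 this is roughly a `stream.repeat.handleErrorWith(_ => resubscribe)` shape, with the caveat from the message above that it can mask unrelated errors.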
    Swoorup Joshi
    @Swoorup
    It would be nice to expose a heartbeat stream for that purpose
    if a disconnection happens, users can handle it themselves when they stop receiving heartbeats
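    The heartbeat idea boils down to a staleness check on the consumer side. A minimal, fs2-free sketch (all names here are hypothetical, not part of the library):

```scala
// Hypothetical consumer-side staleness check for the heartbeat idea: record
// when the last message or heartbeat was seen, and treat the connection as
// dead (time to reconnect) once nothing has arrived within the timeout.
final case class Heartbeat(lastSeenMillis: Long) {
  def seen(nowMillis: Long): Heartbeat = Heartbeat(nowMillis)
  def isStale(nowMillis: Long, timeoutMillis: Long): Boolean =
    nowMillis - lastSeenMillis > timeoutMillis
}
```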
    Swoorup Joshi
    @Swoorup
    btw I added tradeStream to spotApi @paoloboni
    Swoorup Joshi
    @Swoorup
    [screenshot attached]
    after scalafmtAll
    Swoorup Joshi
    @Swoorup
    [screenshot attached]
    this is even after I let them format everything.
    odd
    Swoorup Joshi
    @Swoorup
    btw do you know the default update rate for diff streams?
    Paolo Boni
    @paoloboni
    Thanks @Swoorup for the contribution, your change has been released as part of v1.3.1
    Swoorup Joshi
    @Swoorup
    Looks like I would need orderbook snapshot endpoint
    Swoorup Joshi
    @Swoorup
    You could use union singleton types to constrain those Ints, like the following, in the future. Only thing is circe can't derive them, I think
    type SupportedDepthLimit = 5 | 10 | 20 | 50 | 100 | 500 | 1000 | 5000
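    A rough sketch of the idea (Scala 3). Since circe can't derive a codec for the union, one option is a hand-written runtime check that a decoder could delegate to; `validateDepthLimit` is a hypothetical helper, not part of the library.

```scala
// Sketch (Scala 3): a union of singleton types restricts the accepted values
// at compile time when callers pass literals.
type SupportedDepthLimit = 5 | 10 | 20 | 50 | 100 | 500 | 1000 | 5000

// circe can't derive a Decoder for the union, but a runtime check gives the
// same guarantee at the JSON boundary. validateDepthLimit is hypothetical.
def validateDepthLimit(n: Int): Either[String, Int] =
  n match {
    case 5 | 10 | 20 | 50 | 100 | 500 | 1000 | 5000 => Right(n)
    case other => Left(s"unsupported depth limit: $other")
  }
```

    A circe Decoder could then be built as `Decoder.decodeInt.emap(validateDepthLimit)`.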
    Swoorup Joshi
    @Swoorup
    02:32:47.977 [AsyncHttpClient-3-3] WARN  i.n.u.c.SingleThreadEventExecutor - Unexpected exception from an event executor: 
    java.lang.OutOfMemoryError: Java heap space
    [error] java.lang.OutOfMemoryError: Java heap space
    [error]         at io.circe.numbers.BiggerDecimal$.parseBiggerDecimalUnsafe(BiggerDecimal.scala:391)
    [error]         at io.circe.JsonNumber$.fromString(JsonNumber.scala:249)
    [error]         at io.circe.Decoder$$anon$30.apply(Decoder.scala:882)
    [error]         at io.circe.Decoder.tryDecode(Decoder.scala:70)
    [error]         at io.circe.Decoder.tryDecode$(Decoder.scala:50)
    [error]         at io.circe.Decoder$DecoderWithFailure.tryDecode(Decoder.scala:465)
    [error]         at io.circe.DerivedDecoder.decodeWith(Derivation.scala:256)
    [error]         at io.circe.DerivedDecoder.decodeWith$(Derivation.scala:252)
    [error]         at io.github.paoloboni.binance.spot.SpotApi$$anon$43.decodeWith(SpotApi.scala:57)
    [error]         at io.circe.DerivedDecoder$$anon$1.next(Derivation.scala:266)
    [error]         at io.circe.DerivedDecoder$$anon$1.next(Derivation.scala:265)
    [error]         at io.github.paoloboni.binance.spot.SpotApi$$anon$43.apply(SpotApi.scala:275)
    [error]         at io.circe.Decoder.decodeJson(Decoder.scala:88)
    [error]         at io.circe.Decoder.decodeJson$(Decoder.scala:50)
    [error]         at io.github.paoloboni.binance.spot.SpotApi$$anon$43.decodeJson(SpotApi.scala:57)
    [error]         at io.circe.Parser.finishDecode(Parser.scala:12)
    [error]         at io.circe.Parser.finishDecode$(Parser.scala:6)
    [error]         at io.circe.parser.package$.finishDecode(package.scala:5)
    [error]         at io.circe.Parser.decode(Parser.scala:27)
    [error]         at io.circe.Parser.decode$(Parser.scala:6)
    [error]         at io.circe.parser.package$.decode(package.scala:5)
    [error]         at io.github.paoloboni.http.HttpClient.webSocketFramePipe$3$$anonfun$3$$anonfun$3(HttpClient.scala:95)
    [error]         at io.github.paoloboni.http.HttpClient$$Lambda$18818/0x0000000804c99040.apply(Unknown Source)
    [error]         at fs2.Stream.evalMap$$anonfun$1(Stream.scala:933)
    [error]         at fs2.Stream$$Lambda$18253/0x0000000804aaa840.apply(Unknown Source)
    [error]         at fs2.Stream.flatMap$$anonfun$1(Stream.scala:1150)
    [error]         at fs2.Stream$$Lambda$18035/0x00000008049e0040.apply(Unknown Source)
    [error]         at fs2.Pull$FlatMapR$1.go$1(Pull.scala:1022)
    [error]         at fs2.Pull$FlatMapR$1.unconsed(Pull.scala:1030)
    [error]         at fs2.Pull$FlatMapR$1.out(Pull.scala:1039)
    [error]         at fs2.Pull$.go$2$$anonfun$1(Pull.scala:1198)
    [error]         at fs2.Pull$$$Lambda$18091/0x0000000804a15c40.apply(Unknown Source)
    java.lang.InterruptedException
            at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
            at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:207)
            at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
            at scala.concurrent.Await$.$anonfun$result$1(package.scala:201)
            at cats.effect.unsafe.WorkerThread.blockOn(WorkerThread.scala:461)
            at scala.concurrent.Await$.result(package.scala:124)
            at cats.effect.std.DispatcherPlatform.unsafeRunTimed(DispatcherPlatform.scala:61)
            at cats.effect.std.DispatcherPlatform.unsafeRunTimed$(DispatcherPlatform.scala:24)
            at cats.effect.std.Dispatcher$$anon$2.unsafeRunTimed(Dispatcher.scala:190)
            at cats.effect.std.DispatcherPlatform.unsafeRunSync(DispatcherPlatform.scala:52)
            at cats.effect.std.DispatcherPlatform.unsafeRunSync$(DispatcherPlatform.scala:24)
            at cats.effect.std.Dispatcher$$anon$2.unsafeRunSync(Dispatcher.scala:190)
            at sttp.client3.impl.fs2.Fs2SimpleQueue.offer(Fs2SimpleQueue.scala:19)
            at sttp.client3.asynchttpclient.WebSocketImpl.send$$anonfun$1(WebSocketImpl.scala:53)
            at cats.effect.IO$.$anonfun$1(IO.scala:934)
            at cats.effect.IOFiber.runLoop(IOFiber.scala:381)
            at cats.effect.IOFiber.asyncContinueSuccessfulR(IOFiber.sca
    I encountered an OOM issue after running the trade streams for quite some time. While I have dived deeper into the issue, has anybody encountered similar issues?
    Swoorup Joshi
    @Swoorup
    *haven't
    Swoorup Joshi
    @Swoorup
    [screenshot attached]
    Swoorup Joshi
    @Swoorup
    [screenshot attached]
    For a simple code like the following:
    import cats.effect.{ExitCode, IO, IOApp}
    import fs2.Stream
    import io.github.paoloboni.binance.BinanceClient
    import io.github.paoloboni.binance.common.SpotConfig
    import org.typelevel.log4cats.Logger
    import org.typelevel.log4cats.slf4j.Slf4jLogger
    import scala.concurrent.duration.DurationInt
    
    object PriceMonitor extends IOApp {
      val config = SpotConfig.Default[IO](
        apiKey = "***",
        apiSecret = "***"
      )
      override def run(args: List[String]): IO[ExitCode] = {
        implicit def log: Logger[IO] = Slf4jLogger.getLogger[IO]
        BinanceClient
          .createSpotClient[IO](config)
          .use { client =>
            for {
              btc_trades <- client.tradeStreams("btcusdt").evalMap(trade => IO.println(s"TRADE: $trade")).compile.drain.start
              btc_ob <- client.diffDepthStream("btcusdt").evalMap(trade => IO.println(s"DEPTH: $trade")).compile.drain.start
    
              bch_trades <- client.tradeStreams("bchusdt").evalMap(trade => IO.println(s"TRADE: $trade")).compile.drain.start
              bch_ob <- client.diffDepthStream("bchusdt").evalMap(trade => IO.println(s"DEPTH: $trade")).compile.drain.start
    
              xrp_trades <- client.tradeStreams("xrpusdt").evalMap(trade => IO.println(s"TRADE: $trade")).compile.drain.start
              xrp_ob <- client.diffDepthStream("xrpusdt").evalMap(trade => IO.println(s"DEPTH: $trade")).compile.drain.start
    
              _ <- IO.readLine
    
              _ <- btc_trades.cancel
              _ <- btc_ob.cancel
    
              _ <- bch_trades.cancel
              _ <- bch_ob.cancel
    
              _ <- xrp_trades.cancel
              _ <- xrp_ob.cancel
    
              _ <- btc_trades.join
              _ <- btc_ob.join
    
              _ <- bch_trades.join
              _ <- bch_ob.join
    
              _ <- xrp_trades.join
              _ <- xrp_ob.join
            } yield ()
          }
      // redeemWith, so the error-logging effect is sequenced rather than discarded
      .redeemWith(
        t => log.error(t)("Something went wrong").as(ExitCode(1)),
        _ => IO.pure(ExitCode.Success)
      )
      }
    }
    Estimating, you'd run out of memory after around 3 hours of running the stream.
    the downspikes in the heap are caused by GC, with the lowest baseline memory higher each time.
    I might let it run for a while and will create a GH issue with the dump attached once I run into OOM. idk if GH allows attaching big dump files.
    Swoorup Joshi
    @Swoorup
    btw what do you think about moving discussions to Discord? most of Typelevel has moved there. Was discussing this leak a fair bit in the cats-effect channel, but still couldn’t find the root cause
    Paolo Boni
    @paoloboni
    This chat has been moved to Discord: https://discord.gg/7KrBehYs55