Where communities thrive


  • Join over 1.5M+ people
  • Join over 100K+ communities
  • Free without limits
  • Create your own community
People
Repo info
Activity
Berke Sökhan
@berkesokhan
ah ofcourse answer os always the flatMap, lol
sorry having hard time with Scala monix combo here :D
Piotr Gawryś
@Avasil
The example uses buffering before commit to commit in batches instead one by one
but if you need to commit after each message then you can just do it early
Berke Sökhan
@berkesokhan
got it, interesting so if I use the example I don't need to do runtofuture? does it really executes my business function and start listening ? (I mean the previous example)
Piotr Gawryś
@Avasil
commitSync() returns Task[Unit] so if you want to do it after runBusinessStuff then yeah, you need flatMap :D
You need it, I misread earlier
Observable and Task are lazily executed, you need to run them at some point to start execution
for Observable you can use subscribe() or convert it to Task with toListL, completedL, consumeWith etc
and then run Task
Berke Sökhan
@berkesokhan
hmm the snippet you gave still has the same error
Piotr Gawryś
@Avasil
what's the signature of registerSchedule?
Berke Sökhan
@berkesokhan
at registerSchedule
oh wait I don't need to call it
that why it was complaining sorry mixed the monix schedulers with our own schedulers
So my latest is this
KafkaConsumerObservable
          .manualCommit[String, Array[Byte]](kafkaConsumerConfig, List(listenerConfig.topic))
          .map(message => Tuple3(message.record.value(), message.record.key(), message.committableOffset))
          .mapEval {
            case Tuple3(value, key, offset) =>
              executeBusinessLogic
                .runBusinessStuff(BusinessObjectAssembler.getBusinessObject(value, key))
                .flatMap(result => offset.commitSync().map(_ => result))
          }
and this is enough for actually starting listening on kafka topic and processing my business logic and committing each time, right?
Piotr Gawryś
@Avasil
That + running Observable or passing it somewhere that will run it eventually
if you can return Observable[Unit] then you can also remove map(_ => result)
and Tuple3 should be redundant, (x, y, z) is enough
Berke Sökhan
@berkesokhan
ok removed the Tuple3 at case and it still compiles
but how to run this observable that returned at the end? (and Observable[Unit] should be ok)
observable.cunsumeWith() ?
Piotr Gawryś
@Avasil
you can call subscribe() and it will run asynchronously in the background and return Cancelable which you can call if you want to stop the streaming
consumeWith(Consumer.XYZ) have various options for Task conversion
and methods ending with L also can return a Task, e.g. observable.completedL returns Task[Unit] on which you can call runToFuture, runSyncUnsafe etc
Berke Sökhan
@berkesokhan
hmm subscribe looks perfect
even w/o assigning the whole return value to an obejct I can call subscribe(), but since there won't a ref to hold onto it, would GC collect this eventually?
Piotr Gawryś
@Avasil
AFAIK it will be active on some thread so it won't completely disappear unless it crashes
Berke Sökhan
@berkesokhan
Got it. Thanks a ton Piotr. I ma still trying to learn FP Scala and Monix and you have helped me enormously.
Berke Sökhan
@berkesokhan
It worked and passed my integration/service tests, I will talk with my company to get permission to contribute to the docs as an alternative example, thanks a lot again!
Piotr Gawryś
@Avasil
Happy to help and the docs would be very appreciated!
Conner Bryan
@cab
what would be the idiomatic way to create a connection in a Subject? an MVar, or a cats-effect Deferred? The use case is websockets -- I saw the monix-sample version, but I also need to send messages to the socket. and would it be right to create a class (i.e. class WebsocketSubject extends Subject for this?) sorry for the barrage of questions, still learning how to compose all this stuff together :)
Oleg Pyzhcov
@oleg-py
@cab Subject is side-effecting, so there's not really an idiomatic version
Conner Bryan
@cab
ah gotcha
Piotr Gawryś
@Avasil
It doesn't mean it can't be done nicely :D What do you use for websockets?
Conner Bryan
@cab
i'm using asynchttpclient, as far as i could tell http4s doesn't have ws client support. this is what i have so far, and it seems to work pretty well (some methods are still stubbed though) -- https://gist.github.com/cab/3b628f54b4a017d9460b71961ba4bd9a
Oleg Pyzhcov
@oleg-py
Actually, in your case I would probably expose a factory method that returns Resource[Task, Subject[I, O]] - create socket once in acquisition and use it in all things you pass around, and close on finalization. Though, in this case - as your code has ??? - there's no meaningful semantic for e.g. onError, and none for onComplete if you're sharing the output, so you might just use something like type Sockety[I, O] = (Observable[O], I => Task[Unit]) instead
(which is how I use websockets on JS client, btw, except it's fs2.Stream and parametric in F)
Conner Bryan
@cab
oh interesting, i'll give that a try. thank you!
Oleg Pyzhcov
@oleg-py
I mean, you might use a trait instead of a type alias, but the idea is that you have a pair - a stream of data coming and a way to send it back.
Fayi Femi-Balogun
@fayimora
https://github.com/monix/monix-kafka <— Is this “production-ready”? I’m looking for a library to build a proof-of-concept using Kafka. I’m new to kafka so I cant tell if the lib provides everything I need.🙈
Piotr Gawryś
@Avasil
Yes. I can't promise it will provide everything you need, it's rather small and production-driven library but I've used it successfully in production and know about many other projects which did the same. If you end up using monix-kafka and see something missing then feel free to open an issue and perhaps it could be easily added
Carlos Eduardo Melo
@cemelo
@oleg-py you mentioned that you created some sort of async stack tracer using a custom class loader. Do you mind sharing how you did that?
Oleg Pyzhcov
@oleg-py

@cemelo I don't remember exactly what the hell I was thinking, but our app was built using Task directly (not tagless, no monad transformers returned), so it's pretty easy to figure out which methods need tracing - those that are not abstract and return monix.eval.Task (post-erasure). So I altered methods on classload time with javassist to call a globally accessible method with current method name and returned task (yay pure values):

    override def onLoad(pool: ClassPool, classname: String): Unit = {
      if (classname.startsWith("appname.") && !classname.startsWith("appname.util.logging")) {
        val cls = pool.get(classname)
        for {
          m <- cls.getDeclaredMethods
          if m.getReturnType.getName == "monix.eval.Task"
          if !m.getName.contains("$anonfun$")
          if !Modifier.isAbstract(m.getModifiers)
        } {
          val mangledName = cls.getName.replace('.', '$')
          val traceName = m.getLongName.replace(mangledName, "lambda")
          m.insertAfter("return " +
            "appname.util.logging.Tracer$.MODULE$.wrap(" +
            '"' + traceName + '"' +
            ", $_);")
        }
      }

(appname.util.logging is where that method itself + our MDC replacement lived - you can use a classloader to swap them instead of reflection, and I didn't want those touched for obvious reasons)

Then I basically built a queue of method state exits and shoved it into Local:

object Tracer {
  sealed trait Trace

  case class Push(name: String) extends Trace
  case object Pop extends Trace
  case class Err(name: String, reason: Throwable) extends Trace
  case object Cancelled extends Trace

  private[this] val local = Local(Queue.empty[Trace])

  private[this] def trace(el: Trace) =
    Task.eval(local := local.get :+ el)

  def wrap[A](methodName: String, ta: Task[A]): Task[A] = {
    for {
      _ <- trace(Push(methodName))
      r <- ta
        .doOnFinish {
          case Some(ex) => trace(Err(methodName, ex))
          case None => trace(Pop)
        }
    } yield r
  }

  def current(): Queue[Trace] = local.get
}

It isn't a stack trace that you get, it's actually a full call tree - though it obviously needs more work if you're using concurrent ops - our app wasn't

So this is e.g. how I printed that queue out:
  def trace(): Unit = {
    @tailrec def loop(ls: List[Trace], sb: StringBuilder, offset: Int): String = ls match {
      case Push(method) :: xs =>
        sb.append("*")
        sb.append("-" * offset)
        sb.append(" ")
        sb.append(method)
        sb.append("\n")
        loop(xs, sb, offset + 1)

      case Pop :: xs =>
        loop(xs, sb, offset - 1)

      case Err(name, ex) :: xs =>
        sb.append("*")
        sb.append("-" * offset)
        sb.append(s" EXCEPTION in $name: ${ex.getClass.getSimpleName} - ${ex.getMessage}")
        val (errs, rest) = xs.span {
          case Err(_, `ex`) => true
          case _ => false
        }
        loop(rest, sb, offset - 1 - errs.length)

      case Cancelled :: xs =>
        sb.append("*")
        sb.append("-" * offset)
        sb.append(s" TASK CANCELLED")
        val (cs, rest) = xs.span(_ == Cancelled)
        loop(rest, sb, offset - 1 - cs.length)

      case Nil => sb.toString()
    }

    val trace = Tracer.current().toList
    println(loop(trace, new StringBuilder(), 0))
  }
Oleg Pyzhcov
@oleg-py
Please don't ask wtf I was thinking, it was just a dank idea from over a year ago :grinning:
Fayi Femi-Balogun
@fayimora
Thanks @Avasil :thumbsup:
jhegedus42
@jhegedus42
Piotr Gawryś
@Avasil
I think Await.result(future, 5.seconds) (blocking) won't work but everything else should be fine
Salar Rahmanian
@softinio