Matthew de Detrich
@mdedetrich
(of course if there is anything blocking them let me know)
Piotr Gawryś
@Avasil
@mdedetrich I will take a look today and we can do a release as soon as it is merged (or do a snapshot first if you want to test). Where do you call Local.isolate after this change if using Akka-Http?
Piotr Gawryś
@Avasil
Overall I think setting it as an option is alright. I was trying to come up with something that can do both things at once but I didn't get there. I wish we could have one default that works well everywhere, but Future-interop makes it a little bit hard
Matthew de Detrich
@mdedetrich
@Avasil Thanks for comments on the PR, will look into it today

Where do you call Local.isolate after this change if using Akka-Http?

I think the whole point is that we don't call Local.isolate since we don't even want isolation

Is this theoretically a problem?
Piotr Gawryś
@Avasil
Wouldn't it mix contexts between concurrent requests without any isolation? Or is this a different use case?
Matthew de Detrich
@mdedetrich
I think that's a different use case; in that case you have to use Local.isolate
This is the Future -> Task -> Future case (which is sequential)
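For readers following along, the basic semantics of Monix's Local (the mechanism being discussed) can be sketched as follows. This is a hedged sketch assuming Monix 3's monix.execution.misc.Local API; correlationId is a made-up example value:

```scala
import monix.execution.misc.Local

object LocalSketch extends App {
  // Hedged sketch: a Local behaves like a ThreadLocal whose value can follow
  // the logical flow of execution (when a TracingScheduler propagates it).
  // Local.isolate, discussed above, snapshots and restores the surrounding
  // context so concurrent requests do not see each other's writes.
  val correlationId = Local("none")       // default value
  assert(correlationId.get == "none")

  correlationId.update("req-42")          // bind in the current logical context
  assert(correlationId.get == "req-42")

  correlationId.clear()                   // back to the default
  assert(correlationId.get == "none")
}
```

In the sequential Future -> Task -> Future case above, the point is that the same logical context flows through all three stages, so no isolation boundary is wanted.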
Matthew de Detrich
@mdedetrich
@Avasil I have updated the PR with all of the requested changes apart from the test that @oleg-py wanted; will add this tonight
Pau Alarcón
@paualarco

Hello!

I was wondering if it makes sense to create a Consumer that returns an Observable on success?

That would allow chaining it with another consumer, or working with the result value of each processed event, as is done in Akka Streams with .mapMaterializedValue()

In that case the materialized value would return a Task[Observable[Task[Response]]].

This would be the consumer:

Consumer.create[In, Observable[Task[Out]]] { (scheduler, _, callback) =>
  new Observer.Sync[In] {
    val observable: ConcurrentSubject[Task[Out], Task[Out]] =
      ConcurrentSubject[Task[Out]](MulticastStrategy.replay)(scheduler)

    def onNext(request: In): Ack = {
      // `query` comes from the surrounding context
      val response: Task[Out] = Task.fromFuture(request.execute(query))
      observable.onNext(response) // push the response task, not the subject itself
      monix.execution.Ack.Continue
    }

    def onComplete(): Unit =
      callback.onSuccess(observable) // complete with the subject built above

    def onError(ex: Throwable): Unit =
      callback.onError(ex)
  }
}
Matthew de Detrich
@mdedetrich
@Avasil Hold up on the release of my PR, I think I missed something
Comments are on the PR
Matthew de Detrich
@mdedetrich
@Avasil Is there anything else you want to add for monix/monix#1146 or do you want to go ahead and merge it?
Piotr Gawryś
@Avasil
I want to take a look at monix-opentracing and see how Local is used there
Matthew de Detrich
@mdedetrich
Okay sure
BTW all of the tests in monix-opentracing are now passing with the latest monix snapshot that has the changes from the PR; only one test is failing, but it's completely unrelated to monix
This is FutureScalaConcurrentAutoFinishSpec
There is also io.opentracing.util.ThreadLocalScopeManager, which is what monix-opentracing is based on (this is OpenTracing's default ScopeManager, which just uses a ThreadLocal)
Let me know if you need any help!
Ilya
@squadgazzz
Hello everyone. Sometimes I get this message when I shut down the application. https://codeshare.io/5PVgRd
Looks like some task is trying to execute after the Scheduler is already down, isn't it? Here's a sample code. Any advice on how to avoid this kind of message?
import io.netty.channel.{Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter}
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject
// scheduler here is just a monix.execution.schedulers.fixedPool
class MyObserver(someConstant: Int)(implicit val scheduler: Scheduler) extends ChannelInboundHandlerAdapter {

  private val scores = ConcurrentSubject.publish[(Channel, BigInt)]

  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = msg match {
    case sc: BigInt => scores.onNext((ctx.channel(), sc))
    case _          => super.channelRead(ctx, msg)
  }

  def shutdown(): Unit = {
    scores.onComplete()
  }
}
Alexis Hernandez
@AlexITC
Are there any plans to publish for scalajs 1.0.0?
Piotr Gawryś
@Avasil
@AlexITC Yes, you can track: monix/monix#1131
I will see if I have permissions to update minitest myself if @alexandru is away
Piotr Gawryś
@Avasil
@squadgazzz You can't execute anything asynchronously after you shut down the scheduler. What's your intended behavior when shutting down the app?
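One common shutdown ordering that avoids the rejected-task message is: complete the stream first, then shut the scheduler down and wait for in-flight work. A hedged sketch, assuming Monix's SchedulerService exposes shutdown() and awaitTermination(timeout, awaitOn) (the pool name "my-pool" is made up):

```scala
import monix.execution.Scheduler
import monix.execution.schedulers.SchedulerService
import scala.concurrent.Await
import scala.concurrent.duration._

object ShutdownSketch extends App {
  // Hedged sketch: drain the pipeline before tearing the scheduler down,
  // so nothing tries to run after the pool is gone.
  val io: SchedulerService = Scheduler.fixedPool("my-pool", 4)

  // ... run the ConcurrentSubject pipeline on `io`, then on shutdown:
  // scores.onComplete()            // 1. finish the stream first
  io.shutdown()                     // 2. stop accepting new tasks
  val terminated =                  // 3. wait for what is still running
    io.awaitTermination(10.seconds, Scheduler.global)
  Await.result(terminated, 11.seconds)
  assert(io.isShutdown)
}
```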
Richard
@Executioner1939
@mdedetrich I am really anticipating your monix-opentracing library; we are in desperate need to add it to our distributed system in the least invasive way possible. Mostly for tracking Mongo, Rabbit and Redis calls throughout the system.
Alexis Hernandez
@AlexITC
Thanks!
Matthew de Detrich
@mdedetrich
@Executioner1939 I am doing a little bit of work on it, also waiting for @Avasil to approve/merge my PR. But the basics so far are working which is good.
We are also facing the same problem you are
Should expect a release soon
I also have to do some research to see how OpenTracing's Scope works because it's slightly confusing right now
Piotr Gawryś
@Avasil
I might need a few days because I need to finish something for work on Friday, but I'm very excited about a released version as well!
Matthew de Detrich
@mdedetrich
Cool, no pressure!
Jack Low
@wjlow

Hey everyone, I have a use-case to run a long-running task in the background while I return some value:

for {
  x <- task1
  _ <- longRunningTask
} yield x

Is the way to do this to literally run longRunningTask.runAsync/runToFuture/runAsyncAndForget?

Jack Low
@wjlow
Okay I think I've found .start which does what I want
Jack Low
@wjlow
Sharing this small program here in case anyone is interested:
  override def run(args: List[String]): Task[ExitCode] = {
    // needs: scala.io.StdIn, java.util.concurrent.TimeUnit,
    //        scala.concurrent.duration.FiniteDuration
    val writeToFile = (str: String) => Task.sleep(FiniteDuration(5, TimeUnit.SECONDS)) >> Task {
      import java.io._
      val pw = new PrintWriter(new File("hello.txt"))
      pw.write(str + "\n")
      pw.close()
    }
    val program = for {
      userInput <- Task(StdIn.readLine())
      fiber <- writeToFile(userInput).start
      _ <-
        if (userInput.startsWith("ignore"))
          fiber.cancel >> Task(println(s"Ignoring user input: $userInput"))
        else
          Task(println(s"Slowly writing to file...: $userInput"))
    } yield ExitCode.Success
    program >> run(args)
  }
Piotr Gawryś
@Avasil
@wjlow Wouldn't it be better to not call writeToFile instead of cancel? If you don't need a result from Task there is also startAndForget
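The suggestion above can be sketched as follows. This is a hedged sketch, not the poster's program: writeSlowly and handle are hypothetical stand-ins for writeToFile and the StdIn-reading step, and it assumes Task.startAndForget from Monix 3:

```scala
import monix.eval.Task
import scala.concurrent.duration._

object StartAndForgetSketch {
  // Hedged sketch: either never start the background task at all,
  // or fire it with startAndForget when its result isn't needed.
  def writeSlowly(str: String): Task[Unit] =        // stand-in for writeToFile
    Task.sleep(5.seconds) >> Task(println(s"writing: $str"))

  def handle(userInput: Task[String]): Task[Unit] =
    for {
      input <- userInput                            // stand-in for Task(StdIn.readLine())
      _ <- if (input.startsWith("ignore"))
             Task.unit                              // never started, nothing to cancel
           else
             writeSlowly(input).startAndForget      // background fiber, result discarded
    } yield ()
}
```

Compared with start plus fiber.cancel, this avoids creating a fiber only to tear it down again in the "ignore" branch.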
Piotr Gawryś
@Avasil
@paualarco I'm not sure I follow the use case. Can you show simple example how would you use this Consumer?
fnqista
@fnqista
Hi guys, need some quick "go" or "no-go" for a new issue: monix/monix#1154
Pau Alarcón
@paualarco
@Avasil as you said, the use case is also covered by Observable.toListL; the one thing is that it might be handy for people who are familiar with the Akka Streams API, where Sink.seq is commonly used for materializing streams. That's my opinion; on the other hand, if you think it's going to cause confusion we should just not add it.
Piotr Gawryś
@Avasil
I see, I thought it was about something else
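For context on the Akka Streams analogy in this exchange, a hedged sketch of the existing alternative mentioned above (runSyncUnsafe is used only to make the demo observable; avoid blocking like this in production code):

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

object ToListSketch extends App {
  // Hedged sketch: Observable.toListL materializes the whole stream into a
  // Task[List[A]], roughly the role Sink.seq plays for an Akka Streams Source.
  val all: Task[List[Int]] = Observable(1, 2, 3).toListL

  assert(all.runSyncUnsafe() == List(1, 2, 3))
}
```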
Adriel Velazquez
@AdrielVelazquez
This is probably a small error, but I noticed that moving from monix-kafka 0.10.x to monix-kafka 1+, KafkaConsumerConfig with includeDefaults=true overrides all of the values with the defaults instead of the passed-in Config taking precedence for the keys it sets
Piotr Gawryś
@Avasil
@AdrielVelazquez Do you mean it works differently in the same release but for different Kafka version or did you migrate from the release in 2017?
Adriel Velazquez
@AdrielVelazquez

@Avasil Let me show an example.

If I have a kafkaconsumer.conf that looks like the following.

// Kafka settings
auto.commit.enable = true
auto.offset.reset = "earliest"
group.id = "test-consumer"
And I run the following code
    val conf = KafkaConsumerConfig(myConf, includeDefaults=true)
    val foo = myConf.getString("auto.commit.enable")
    println(foo)
    println(conf.enableAutoCommit)
myConf being the above configuration. I get this output
true
false
The defaults are overriding the conf that I passed in.
And for context I'm running these versions
  val monixVersion = "3.1.0"
  val monixKafkaVersion = "1.0.0-RC5"
  val kafkaVersion = "2.4.0"