peterstorm
@peterstorm:matrix.org
[m]
Hmmm, still can't make it work weirdly
Did this now:
def insertLeadsToDb: Stream[F, Unit] =
  combineLeads.parEvalMapUnordered(100) { case (leadForm, lead) =>
    Concurrent[F].delay(connectRepo.insertLead(lead, leadForm.connectCampaign.get.id))
  }.void
am I discarding again? :D
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
what does connectRepo.insertLead return?
peterstorm
@peterstorm:matrix.org
[m]
F[Int]
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
now you're not discarding, you're double wrapping :)
You create F[F[Int]]
peterstorm
@peterstorm:matrix.org
[m]
oh of course
dammit 😀
Thank you
There we go, awesome :D
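A minimal sketch of the double wrapping discussed above, assuming cats-effect 3 and fs2 3 with IO standing in for F. The insertLead function, the leads stream and the executed counter are hypothetical stand-ins, not peterstorm's actual code. Wrapping a call that already returns an effect in delay only suspends the construction of that effect, so the inner insert never runs.

```scala
import java.util.concurrent.atomic.AtomicInteger

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object DoubleWrapDemo {
  // Counts how many inserts actually executed.
  val executed = new AtomicInteger(0)

  // Hypothetical stand-in for connectRepo.insertLead, which returns F[Int].
  def insertLead(lead: String, campaignId: Int): IO[Int] =
    IO(executed.incrementAndGet())

  // Hypothetical input stream of (lead, campaign id) pairs.
  val leads: Stream[IO, (String, Int)] =
    Stream.emits(List(("alice", 1), ("bob", 2)))

  // Double-wrapped: delay suspends the expression that *builds* the IO[Int],
  // so the stream emits unexecuted IO values (note the element type).
  val doubleWrapped: Stream[IO, IO[Int]] =
    leads.parEvalMapUnordered(100) { case (lead, id) =>
      IO.delay(insertLead(lead, id))
    }

  // Fixed: pass the effect itself; parEvalMapUnordered runs it.
  val fixed: Stream[IO, Int] =
    leads.parEvalMapUnordered(100) { case (lead, id) =>
      insertLead(lead, id)
    }

  def main(args: Array[String]): Unit = {
    doubleWrapped.compile.drain.unsafeRunSync()
    println(s"after doubleWrapped: ${executed.get}") // prints 0
    fixed.compile.drain.unsafeRunSync()
    println(s"after fixed: ${executed.get}") // prints 2
  }
}
```

Adding .void to the double-wrapped version makes the types line up with Stream[F, Unit], which is why the compiler stops complaining even though nothing is inserted.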
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
You probably added .void because it was complaining about type misalignment :)
peterstorm
@peterstorm:matrix.org
[m]
I still have void, because I don't care about the Int I guess :D
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
The whole story with Unit subsuming values in Scala is quite unfortunate. So you gotta be careful with that
peterstorm
@peterstorm:matrix.org
[m]
Yeah, I've been bitten by that about 3 times now, since I started - I keep count :D
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
there's a warn-discard flag or something that helps in a lot of cases, but not all
peterstorm
@peterstorm:matrix.org
[m]
Thank you, I'll give that a try!
Anton Sviridov
@velvetbaldmime:matrix.org
[m]
The set of flags in that plugin is quite draconian, but it catches a vast array of cases when doing pure FP.
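A small sketch of the value-discarding problem mentioned above, using only the standard library. The names are hypothetical. The flag Anton is most likely thinking of is -Wvalue-discard in Scala 2.13 (-Ywarn-value-discard in 2.12); without it, the forgotToRun method below compiles silently.

```scala
object ValueDiscardDemo {
  // Counts how many inserts actually ran.
  var inserts = 0

  // Hypothetical deferred action: nothing happens until the returned thunk is called.
  def insertLater(): () => Int = () => { inserts += 1; inserts }

  // Compiles without the flag: the expected type is Unit, so the compiler
  // quietly drops the thunk and it is never run.
  // With -Wvalue-discard this line is reported as a discarded non-Unit value.
  def forgotToRun(): Unit = insertLater()

  // Runs the thunk and then returns Unit explicitly.
  def runs(): Unit = {
    insertLater().apply()
    ()
  }

  def main(args: Array[String]): Unit = {
    forgotToRun()
    println(s"after forgotToRun: $inserts") // prints 0
    runs()
    println(s"after runs: $inserts") // prints 1
  }
}
```

The same thing happens when an F[F[A]] ends up where an F[Unit] is expected, which is how a missing flatMap can go unnoticed.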
eltherion
@eltherion

Is there any better (i.e. faster) way to download a file than:

fs2.io.readInputStream((url.openStream()).delay, 1024)

?

it seems to be a little bit slower than downloading in any other way (other http lib, etc.)
Adam Rosien
@arosien
depends on how you are reading that stream. handling chunks at a time should be faster than byte-by-byte
eltherion
@eltherion

I've got that stream wrapped inside a def run(): F[Unit] function:

    val arrayStart = "[\n"
    val arrayEnd = "\n]"
    val chunkSize = 4096

fs2.io.readInputStream(
      async.delay(new URL(uri.toString()).openConnection.getInputStream),
      chunkSize
    )
      .map(_.toChar)
      .through(rows[F, Char]())
      .through(headers[F, String])
      .map(_.toMap)
      .map(cleanseCsvData)
      .map(toJson)
      .map(_.compactPrint)
      .map { jsonString =>
        Metrics.metrics.counter(taskId.taskId).inc()
        jsonString
      }
      .intersperse("\n")
      .cons1(arrayStart)
      .append(Stream.eval(arrayEnd.pure))
      .through(text.utf8Encode)
      .through(Files[F].writeAll(outputFile.path, Seq(WRITE, APPEND, CREATE)))
      .compile
      .drain

When I schedule it to fiber:

cancellable <- run().startOn(executionContext)

and cancel it later, it doesn't stop: the file still gets completely downloaded and transformed.

What am I doing wrong?

eltherion
@eltherion
ping? 😅
ybasket
@ybasket:matrix.org
[m]
@eltherion: Not sure what's "wrong" in your setup (also could depend on why you want to cancel at all), but you could try to use Stream's interruptWhen with a Deferred that you complete once you want to cancel (or do onCancel), that should "stop" the stream reliably. But again, depends on your larger setup.

Btw: If you update to fs2-data's latest snapshot (RC1 to come today or tomorrow), then you can simplify the beginning:

fs2.io.readInputStream(...)
  .through(text.utf8Decode)
  .through(decodeUsingHeaders[CsvRow[String, String]])
  .map(_.toMap)

(As for the toMap, I just opened a PR that would allow to decode directly into a Map)

ybasket
@ybasket:matrix.org
[m]
(Snapshot is 0.10.0+81-82e26ff6-SNAPSHOT)
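A rough sketch of the interruptWhen suggestion above, assuming cats-effect 3 and fs2 3 with IO in place of F. The ticking stream is a hypothetical placeholder for the download pipeline. Completing the Deferred makes the otherwise endless stream terminate normally, so the background fiber can be joined.

```scala
import scala.concurrent.duration._

import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object InterruptDemo {
  // Runs an endless stream in the background, then stops it through the Deferred.
  val program: IO[Long] =
    for {
      // Completing this with Right(()) asks the stream to stop.
      stop  <- Deferred[IO, Either[Throwable, Unit]]
      // Placeholder for the real pipeline: ticks forever unless interrupted.
      fiber <- Stream
                 .awakeEvery[IO](10.millis)
                 .interruptWhen(stop)
                 .compile
                 .count
                 .start
      _     <- IO.sleep(200.millis)
      // This is what a "cancel" endpoint would call.
      _     <- stop.complete(Right(()))
      // Returns because the stream ended, not because the fiber was cancelled.
      ticks <- fiber.joinWithNever
    } yield ticks

  def main(args: Array[String]): Unit =
    println(s"ticks before interruption: ${program.unsafeRunSync()}")
}
```

Interruption is checked between stream steps, so a single long blocking call inside one step (for example a blocking read) may still run to completion before the stream stops.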
eltherion
@eltherion

@ybasket:matrix.org
Thing is I tried it with Signal:

  override def schedule(rawUri: RawUri): F[TaskId] = {
    for {
      signal <- SignallingRef[F, Boolean](false)
      uri    <- async.pure(Uri(rawUri.uri))
      taskId <- TaskIdComp.create[F]
      _      <- taskRunner.run(taskId, uri, signal).startOn(schedulerExecutionContext)
      task    = Task(
                  taskId = taskId,
                  uri = uri,
                  state = TaskState.Scheduled,
                  cancelable = Option(new Cancellable[F] {
                    def cancel: F[Unit] = {
                      signal.set(true)
                    }
                  }),
                  startTime = None,
                  endTime = None
                )
      _      <- tasks.addTask(task)
    } yield taskId
  }

Inside taskRunner.run(taskId, uri, signal) I use .interruptWhen(signal) on a Stream

but then, when I try to cancel processing (by invoking another api endpoint):

  override def cancelTask(taskId: TaskId): F[Option[CancellationResult]] = {
     taskService.getTask(taskId).flatMap { taskOption =>
       taskOption.fold(async.pure(Option.empty[CancellationResult])) { task =>
         task.cancelable.cancel
      }
    }
  }

nothing happens... :(

eltherion
@eltherion
I mean the stream does the job up to the end ignoring everything.
ybasket
@ybasket:matrix.org
[m]
Have you tried adding a Sync[F].delay(println("Canceling")) in your cancel to see whether the signal is really triggered?
Btw: I would suggest to use http4s if your URIs are HTTP-based, stream it from there instead of using Java's blocking APIs. And in your cancelTask, you can just use taskOption.traverse(_.cancelable.cancel) instead of the fold.
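The traverse hint above can be sketched as follows, assuming cats and cats-effect 3. Task and CancellationResult are simplified, hypothetical models of the types in eltherion's snippet, where cancelling a task yields an IO[CancellationResult].

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

object TraverseDemo {
  // Hypothetical, simplified models of the types from the chat.
  final case class CancellationResult(cancelled: Boolean)
  final case class Task(cancel: IO[CancellationResult])

  // The fold version: handle None and Some by hand.
  def cancelWithFold(taskOption: Option[Task]): IO[Option[CancellationResult]] =
    taskOption.fold(IO.pure(Option.empty[CancellationResult])) { task =>
      task.cancel.map(Option(_))
    }

  // The traverse version: run the effect if the task exists, keep None otherwise.
  def cancelWithTraverse(taskOption: Option[Task]): IO[Option[CancellationResult]] =
    taskOption.traverse(_.cancel)

  def main(args: Array[String]): Unit = {
    val task = Task(IO.pure(CancellationResult(cancelled = true)))
    println(cancelWithTraverse(Some(task)).unsafeRunSync()) // Some(CancellationResult(true))
    println(cancelWithTraverse(None).unsafeRunSync())       // None
  }
}
```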
eltherion
@eltherion
@ybasket:matrix.org
Thank you for the help, suggestions, syntax hints. I just wanted to say I've overcome all the issues mentioned above. Thank you and have a good week!
Dylan Halperin
@dylemma
is it possible for a Pipe to intercept exceptions thrown by some "downstream" phase? Not looking to allow the stream to continue, just trying to transform/wrap the exception that actually gets thrown
Oleg Pyzhcov
@oleg-py
downstream as in upstream.through(interceptHere).through(downstreamPipe)?
Dylan Halperin
@dylemma
yeah
Oleg Pyzhcov
@oleg-py
then no
Dylan Halperin
@dylemma
aw darn, thanks
Oleg Pyzhcov
@oleg-py
that would be odd, for stream to know who uses it later
Dylan Halperin
@dylemma
in my particular case, I'm doing something like that in my parsing library, and was able to implement a sort of "smart stack trace" by passing exceptions backwards in the "through" chain. That's done on my Transformer[A, B] class, which now I'm trying to implement a toPipe[F] for
sort of a "directed bubble-up" of a normal exception
Fabio Labella
@SystemFw
there are streaming designs that are bidirectional
fs2 isn't that though
Dylan Halperin
@dylemma
gotcha. Welp thanks for saving me a couple hours of head-scratching
Xuehai Bian
@xbiansf
Hi, I'm getting this error: could not find implicit value for parameter compiler: fs2.Stream.Compiler[[x]Any,G]
It points at }.compile.drain and complains about the "Compiler" at the compile method. How do I resolve this? Thanks. From searching, it seems I need Sync[F]. This is for fs2.1.1 (I cannot upgrade because it's an old project).
Oleg Pyzhcov
@oleg-py
It's three backticks, newline, code, newline, three backticks
Xuehai Bian
@xbiansf
implicit def getJobResultResponseDecoder[F[_] : Sync](implicit cs: ContextShift[F]): EntityDecoder[F, GetJobResultResponse] =
    EntityDecoder.decodeBy(MediaType.text.csv) { msg: Message[F] =>
      EitherT {
        Stream.resource(blockingExecutionContext).flatMap { blocker =>
          val bodyStream = msg.body
          val pipe: Pipe[F, Byte, Byte] = src =>
            src
              .through(text.utf8Decode)
              .through(text.lines)
              .through(text.utf8Encode)
          val sink: Pipe[F, Byte, Unit] =
            io.file.writeAll(Paths.get(getFilePath), blocker,
              Seq(StandardOpenOption.CREATE, StandardOpenOption.APPEND)
            )
          val stream: Stream[F, Unit] =
            bodyStream
              .through(pipe)
              .through(sink)
          stream
       }.head.compile.drain
  Sync[F].pure(
  GetJobResultResponse(locator, size)
    .asRight[DecodeFailure]
)}
Oleg Pyzhcov
@oleg-py
how is blockingExecutionContext defined?
Xuehai Bian
@xbiansf

  val blockingExecutionContext =
    Resource.make(
      IO(
        ExecutionContext.fromExecutorService(
          Executors.newFixedThreadPool(2)
        )
      )
    ) { ec => IO(ec.shutdown()) }
Oleg Pyzhcov
@oleg-py
You are mixing an IO stream from Stream.resource with an F stream with the pipes. They unify to Stream[[X] =>> Any, Unit] after flatMap, which you can't compile
Xuehai Bian
@xbiansf
How should I remove the IO then? Seems I have to use the F[_], because of the EntityDecoder[F ?
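One possible direction, sketched under the assumption of cats-effect 2.x and fs2 2.1.x: build the blocking resource in F rather than in IO, so that every stage of the stream shares the same effect type. Blocker[F] already returns a Resource[F, Blocker], which is also the type io.file.writeAll expects. The saveBody helper and its parameters are hypothetical and not part of the original decoder.

```scala
import java.nio.file.{Path, StandardOpenOption}

import cats.effect.{Blocker, ContextShift, Sync}
import fs2.Stream

object BlockerInF {
  // Appends the bytes of `body` to `target`, with every stage staying in F.
  def saveBody[F[_]: Sync: ContextShift](body: Stream[F, Byte], target: Path): F[Unit] =
    Stream
      .resource(Blocker[F]) // Resource[F, Blocker]: no IO leaks into the stream
      .flatMap { blocker =>
        body.through(
          fs2.io.file.writeAll[F](
            target,
            blocker,
            Seq(StandardOpenOption.CREATE, StandardOpenOption.APPEND)
          )
        )
      }
      .compile // resolves a Compiler[F, F] from Sync[F]
      .drain
}
```

If the custom thread pool has to stay, the same idea applies: create it with Sync[F].delay inside Resource.make instead of IO, so that Stream.resource yields a Stream[F, ...].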