def insertLeadsToDb: Stream[F, Unit] =
  combineLeads.parEvalMapUnordered(100) { case (leadForm, lead) =>
    Concurrent[F].delay(connectRepo.insertLead(lead, leadForm.connectCampaign.get.id))
  }.void
It was F[F[Int]], so I added the .void because it was complaining about type misalignment :)
void, because I don't care about the Int I guess :D
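Side note: if connectRepo.insertLead already returns F[Int], then wrapping it in Concurrent[F].delay is exactly what produces the F[F[Int]], and .void then discards the inner action without ever running it. A minimal sketch of the fix, under the assumption that insertLead returns F[Int]:

// assuming insertLead: (Lead, CampaignId) => F[Int] (hypothetical signature):
// parEvalMapUnordered expects A => F[B], so pass the effect through directly
combineLeads.parEvalMapUnordered(100) { case (leadForm, lead) =>
  connectRepo.insertLead(lead, leadForm.connectCampaign.get.id)
}.void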
I've got that stream wrapped inside a def run(): F[Unit] function:
val arrayStart = "[\n"
val arrayEnd = "\n]"
val chunkSize = 4096

fs2.io.readInputStream(
  async.delay(new URL(uri.toString()).openConnection.getInputStream),
  chunkSize
)
  .map(_.toChar)
  .through(rows[F, Char]())
  .through(headers[F, String])
  .map(_.toMap)
  .map(cleanseCsvData)
  .map(toJson)
  .map(_.compactPrint)
  .map { jsonString =>
    Metrics.metrics.counter(taskId.taskId).inc()
    jsonString
  }
  .intersperse(",\n") // commas between elements, so the output is a valid JSON array
  .cons1(arrayStart)
  .append(Stream.eval(arrayEnd.pure[F]))
  .through(text.utf8Encode)
  .through(Files[F].writeAll(outputFile.path, Seq(WRITE, APPEND, CREATE)))
  .compile
  .drain
When I schedule it on a fiber:

cancellable <- run().startOn(executionContext)

and cancel it later, it doesn't stop: the file gets completely downloaded and transformed. What am I doing wrong?
Try Stream's interruptWhen with a Deferred that you complete once you want to cancel (or do so in onCancel); that should "stop" the stream reliably. But again, depends on your larger setup.
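A minimal sketch of that pattern, assuming cats-effect 3 and fs2 3.x (the name runUntilCancelled is made up):

import cats.effect.{Deferred, IO}
import cats.syntax.all._
import fs2.Stream

// Start the stream on a fiber and hand back an action that interrupts it
// by completing the Deferred, then waits for the fiber to finish.
def runUntilCancelled(work: Stream[IO, Unit]): IO[IO[Unit]] =
  for {
    halt  <- Deferred[IO, Either[Throwable, Unit]]
    fiber <- work
               .interruptWhen(halt) // the stream ends once halt is completed
               .compile
               .drain
               .start
  } yield halt.complete(Right(())).void *> fiber.joinWithNever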
Btw: If you update to fs2-data's latest snapshot (RC1 to come today or tomorrow), then you can simplify the beginning:
fs2.io.readInputStream(...)
  .through(text.utf8Decode)
  .through(decodeUsingHeaders[CsvRow[String, String]])
  .map(_.toMap)
(As for the toMap, I just opened a PR that would allow decoding directly into a Map.)
@ybasket:matrix.org
Thing is, I tried it with a Signal:
override def schedule(rawUri: RawUri): F[TaskId] = {
  for {
    signal <- SignallingRef[F, Boolean](false)
    uri    <- async.pure(Uri(rawUri.uri))
    taskId <- TaskIdComp.create[F]
    _      <- taskRunner.run(taskId, uri, signal).startOn(schedulerExecutionContext)
    task = Task(
      taskId = taskId,
      uri = uri,
      state = TaskState.Scheduled,
      cancelable = Option(new Cancellable[F] {
        def cancel: F[Unit] = signal.set(true)
      }),
      startTime = None,
      endTime = None
    )
    _ <- tasks.addTask(task)
  } yield taskId
}
Inside taskRunner.run(taskId, uri, signal) I use .interruptWhen(signal) on the Stream, but then, when I try to cancel processing (by invoking another API endpoint):
override def cancelTask(taskId: TaskId): F[Option[CancellationResult]] = {
  taskService.getTask(taskId).flatMap { taskOption =>
    taskOption.fold(async.pure(Option.empty[CancellationResult])) { task =>
      task.cancelable.cancel
    }
  }
}

nothing happens... :(
Maybe add a Sync[F].delay(println("Canceling")) in your cancel to see whether the signal is really triggered? And in cancelTask, you can just use taskOption.traverse(_.cancelable.cancel) instead of the fold.
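For the debug step, roughly this in the Cancellable from your schedule snippet (a sketch; *> needs cats.syntax.all._):

cancelable = Option(new Cancellable[F] {
  def cancel: F[Unit] =
    Sync[F].delay(println("Canceling")) *> signal.set(true) // prove cancel is reached at all
})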
I have a Transformer[A, B] class, which I'm now trying to implement a toPipe[F] for.
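If Transformer just wraps a pure function, a hypothetical sketch of toPipe (the member transform: A => B is an assumption, adjust to your actual class):

import fs2.Pipe

final case class Transformer[A, B](transform: A => B) {
  // a Pipe[F, A, B] is Stream[F, A] => Stream[F, B],
  // so a pure transform is just a map over the stream
  def toPipe[F[_]]: Pipe[F, A, B] = _.map(transform)
}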
implicit def getJobResultResponseDecoder[F[_] : Sync](implicit cs: ContextShift[F]): EntityDecoder[F, GetJobResultResponse] =
  EntityDecoder.decodeBy(MediaType.text.csv) { msg: Message[F] =>
    EitherT {
      val writeBody: F[Unit] =
        Stream.resource(blockingExecutionContext).flatMap { blocker =>
          val pipe: Pipe[F, Byte, Byte] = src =>
            src
              .through(text.utf8Decode)
              .through(text.lines) // note: lines are emitted without their newline delimiters
              .through(text.utf8Encode)
          val sink: Pipe[F, Byte, Unit] =
            io.file.writeAll(Paths.get(getFilePath), blocker,
              Seq(StandardOpenOption.CREATE, StandardOpenOption.APPEND)
            )
          msg.body.through(pipe).through(sink)
        }.compile.drain
      // sequence the write before producing the result; previously the
      // compile.drain was a discarded statement and never actually ran
      writeBody *> Sync[F].pure(
        GetJobResultResponse(locator, size).asRight[DecodeFailure]
      )
    }
  }