val doobieV = "1.0.0-M5"
"org.tpolecat" %% "doobie-hikari" % doobieV, //Streaming JDBC w/ Hikari support
"org.tpolecat" %% "doobie-postgres" % doobieV, //Doobie Postgres support (replace if needed)
"org.tpolecat" %% "doobie-refined" % doobieV //Doobie Postgres support (replace if needed)
I am processing data in streams. I have a Stream[ConnectionIO, (A, B)]. I am trying to save the As and the Bs to the database. A B can't be saved unless it can find the local id of its A, so the As are processed first. This is the step I have, and it is not working: the As save, but the Bs do not. I am not sure if it's a misunderstanding of streams and evaluation (maybe inside the flatMap the two new streams are being evaluated concurrently, so B can't find the local A to save?), or if there is something with doobie, transactions, etc. Perhaps I am doing something very wrong.
This is the code snippet, if anyone can point me in the right direction to debug this further. processA and processB are Pipe[ConnectionIO, A, C] and the like:
input.chunks.flatMap { chunk =>
  // drain the A inserts first, then emit the B pipe for the same chunk
  Stream.eval(Stream.chunk(chunk).map(_.a).through(processA).compile.drain) >>
    Stream.chunk(chunk).map(_.b).through(processB)
}
(The elements aren't literally a tuple (A, B) but a value, hence the map step there... but you get the idea.)
The pipes are built on updateManyWithGeneratedKeys. What else might it be, if not a pipe? Or, why does using a pipe here cause trouble? Someone else mentioned that refactoring the pipes to Chunk[A] => IO[Chunk[A]] might help, but I'm not quite sure what they mean by that or where the change would happen. I think they thought that perhaps having these all in ConnectionIO could be a source of issues?
@gnfisher, with chunks the above can be simplified to
input.chunks.evalMap { chunk =>
  processA(chunk.map(_.a)) >> processB(chunk.map(_.b))
}
Here processA: Chunk[A] => ConnectionIO[Chunk[B]], and so on. It's a little easier to reason about than streams.
Upd: kept the .chunks call here, since evalMapChunk passes individual elements to its function rather than chunks.
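For what it's worth, a minimal sketch of what such a chunk-based processA could look like. The table, columns, and row types here are hypothetical, and it leans on updateManyWithGeneratedKeys returning a Stream[ConnectionIO, K]:

import doobie._
import doobie.implicits._
import fs2.Chunk

// Hypothetical row types and table, for illustration only.
final case class A(name: String)
final case class LocalA(id: Int, name: String)

// Insert a chunk of As and collect the generated rows (with their local
// ids) so a later processB step can look them up.
def processA(as: Chunk[A]): ConnectionIO[Chunk[LocalA]] =
  Update[A]("insert into a_table (name) values (?)")
    .updateManyWithGeneratedKeys[LocalA]("id", "name")(as.toList)
    .compile
    .toList
    .map(Chunk.seq)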
Also note the difference between .eval and .eval_.
processA does not return anything that processB takes as input.
>> vs ++
Hi, I'm having an issue with an MSSQL query. I try to fetch all ~25k rows from a table, and get this:
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSReader throwInvalidTDS
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93 got unexpected value in TDS response at offset:7992
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSParser throwUnexpectedTokenException
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93: getNextResult: Encountered unexpected TDS_FEATURE_EXT_UTF8SUPPORT (0x0A)
which to me looks like an encoding issue, but when I paginate the query (e.g. order by foo offset 0 rows fetch next 5000 rows only) I can query all the pages fine.
I'm using doobie, but this might be a pure JDBC issue?
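Not an answer to the TDS error itself, but for anyone who wants the pagination workaround spelled out in doobie, a sketch with a made-up table and row type:

import doobie._
import doobie.implicits._

// Hypothetical row type and table; adjust to your schema.
final case class Row(foo: Int, bar: String)

def page(offset: Int, limit: Int): ConnectionIO[List[Row]] =
  sql"""select foo, bar from my_table
        order by foo
        offset $offset rows fetch next $limit rows only"""
    .query[Row]
    .to[List]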
ok.... here's a question.... I have a bunch of ConnectionIOs strung together in a for comprehension.... halfway through these I need to save something to the file system and, based on where it ends up, update the database. Because I'm using ZIO, this is somewhat what I have:
def save(saveMe: MyComplicatedObject, saveToFileSystem: MyComplicatedObject => ZIO[A, B, File]): ConnectionIO[MyComplicatedObject] = {
  for {
    res1 <- savePart1(saveMe)        // returns ConnectionIO[MyComplicatedObject]
    res2 <- savePart2(res1)          // returns ConnectionIO[MyComplicatedObject]
    res3 <- saveToFileSystem(res2)   // <<<<< here... how to convert ZIO to ConnectionIO?
    res4 <- savePart3(res3)          // returns ConnectionIO[MyComplicatedObject]
  } yield res4
}
As you can see, the basic question is: how do I convert a ZIO to a ConnectionIO? I know I can use .pure to get a ConnectionIO out of a plain value, but I doubt .pure on a ZIO would give me what I want.
You need import zio.interop.catz._ in this module. Then, when you define savePart1, savePart2, savePart3, they should return zio.Task[?] instead of ConnectionIO. Same goes for this save method. The interop should perform the magic and convert ConnectionIO to zio.Task automatically.
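A minimal sketch of that suggestion, reusing the names from the question (savePart1/2/3 still return ConnectionIO and are run with a hypothetical Transactor[Task] built elsewhere):

import doobie._
import doobie.implicits._
import zio.Task
import zio.interop.catz._

def save(saveMe: MyComplicatedObject,
         saveToFileSystem: MyComplicatedObject => Task[MyComplicatedObject],
         xa: Transactor[Task]): Task[MyComplicatedObject] =
  for {
    res1 <- savePart1(saveMe).transact(xa)  // ConnectionIO run as a Task
    res2 <- savePart2(res1).transact(xa)
    res3 <- saveToFileSystem(res2)          // ordinary ZIO effect in between
    res4 <- savePart3(res3).transact(xa)
  } yield res4

Note the trade-off: each .transact is its own transaction, so the three DB steps no longer share one. That is exactly where the saga discussion below comes in.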
saveToFileSystem itself should return ConnectionIO instead of ZIO. Perhaps in that method you are using a lib or other custom code that returns a ZIO. In that case, I'm not aware of a lib that can convert from ZIO to ConnectionIO. You might have to evaluate the ZIO effect right there and then wrap the result into a ConnectionIO.
As for rolling back the saveToFileSystem step: in the distributed (or not) saga pattern, you would specify compensation actions in the event of a rollback (for all participating systems). I don't think ConnectionIO has something like that, because I assume it just relies on the underlying RDBMS engine's rollback mechanics. I'm interested in a solution, though.
I looked at the interop-cats source code and I didn't spot routines going from ZIO to cats IO. I might be wrong, though. You should post this question on the ZIO Discord. Like I mentioned, you can always call unsafeRun on a ZIO effect and lift the result into cats IO if you choose to do so.
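If you do go the evaluate-it-right-there route, a hedged sketch of that last resort (ZIO 1.x API, as used in this discussion):

import doobie.ConnectionIO
import doobie.free.connection
import zio.{Runtime, Task}

// Eagerly runs the Task when the ConnectionIO is interpreted; this
// bypasses ZIO's interruption and error model, so use it sparingly.
def zioToConnectionIO[A](task: Task[A]): ConnectionIO[A] =
  connection.delay(Runtime.default.unsafeRun(task))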
The interop does go the other way, from ConnectionIO to ZIO. You might want to look into that.
Hello!
I'm new around here. I have a question: let's say I have a stream of integers from a query, and I want to save all of them in a single transaction, but without converting the stream to a List. Is it possible? :/
// method name added for illustration; the original snippet had none
def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  ???
}
I really miss having more documentation and examples on fs2.Stream and doobie together :_(
Is something like this correct? Would it be memory-safe?
// for simplicity, insertValIntoOtherTable: Int => ConnectionIO[Unit]
def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  intStream.compile
    .fold(connection.unit) { (accConnectionIO, intVal) =>
      accConnectionIO *> insertValIntoOtherTable(intVal)
    }
    .flatten
    .transact(transactor)
}
That last flatten... :/
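For comparison, a sketch of a route that avoids the fold and the flatten entirely, assuming the same insertValIntoOtherTable and transactor as above. evalMap runs the insert as each element is pulled, so nothing accumulates, and since everything stays inside one ConnectionIO it is still a single transaction:

import cats.effect.IO
import doobie._
import doobie.implicits._

def saveAllStreaming(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] =
  intStream
    .evalMap(insertValIntoOtherTable) // Int => ConnectionIO[Unit], per element
    .compile
    .drain                            // ConnectionIO[Unit]
    .transact(transactor)             // one transaction, constant memory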
I'm using withUniqueGeneratedKeys[Int]("id") to get the auto-generated ID of the inserted row. I tried doing the same for an INSERT ... ON DUPLICATE KEY UPDATE query, but it was returning the following error when the query updates instead of inserting: Expected ResultSet exhaustion, but more rows were available. I think this occurs because on UPDATE, MySQL returns 2 rows updated to indicate that there was an update instead of an insert. Does this error happen because withUniqueGeneratedKeys expects exactly 1 row to be updated? I 'fixed' it by using update.run instead. Still, I would like to get the original id in case of an update. Do you have any recommendations on how to get around this, please?
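One commonly cited MySQL-side workaround, hedged since I haven't verified it against the Connector/J version in play: have the update branch route the existing id through LAST_INSERT_ID(id), so the generated-keys result carries the row's id on update as well, and read the keys as a stream to sidestep the exactly-one-row expectation. Table and columns here are made up:

import doobie._
import doobie.implicits._

def upsert(name: String): ConnectionIO[Option[Int]] =
  sql"""insert into my_table (name) values ($name)
        on duplicate key update id = last_insert_id(id)"""
    .update
    .withGeneratedKeys[Int]("id") // Stream[ConnectionIO, Int], tolerant of row count
    .take(1)
    .compile
    .last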