updateManyWithGeneratedKeys. What else might it be if not a pipe? Or, why does using a pipe here cause trouble? Someone else mentioned refactoring the pipes to Chunk[A] => IO[Chunk[A]] might help, but I'm not quite sure what they mean by that or where the change would happen. I think they thought that perhaps having these all in ConnectionIO could be a source of issues?
@gnfisher, with chunks the above can be simplified to
input.evalMapChunk { chunk =>
  processA(chunk.map(_.a)) >> processB(chunk.map(_.b))
}
Here processA: Chunk[A] => ConnectionIO[Chunk[B]] and so on. It's a little easier to reason about than streams.
Upd: removed the .chunks call.
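For reference, a self-contained sketch of this chunk-level processing, written here with .chunks.evalMap so the Chunk types are explicit (Row, ResultA, ResultB, processA and processB are hypothetical names, not from the original message):

import cats.implicits._
import doobie._
import fs2.{Chunk, Stream}

// hypothetical element and result types
case class Row(a: String, b: Int)
case class ResultA(value: String)
case class ResultB(value: Int)

// hypothetical batch operations, both running on the database connection
def processA(as: Chunk[String]): ConnectionIO[Chunk[ResultA]] = ???
def processB(bs: Chunk[Int]): ConnectionIO[Chunk[ResultB]] = ???

// process the stream one chunk at a time; >> sequences the two effects
// and keeps only the second result
def pipeline(input: Stream[ConnectionIO, Row]): Stream[ConnectionIO, Chunk[ResultB]] =
  input.chunks.evalMap { chunk =>
    processA(chunk.map(_.a)) >> processB(chunk.map(_.b))
  }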
.eval and .eval_.
processA does not return anything that processB takes as input.
>> vs ++
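To make the >> vs ++ distinction above concrete, a minimal sketch (the values are made up for illustration): >> sequences two effects and keeps only the second result, while ++ concatenates two streams and keeps everything they emit.

import cats.implicits._
import doobie._
import fs2.Stream

val first:  ConnectionIO[Int] = 1.pure[ConnectionIO]
val second: ConnectionIO[Int] = 2.pure[ConnectionIO]

// >> runs `first`, discards its result, then runs `second`
val sequenced: ConnectionIO[Int] = first >> second            // yields 2

// ++ appends one stream after another, keeping all emitted values
val s1 = Stream.emits(List(1, 2)).covary[ConnectionIO]
val s2 = Stream.emits(List(3, 4)).covary[ConnectionIO]
val concatenated: Stream[ConnectionIO, Int] = s1 ++ s2        // emits 1, 2, 3, 4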
Hi, I'm having an issue with an MSSQL query. I try to fetch all ~25k rows from a table and get this:
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSReader throwInvalidTDS
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93 got unexpected value in TDS response at offset:7992
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSParser throwUnexpectedTokenException
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93: getNextResult: Encountered unexpected TDS_FEATURE_EXT_UTF8SUPPORT (0x0A)
which to me looks like an encoding issue, but when I paginate the query (e.g. order by foo offset 0 rows fetch next 5000 rows only) I can query all the pages fine.
I'm using doobie, but this might be a pure jdbc issue?
ok.... here's a question.... I have a bunch of ConnectionIOs strung together in a for comprehension.... halfway through these I need to save something to the file system and, based on where it ends up, update a database. Because I'm using ZIO, this is roughly what I have:
def save(saveMe: MyComplicatedObject, saveToFileSystem: MyComplicatedObject => ZIO[A, B, File]): ConnectionIO[MyComplicatedObject] = {
  for {
    res1 <- savePart1(saveMe)        // returns ConnectionIO[MyComplicatedObject]
    res2 <- savePart2(res1)          // returns ConnectionIO[MyComplicatedObject]
    res3 <- saveToFileSystem(res2)   // <<<<< here... how to convert ZIO to ConnectionIO
    res4 <- savePart3(res3)          // returns ConnectionIO[MyComplicatedObject]
  } yield res4
}
As you can see, the basic question is: how do I convert a ZIO to a ConnectionIO? I know I can use .pure to get a ConnectionIO out of a plain value, but I doubt .pure on a ZIO effect would give me what I want.
import zio.interop.catz._ in this module. So when you define savePart1, savePart2, savePart3, they should return zio.Task[?] instead of ConnectionIO. Same goes for this save method. The interop should perform the magic and convert ConnectionIO to zio.Task automatically.
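A minimal sketch of that direction (the query and the Transactor[Task] are assumptions for illustration, not from the thread): with the interop import in scope, .transact on a Transactor[Task] turns a ConnectionIO into a zio.Task.

import doobie._
import doobie.implicits._
import zio.Task
import zio.interop.catz._

// assume a transactor built for ZIO's Task elsewhere in the app
def program(xa: Transactor[Task]): Task[Int] =
  sql"select 42".query[Int].unique.transact(xa)   // ConnectionIO[Int] => Task[Int]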
saveToFileSystem itself should return ConnectionIO instead of ZIO. Perhaps in that method you are using a lib or other custom code that returns a ZIO. In that case, I'm not aware of a lib that can convert from ZIO to ConnectionIO. You might have to evaluate the ZIO effect right there and then wrap the result into a ConnectionIO.
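A rough sketch of that evaluate-and-wrap idea, assuming ZIO 1.x (Runtime.default.unsafeRun) and doobie's FC.delay; the writeFile name is hypothetical, and running an effect unsafely inside a transaction is very much a last resort:

import doobie._
import zio.{Runtime, Task}

// hypothetical effect coming from a ZIO-based library
def writeFile(obj: String): Task[java.io.File] = ???

// evaluate the ZIO effect when the ConnectionIO is interpreted,
// and lift the result into ConnectionIO via FC.delay
def writeFileCIO(obj: String): ConnectionIO[java.io.File] =
  FC.delay(Runtime.default.unsafeRun(writeFile(obj)))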
saveToFileSystem. In the distributed (or not) saga pattern, you would specify compensation actions in the event of a rollback (for all participating systems). I don't think ConnectionIO has something like that, because I assume it just relies on the underlying RDBMS engine's rollback mechanics. I'm interested in a solution though.
interop-cats source code and I didn't spot routines going from ZIO to Cats IO. I might be wrong though. You should post this question on the ZIO Discord. Like I mentioned, you can always call unsafeRun on a ZIO effect and lift the result into Cats IO if you choose to do so.
ConnectionIO to ZIO. You might want to look into that.
Hello!
I'm new around here. I have a question: let's say I have a stream of Integers from a query, and I want to save all of them in a single transaction, but without converting it to a List. Is that possible? :/
def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  ???
}
I really miss having more documentation and examples on fs2.Stream together with doobie :_(
Is something like this correct? Would it be memory-safe?
// for simplicity, insertValIntoOtherTable: Int => ConnectionIO[Unit]
def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  intStream.compile
    .fold(connection.unit) { (accConnectionIO, intVal) =>
      accConnectionIO *> insertValIntoOtherTable(intVal)
    }
    .flatten
    .transact(transactor)
}
That last flatten... :/
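For comparison, a sketch of the more usual way to express this, under the same assumptions (insertValIntoOtherTable and the transactor are hypothetical): evalMap plus compile.drain keeps everything inside one ConnectionIO, so it still runs in a single transaction without folding the effects together by hand.

import cats.effect.IO
import doobie._
import doobie.implicits._

def insertValIntoOtherTable(i: Int): ConnectionIO[Unit] = ???  // hypothetical
def transactor: Transactor[IO] = ???                           // hypothetical

def saveAllViaEvalMap(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] =
  intStream
    .evalMap(insertValIntoOtherTable)  // run the insert for each element
    .compile
    .drain                             // ConnectionIO[Unit]
    .transact(transactor)              // one transaction, IO[Unit]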
withUniqueGeneratedKeys[Int]("id") to get the auto-generated ID of the inserted row. I tried doing the same for the INSERT ... ON DUPLICATE KEY UPDATE query, but when a query updates instead of inserting it was returning the following error: Expected ResultSet exhaustion, but more rows were available. I think this occurs because on UPDATE, MySQL reports 2 rows affected to indicate that there was an update instead of an insert. Does this error happen because withUniqueGeneratedKeys expects only 1 row to be updated? I 'fixed' it by using update.run instead. Still, I would like to get the original id in case of an update. Do you have any recommendations on how to work around it, please?
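To make the two variants mentioned above concrete, a small sketch (the items table and SQL are made up; whether the update branch actually reports back the existing id depends on the MySQL driver and connection settings):

import doobie._
import doobie.implicits._
import fs2.Stream

def upsert(code: String, qty: Int): Update0 =
  sql"""insert into items (code, qty) values ($code, $qty)
        on duplicate key update qty = $qty""".update

// expects exactly one generated key; fails if the key result set has 0 or 2+ rows
val strict: ConnectionIO[Int] =
  upsert("abc", 1).withUniqueGeneratedKeys[Int]("id")

// streams however many keys the driver reports, without the uniqueness check
val lenient: Stream[ConnectionIO, Int] =
  upsert("abc", 1).withGeneratedKeys[Int]("id")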
Hi! I'm using the notify/listen functionality of Postgres. The notify is executed in a DB trigger. It seems after some time my program stops receiving notifications, but the stream does not stop with an error:
def notificationStream(
  channelName: String,
  pollingInterval: FiniteDuration
): Stream[F, PGNotification] = {
  val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
    for {
      _  <- Stream.resource(channel(channelName))
      _  <- ticks
      ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
      _  <- Stream.eval(logger.info(s"Received ${ns.size} notifications"))
      n  <- Stream.emits(ns)
    } yield n
  awakeEvery[F](pollingInterval).through(inner.transact(xa))
}
Does somebody have a hint what could be wrong?
The connection to the server was lost. Attempting reset: Succeeded.
and I need to execute listen <channel> again to continue receiving notifications. As a brute-force attempt I added Stream.eval(PHC.pgListen(channelName)) to the inner pipe. This seems to work, but I will continue to look for a smarter way.
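A sketch of where that extra call could sit, following the brute-force fix just described; everything else (channel, logger, and so on) is as in the earlier snippet:

val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
  for {
    _  <- Stream.resource(channel(channelName))
    _  <- ticks
    // re-issue LISTEN on every poll so a reset connection starts listening again
    _  <- Stream.eval(PHC.pgListen(channelName))
    ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
    _  <- Stream.eval(logger.info(s"Received ${ns.size} notifications"))
    n  <- Stream.emits(ns)
  } yield n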
opaque type Code = String

object Code:
  def apply(code: String): Code = code
  extension (code: Code) def value: String = code

  given Get[Code] = Get[String].tmap[Code](x => Code(x))
  given Put[Code] = Put[String].tcontramap[Code](x => x.value)

case class Country(code: Code, name: String, pop: Int, gnp: Option[Double])
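A small usage sketch showing the opaque type in a query (the country table and its columns follow the usual doobie book example and are assumed here):

import doobie._
import doobie.implicits._

// hypothetical lookup using the Code opaque type defined above
def byCode(code: Code): ConnectionIO[Option[Country]] =
  sql"select code, name, population, gnp from country where code = $code"
    .query[Country]
    .option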