Fabio Labella
@SystemFw
if the stream returned by processA is empty
I need to go, but text me later and I can explain >> vs ++ to you
Greg Fisher
@gnfisher
Ok, thank you so much for all your help throughout this saga
Fabio Labella
@SystemFw
it's not hard but if you don't clearly know the difference it can be confusing, nothing to feel dumb about :)
Kristian Nordal
@nor

Hi, I'm having an issue with a mssql query. I try to fetch all ~25k rows from a table, and get this:

Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSReader throwInvalidTDS
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93 got unexpected value in TDS response at offset:7992
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSParser throwUnexpectedTokenException
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93: getNextResult: Encountered unexpected TDS_FEATURE_EXT_UTF8SUPPORT (0x0A)

which to me looks like an encoding issue, but when I paginate the query (e.g. order by foo offset 0 rows fetch next 5000 rows only) I can query all the pages fine.
I'm using doobie, but this might be a pure jdbc issue?

jatcwang
@jatcwang:matrix.org
[m]
@nor: yep looks like it. Give raw jdbc a go and see if you get the same issue
Roberto Leibman
@rleibman

ok... here's a question. I have a bunch of ConnectionIOs strung together in a for comprehension. Halfway through these I need to save something to the file system and, based on where it ends up, update a database. Because I'm using zio, this is roughly what I have:

def save(saveMe: MyComplicatedObject, saveToFileSystem: MyComplicatedObject => ZIO[A,B,File]): ConnectionIO[MyComplicatedObject] = {
  for {
     res1 <- savePart1(saveMe) //returns ConnectionIO[MyComplicatedObject]
     res2 <- savePart2(res1) //returns ConnectionIO[MyComplicatedObject]
     res3 <- saveToFileSystem(res2) // <<<<< here... how to convert ZIO to ConnectionIO
     res4 <- savePart3(res3) //returns ConnectionIO[MyComplicatedObject]
  } yield res4
}

As you can see, the basic question is how to convert a ZIO to a ConnectionIO. I know I can use .pure to get a ConnectionIO out of a plain value, but I doubt .pure on a ZIO IO would give me what I want.

Rob Norris
@tpolecat
You probably don’t want to do that because if the transaction rolls back the filesystem operation will persist.
But in any case if you use cats-effect IO you can say myIO.to[ConnectionIO]. It may or may not work with ZIO, I don’t know.
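
A minimal sketch of the myIO.to[ConnectionIO] suggestion, assuming doobie 0.x on cats-effect 2, where doobie.implicits._ supplies an Async[ConnectionIO] and therefore a LiftIO[ConnectionIO]. The names writeMarker and recordPath, the files table and the path are hypothetical stand-ins. As noted above, the file-system effect is not undone if the transaction rolls back.

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._

object LiftExample {
  // Hypothetical side effect standing in for the file-system write.
  def writeMarker(path: String): IO[String] = IO(path)

  // Hypothetical database step standing in for savePart3.
  def recordPath(path: String): ConnectionIO[Int] =
    sql"update files set path = $path where id = 1".update.run

  // The IO is lifted into ConnectionIO so it can sit in the same for comprehension.
  val program: ConnectionIO[Int] =
    for {
      path <- writeMarker("/tmp/example").to[ConnectionIO]
      rows <- recordPath(path)
    } yield rows
}
```

Nothing runs until program is handed to a Transactor with .transact(xa).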
Roberto Leibman
@rleibman
Yeah, I thought of that as well... it's a personal project, so I don't care this time around if there are a few extra files lying around. I'll see if that works (it doesn't compile yet; I think I need to figure out how to convert it to a cats-effect IO with the interop library).
vonchav
@voonchav_gitlab
@rleibman I assume you already have import zio.interop.catz._ in this module. So when you define savePart1, savePart2, savePart3, they should return zio.Task[?] instead of ConnectionIO. The same goes for this save method. The interop should perform the magic and convert ConnectionIO to zio.Task automatically.
Roberto Leibman
@rleibman
@voonchav_gitlab yes, maybe, but I think I want them to stay as ConnectionIO, cause eventually I want 'em all to run in a transact, which is why I want to convert the ZIO to a ConnectionIO (and yes, they'll convert to ZIO later)
vonchav
@voonchav_gitlab
Hi @rleibman, in that case, for the example you gave, saveToFileSystem itself should return ConnectionIO instead of ZIO. Perhaps in that method, you are using a lib or other custom code that returns a ZIO. In that case, I'm not aware of a lib that can convert from ZIO to ConnectionIO. You might have to evaluate the ZIO effect right there and then wrap the result into a ConnectionIO.
I think I understand what you're trying to do in the example. But in reality, having a non-DB operation as part of a DB transaction is a bit weird because I don't think a DB rollback can undo the change made in saveToFileSystem. In the distributed (or not) saga pattern, you would specify compensation actions in event of a rollback (for all participating systems). I don't think ConnectionIO has something like that because I assume it just relies on the underlying RDBMS engine's rollback mechanics. I'm interested in a solution though.
vonchav
@voonchav_gitlab
I skimmed through ZIO's interop-cats source code and I didn't spot routines going from ZIO to Cats IO. I might be wrong though. You should post this question on ZIO discord. Like I mentioned, you can always call unsafeRun on a ZIO effect and lift the result into Cats IO if you choose to do so.
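
A rough sketch of the "evaluate the ZIO effect right there and wrap the result" idea, assuming ZIO 1.x (where Runtime#unsafeRun exists) and doobie's FC.delay. The helper name liftTask is hypothetical. It blocks the thread that interprets the ConnectionIO, so it is a workaround rather than a proper interop.

```scala
import doobie._
import zio.{Runtime, Task}

object ZioLift {
  // Runs the Task synchronously at the point where the ConnectionIO is interpreted.
  // A failure in the Task is rethrown and surfaces as a failed ConnectionIO.
  def liftTask[A](task: Task[A])(implicit runtime: Runtime[Any]): ConnectionIO[A] =
    FC.delay(runtime.unsafeRun(task))
}
```

With implicit val rt: Runtime[Any] = Runtime.default in scope, ZioLift.liftTask(Task(42)) can be used as one step of a ConnectionIO for comprehension.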
heksenlied
@heksenlied:matrix.org
[m]
@rleibman: In our project, we are using https://github.com/gaelrenoux/tranzactio to lift the ConnectionIO to ZIO. You might want to look into that.
Roberto Leibman
@rleibman
That sounds awesome, I'll definitely try that.
Alfonso Nishikawa
@alfonsonishikawa

Hello!
I'm new around. I have a question: Let's say I have a stream of Integers from a query, and I want to save all of them in a single transaction but without converting it to a List. Is it possible? :/

def saveAll(intStream: fs2.Stream[ConnectionIO,Int]): IO[Unit] = { // saveAll is a placeholder name
  ???
}

I really miss having more documentation and examples on using fs2.Stream with doobie :_(

Alfonso Nishikawa
@alfonsonishikawa

Is something like this correct? Would it be memory-safe?

// for simplicity, insertValIntoOtherTable: Int => ConnectionIO[Unit]

def saveAll(intStream: fs2.Stream[ConnectionIO,Int]): IO[Unit] = { // saveAll is a placeholder name
    intStream.compile.fold(connection.unit)(
      (accConnectionIO, intVal) => {
        accConnectionIO *> insertValIntoOtherTable(intVal)
      }
    ).flatten.transact(transactor)
}

That last flatten... :/
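
One possible alternative, sketched under the assumption that the inserts can run one by one as elements arrive: evalMap runs each insert inside the stream, and compile.drain yields a single ConnectionIO[Unit] that is transacted once. The table name other_table and the method name saveAll are hypothetical. This avoids the flatten, and it should avoid building one large accumulated ConnectionIO the way the fold does.

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._
import fs2.Stream

object StreamSave {
  // Hypothetical insert; the table and column names are assumptions.
  def insertValIntoOtherTable(i: Int): ConnectionIO[Unit] =
    sql"insert into other_table (value) values ($i)".update.run.map(_ => ())

  // Every element is inserted as it is pulled; the whole stream runs in one transaction.
  def saveAll(ints: Stream[ConnectionIO, Int], xa: Transactor[IO]): IO[Unit] =
    ints.evalMap(insertValIntoOtherTable).compile.drain.transact(xa)
}
```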

Ian Agius
@ianagius
Hi,
I'm trying to write an INSERT ... ON DUPLICATE KEY UPDATE query in doobie. Usually, when doing only an INSERT, I use withUniqueGeneratedKeys[Int]("id") to get the auto-generated ID of the inserted row. I tried doing the same for the INSERT ... ON DUPLICATE KEY UPDATE query, but when the query updates instead of inserting it returns the following error: "Expected ResultSet exhaustion, but more rows were available." I think this occurs because on UPDATE MySQL reports 2 rows updated to indicate that there was an update instead of an insert. Does this error happen because withUniqueGeneratedKeys expects only 1 row to be updated? I 'fixed' it by using update.run instead. Still, I would like to get the original id in case of an update. Do you have any recommendations on how to work around this, please?
Rob Norris
@tpolecat
MySQL may not support updates returning keys.
I don't know, sorry. If you can figure out how you would do it with JDBC we can figure out how it would look in doobie.
Ian Agius
@ianagius
I will research a bit tomorrow and if I find something I will share. Thanks a lot for your time and prompt response.
Anton Solovyev
@Rosteelton
Hi! Can I get Meta automatically for opaque type in scala 3? (without writing Meta[Int].timap(identity)(identity))
daenyth
@daenyth:matrix.org
[m]
If it doesn't happen out of the box, it seems reasonable that the feature should be added - open a github ticket?
Sebastian Voss
@sebastianvoss

Hi! I'm using the notify/listen functionality of Postgres. The notify is executed in a DB trigger. It seems after some time my program stops receiving notifications but the stream does not stop with an error:

def notificationStream(
      channelName: String,
      pollingInterval: FiniteDuration
  ): Stream[F, PGNotification] = {
    val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
      for {
        _  <- Stream.resource(channel(channelName))
        _  <- ticks
        ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
        _  <- Stream.eval(logger.info(s"Received ${ns.size} notifications"))
        n  <- Stream.emits(ns)
      } yield n
    awakeEvery[F](pollingInterval).through(inner.transact(xa))
  }

Does somebody have a hint what could be wrong?

Rob Norris
@tpolecat
I have wondered about the reliability of this mechanism for a while. I don’t have an answer but would be interested if you find anything out elsewhere. JDBC users may have run into this.
Sebastian Voss
@sebastianvoss
thx @tpolecat, I will try to reproduce also with psql. Will report back.
Sebastian Voss
@sebastianvoss
@tpolecat , I found impossibl/pgjdbc-ng#517 which sounds similar. They are blaming it on GC. Could this be the right direction?
Rob Norris
@tpolecat
On my phone, I’ll have a look later. Thanks for digging!
Sebastian Voss
@sebastianvoss
A few more observations: in psql I can see a similar situation. After some time I get "The connection to the server was lost. Attempting reset: Succeeded." and need to execute listen <channel> again to continue receiving notifications. As a brute-force attempt I added Stream.eval(PHC.pgListen(channelName)) to the inner pipe. This seems to work, but I will continue to look for a smarter way.
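
A sketch of what that brute-force variant might look like. The exact placement of pgListen and the extra commit are assumptions, since the modified code was not posted. It re-issues LISTEN on every tick before polling, so a silently reset connection is re-subscribed at the next tick.

```scala
import scala.concurrent.duration.FiniteDuration
import cats.syntax.all._
import doobie._
import doobie.postgres._
import fs2.{Pipe, Stream}
import org.postgresql.PGNotification

object Relisten {
  // On each tick: LISTEN again, then fetch whatever notifications are pending.
  def inner(channelName: String): Pipe[ConnectionIO, FiniteDuration, PGNotification] =
    ticks =>
      for {
        _  <- ticks
        _  <- Stream.eval(PHC.pgListen(channelName) *> HC.commit)
        ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
        n  <- Stream.emits(ns)
      } yield n
}
```

Notifications sent while the connection was down would still be lost; this only shortens the gap to one polling interval.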
Walter Chang
@weihsiu
trying to use scala 3 opaque type with Put and Get. the code compiles but querying Country is non-terminating.
opaque type Code = String
object Code:
  def apply(code: String): Code = code
extension (code: Code) def value: String = code
given Get[Code] = Get[String].tmap[Code](x => Code(x))
given Put[Code] = Put[String].tcontramap[Code](x => x.value)

case class Country(code: Code, name: String, pop: Int, gnp: Option[Double])
Rob Norris
@tpolecat
Hm seems like that should work.
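
One possible explanation, offered as an assumption rather than a confirmed diagnosis: in the scope where the opaque type is defined, Code is known to be String, so Get[String] on the right-hand side may resolve to the given Get[Code] being defined and loop forever. The sketch below moves the opaque type into an object (the name types is hypothetical) and defines the givens outside it, where Code is abstract.

```scala
import doobie.*

object types:
  opaque type Code = String
  object Code:
    def apply(code: String): Code = code
    extension (code: Code) def value: String = code

import types.Code

// Out here Code is not known to be String, so Get[String] and Put[String]
// resolve to doobie's built-in instances instead of these givens.
given Get[Code] = Get[String].tmap(Code(_))
given Put[Code] = Put[String].tcontramap(_.value)

case class Country(code: Code, name: String, pop: Int, gnp: Option[Double])
```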
bentucker
@bentucker

after updating a project to cats-effect version 3.1.1 & doobie 1.0.0-M5 trying to lift IO to ConnectionIO with this helper

val liftToConnIO: FunctionK[IO, doobie.ConnectionIO] = LiftIO.liftK[doobie.ConnectionIO]

generates an error: could not find implicit value for parameter F: cats.effect.LiftIO[doobie.ConnectionIO]

I'm having this same problem too ...

I noticed the signature of LiftIO.liftK changed from liftK[F[_]: LiftIO]: IO ~> F to liftK[F[_]](implicit F: LiftIO[F]): IO ~> F
Davis Zanot
@dzanot
Working on this issue too :point_up: I think the cause is that Async no longer extends LiftIO, so AsyncConnectionIO no longer satisfies the requirements of LiftIO.liftK... which leads us to the question: is there a way to get a FunctionK[IO, ConnectionIO] with the new CE3 hierarchy? (PS: I think others and I came to this solution via https://stackoverflow.com/questions/59657203/doobie-lifting-arbitrary-effect-into-connectionio)
The ultimate goal of ours is to use our Logger[F] within the ConnectionIO context
Davis Zanot
@dzanot
Hmm, I may have found a solution via WeakAsync (for posterity, this was answered on Discord: https://discord.com/channels/632277896739946517/632727524434247691/851530913341898853)
Though I don't fully understand WeakAsync 🤔
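
A minimal sketch of the WeakAsync route, assuming a doobie 1.0.0 milestone or later in which WeakAsync.liftK[F, G] returns a Resource[F, F ~> G]. The log parameter is a hypothetical stand-in for a Logger[IO] method. The lifting function is backed by a Dispatcher, so it should only be used inside the use block.

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._

object LoggedQuery {
  // `log` stands in for something like Logger[IO].info; it is a hypothetical parameter.
  def run(xa: Transactor[IO], log: String => IO[Unit]): IO[Int] =
    WeakAsync.liftK[IO, ConnectionIO].use { lift =>
      val program: ConnectionIO[Int] =
        for {
          _ <- lift(log("running query")) // IO lifted into ConnectionIO
          n <- sql"select 42".query[Int].unique
        } yield n
      program.transact(xa)
    }
}
```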
danveer4686
@danveer4686
Hi team, the code below is throwing a compile error:
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts
import cats.effect.{ Resource, Blocker}
import zio.interop.catz._
import zio.Task

object SampleDoobie {

  def dbResource: Resource[Task, HikariTransactor[Task]] = {
    for {
      connectEC <- ExecutionContexts.fixedThreadPool[Task](20)
      xa        <- HikariTransactor.newHikariTransactor[Task](
        "org.postgresql.Driver", // driver classname
        "url", // connect URL
        "db_usr", // username
        "db_pass", // password
        connectEC,                              // await connection here
        Blocker.liftExecutionContext(connectEC) // transactEC // execute JDBC operations here
      )
    } yield xa
  }
}
Below is my sbt build:
scalaVersion := "2.12.10"

val doobieVersion = "1.0.0-M3"
val calibanVersion= "0.10.1"
val zhttpVersion  = "1.0.0.0-RC17"
val zioInteropVersion = "3.1.1.0"

val libs = List(
  "org.tpolecat" %% "doobie-core"     % doobieVersion ,
  "org.tpolecat" %% "doobie-postgres" % doobieVersion ,
  "org.tpolecat" %% "doobie-h2"       % doobieVersion ,
  "org.tpolecat" %% "doobie-hikari"   % doobieVersion ,

  "com.github.ghostdogpr" %% "caliban" % calibanVersion,
  "com.github.ghostdogpr" %% "caliban-zio-http" %  calibanVersion,
  "io.d11" %% "zhttp" % zhttpVersion,
  "dev.zio" %% "zio-interop-cats" % zioInteropVersion
)

libraryDependencies ++= libs
Rob Norris
@tpolecat
What is the error?
danveer4686
@danveer4686
@tpolecat
error1 in creation of connectEC: No implicits found for parameter sf: Sync[Task]
error2 in import cats.effect.Blocker
Rob Norris
@tpolecat
It sounds like something may be pulling in both CE2 and CE3.
Witold Soczek
@v-tec2706
@danveer4686 did you manage to resolve that?
eugeniyk
@eugeniyk
Hello guys
Could you suggest how to use Put[A] instances with the low-level API, especially with CallableStatementIO? (If I understand correctly, the low-level API is what we are using: HC.prepareCall, setObject and the other free monads.)
The idea is to make the method below more type-safe. As I understand it, Put encapsulates the ability to unsafely set an object on a prepared statement. We are currently on version 0.9.2.
  private def setParams(params: Vector[Any]): CallableStatementIO[Vector[Unit]] = {
    params.zipWithIndex.traverse[CallableStatementIO, Unit] {
      case (p, i) =>
        setObject(i + 1, p.asInstanceOf[AnyRef])
    }
  }
eugeniyk
@eugeniyk
Not sure why we don't have high-level api for CallableStatement, this looks compilable at least
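import doobie._
// setParam is a placeholder name; Write[A] sets the value starting at the given 1-based index
def setParam[A](index: Int, item: A)(implicit write: Write[A]): CallableStatementIO[Unit] =
  FCS.raw(cs => write.unsafeSet(cs, index, item))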
eugeniyk
@eugeniyk
https://tpolecat.github.io/doobie/docs/12-Custom-Mappings.html#column-vector-mappings
Does Read represent the scenario of multiple result sets returned from multiple queries or a stored procedure?
Or is it just a generalization for cases where the rows you get back have different types (but how is that possible)?
Rob Norris
@tpolecat
Read decodes a column vector into a value.
Get decodes a single element of a column vector into a value.
Nothing in doobie's high-level API can deal with multiple resultsets. You have to use the low-level API for that.
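
To illustrate the Read versus Get distinction, here is a small sketch. The country table and its columns are assumptions borrowed from the usual doobie examples. Querying a single column relies on a Get for that column, while querying a case class relies on a Read assembled from the Gets of its fields.

```scala
import doobie._
import doobie.implicits._

object ReadVsGet {
  final case class Country(code: String, name: String, pop: Int)

  // One column per row: decoded through Get[String].
  val names: Query0[String] =
    sql"select name from country".query[String]

  // Three columns per row: Read[Country] decodes the whole column vector.
  val countries: Query0[Country] =
    sql"select code, name, population from country".query[Country]
}
```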