Greg Fisher
@gnfisher
Would that example result in processA and processB running concurrently?
Dmitry Polienko
@nigredo-tori
No.
Greg Fisher
@gnfisher
I didn't think so. But that was the only explanation I could think of for why the processB records didn't persist to the DB while the As did. And if they are run in isolation, they each work as expected.
Fabio Labella
@SystemFw
are you sure you want >> and not ++?
Greg Fisher
@gnfisher
I tried ++ (by compiling and draining both) but that didn't work either. Although now all the cases I tried are blurring together, so I can go back and try again
but processA does not return anything that processB takes as input
Fabio Labella
@SystemFw
yeah
Greg Fisher
@gnfisher
I feel very dumb these past days! :sweat:
Fabio Labella
@SystemFw
if the stream returned by processA is empty
I need to go, but text me later and I can explain >> vs ++ to you
Greg Fisher
@gnfisher
Ok, thank you so much for all your help throughout this saga
Fabio Labella
@SystemFw
it's not hard but if you don't clearly know the difference it can be confusing, nothing to feel dumb about :)
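A minimal sketch of the >> vs ++ difference being discussed, with hypothetical stand-in streams for processA and processB:

import cats.effect.IO
import fs2.Stream

// Hypothetical stand-ins for the processA / processB streams from the conversation above.
val processA: Stream[IO, Int] = Stream.empty       // suppose processA's stream emits nothing
val processB: Stream[IO, Int] = Stream.emit(42)

// `a >> b` is a.flatMap(_ => b): b runs once per element of a,
// so if a emits nothing, b never runs at all.
val sequenced: Stream[IO, Int] = processA >> processB   // emits nothing

// `a ++ b` appends: b runs after a finishes, even if a emitted nothing.
val appended: Stream[IO, Int] = processA ++ processB    // emits 42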
Kristian Nordal
@nor

Hi, I'm having an issue with a mssql query. I try to fetch all ~25k rows from a table, and get this:

Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSReader throwInvalidTDS
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93 got unexpected value in TDS response at offset:7992
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSParser throwUnexpectedTokenException
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93: getNextResult: Encountered unexpected TDS_FEATURE_EXT_UTF8SUPPORT (0x0A)

which to me looks like an encoding issue, but when I paginate the query, e.g. order by foo offset 0 rows fetch next 5000 rows only, I can query all the pages fine.
I'm using doobie, but this might be a pure jdbc issue?

jatcwang
@jatcwang:matrix.org
[m]
@nor: yep looks like it. Give raw jdbc a go and see if you get the same issue
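A rough sketch of the "give raw jdbc a go" suggestion; the connection URL, credentials, and table name below are placeholders:

import java.sql.DriverManager

// Placeholders: substitute the real connection string, credentials and query.
val conn = DriverManager.getConnection(
  "jdbc:sqlserver://host:1433;databaseName=mydb", "user", "pass")
try {
  val rs = conn.createStatement().executeQuery("SELECT * FROM my_table")
  var n = 0
  while (rs.next()) n += 1        // walk the whole result set, like the unpaginated doobie query
  println(s"read $n rows")        // if this also fails with the TDS error, it's a driver issue, not doobie
} finally conn.close()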
Roberto Leibman
@rleibman

ok.... here's a question.... I have a bunch of ConnectionIOs strung together in a for comprehension.... halfway through these I need to save something to the file system and, based on where it ends up, update the database. Because I'm using zio, this is somewhat what I have:

def save(saveMe: MyComplicatedObject, saveToFileSystem: MyComplicatedObject => ZIO[A,B,File]): ConnectionIO[MyComplicatedObject] = {
  for {
     res1 <- savePart1(saveMe) //returns ConnectionIO[MyComplicatedObject]
     res2 <- savePart2(res1) //returns ConnectionIO[MyComplicatedObject]
     res3 <- saveToFileSystem(res2) // <<<<< here... how to convert ZIO to ConnectionIO
     res4 <- savePart3(res3) //returns ConnectionIO[MyComplicatedObject]
  } yield res4
}

As you can see, the basic question is: how do I convert a ZIO to a ConnectionIO? I know I can use .pure to get a ConnectionIO out of a normal value, but I doubt .pure on a ZIO would give me what I want.

Rob Norris
@tpolecat
You probably don’t want to do that because if the transaction rolls back the filesystem operation will persist.
But in any case if you use cats-effect IO you can say myIO.to[ConnectionIO]. It may or may not work with ZIO, I don’t know.
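A minimal sketch of that suggestion, assuming a LiftIO[ConnectionIO] instance is in scope (true on the cats-effect-2 doobie series; the CE3 situation comes up further down in this log):

import cats.effect.{IO, LiftIO}
import doobie.ConnectionIO

// Assumes an implicit LiftIO[ConnectionIO] is available.
def liftToConnectionIO[A](io: IO[A]): ConnectionIO[A] =
  LiftIO[ConnectionIO].liftIO(io)      // equivalent to io.to[ConnectionIO]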
Roberto Leibman
@rleibman
Yeah, I thought of that as well.... it's a personal project, I don't care this time around if there are a few extra files lying around. I'll see if that works (it doesn't compile yet; I think I need to figure out how to convert it to a cats-effect IO with the interop library).
vonchav
@voonchav_gitlab
@rleibman I assume you already have import zio.interop.catz._ in this module. So when you define savePart1, savePart2, savePart3, they should return zio.Task[?] instead of ConnectionIO. Same goes for this save method. The interop should perform the magic and convert ConnectionIO to zio.Task automatically.
Roberto Leibman
@rleibman
@voonchav_gitlab yes, maybe, but I think I want them to stay as ConnectionIO, cause eventually I want 'em all to run in a transact, which is why I want to convert the ZIO to a ConnectionIO (and yes, they'll convert to ZIO later)
vonchav
@voonchav_gitlab
Hi @rleibman, in that case, for the example you gave, saveToFileSystem itself should return ConnectionIO instead of ZIO. Perhaps in that method, you are using a lib or other custom code that returns a ZIO. In that case, I'm not aware of a lib that can convert from ZIO to ConnectionIO. You might have to evaluate the ZIO effect right there and then wrap the result into a ConnectionIO.
I think I understand what you're trying to do in the example. But in reality, having a non-DB operation as part of a DB transaction is a bit weird because I don't think a DB rollback can undo the change made in saveToFileSystem. In the distributed (or not) saga pattern, you would specify compensation actions in event of a rollback (for all participating systems). I don't think ConnectionIO has something like that because I assume it just relies on the underlying RDBMS engine's rollback mechanics. I'm interested in a solution though.
vonchav
@voonchav_gitlab
I skimmed through ZIO's interop-cats source code and I didn't spot routines going from ZIO to Cats IO. I might be wrong though. You should post this question on ZIO discord. Like I mentioned, you can always call unsafeRun on a ZIO effect and lift the result into Cats IO if you choose to do so.
heksenlied
@heksenlied:matrix.org
[m]
@rleibman: In our project, we are using https://github.com/gaelrenoux/tranzactio to lift the ConnectionIO to ZIO. You might want to look into that.
Roberto Leibman
@rleibman
That sounds awesome, I'll definitely try that.
Alfonso Nishikawa
@alfonsonishikawa

Hello!
I'm new around. I have a question: Let's say I have a stream of Integers from a query, and I want to save all of them in a single transaction but without converting it to a List. Is it possible? :/

def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  ???
}

I really miss having more documentation and examples on fs2.Stream together with doobie :_(

Alfonso Nishikawa
@alfonsonishikawa

Is something like this correct? Would it be memory-safe?

// for simplicity, insertValIntoOtherTable: Int => ConnectionIO[Unit]

def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
    intStream.compile.fold(connection.unit)(
      (accConnectionIO, intVal) => {
        accConnectionIO *> insertValIntoOtherTable(intVal)
      }
    ).flatten.transact(transactor)
}

That last flatten... :/
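One more conventional way to phrase the same thing, as a sketch only and reusing the hypothetical saveAll / insertValIntoOtherTable names from above: evalMap each element into its insert and drain the stream, so no large folded ConnectionIO has to be built up by hand:

import cats.effect.IO
import doobie._
import doobie.implicits._
import fs2.Stream

// Same assumptions as the snippet above: insertValIntoOtherTable: Int => ConnectionIO[Unit]
// and a Transactor[IO] named transactor are defined elsewhere.
def saveAll(intStream: Stream[ConnectionIO, Int]): IO[Unit] =
  intStream
    .evalMap(insertValIntoOtherTable)  // run one insert per element as the stream is pulled
    .compile
    .drain                             // ConnectionIO[Unit]
    .transact(transactor)              // the whole thing runs in a single transaction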

Ian Agius
@ianagius
Hi,
I'm trying to write an INSERT ... ON DUPLICATE KEY UPDATE query in doobie. Usually when doing an INSERT only I use withUniqueGeneratedKeys[Int]("id") to get the auto-generated ID of the inserted row. I tried doing the same for the INSERT ... ON DUPLICATE KEY UPDATE query, but when the query updates instead of inserting it returns the following error: Expected ResultSet exhaustion, but more rows were available. I think this occurs because on UPDATE, MySQL reports 2 affected rows to indicate that an update happened instead of an insert. Does this error happen because withUniqueGeneratedKeys expects exactly 1 row? I 'fixed' it by using update.run instead, but I would still like to get the original id in the update case. Do you have any recommendations on how to get around it, please?
Rob Norris
@tpolecat
MySQL may not support updates returning keys.
I don't know, sorry. If you can figure out how you would do it with JDBC we can figure out how it would look in doobie.
Ian Agius
@ianagius
I will research a bit tomorrow and if I find something I will share. Thanks a lot for your time and prompt response.
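For what it's worth, one commonly cited MySQL-side workaround (an untested sketch here, with a hypothetical table) is to assign the id through LAST_INSERT_ID in the update branch so the generated-keys result also carries the existing row's id, and to read the keys as a stream since the update branch may report more than one affected row:

import doobie._
import doobie.implicits._

// Untested sketch; my_table, name and value are hypothetical columns.
// id = LAST_INSERT_ID(id) is the MySQL idiom that makes the generated-keys result
// report the existing row's id when the ON DUPLICATE KEY branch fires.
def upsert(name: String, value: Int): ConnectionIO[Option[Int]] =
  sql"""INSERT INTO my_table (name, value) VALUES ($name, $value)
        ON DUPLICATE KEY UPDATE id = LAST_INSERT_ID(id), value = VALUES(value)"""
    .update
    .withGeneratedKeys[Int]("id")   // Stream[ConnectionIO, Int]; may have more than one row on update
    .compile
    .last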
Anton Solovyev
@Rosteelton
Hi! Can I get a Meta automatically for an opaque type in Scala 3 (without writing Meta[Int].timap(identity)(identity))?
daenyth
@daenyth:matrix.org
[m]
If it doesn't happen out of the box, it seems reasonable that the feature should be added - open a github ticket?
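Until something like that exists, a minimal sketch of the manual route for a hypothetical opaque type; note the instance is defined outside the scope where the opaque alias is transparent:

import doobie.Meta

object types:
  opaque type UserId = Int
  object UserId:
    def apply(i: Int): UserId = i
    def toInt(id: UserId): Int = id

// Defined outside the scope where UserId is transparent, so the Meta[Int] lookup below
// cannot accidentally resolve to the instance being defined.
object metas:
  import types.UserId
  given Meta[UserId] = Meta[Int].timap(UserId.apply)(UserId.toInt)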
Sebastian Voss
@sebastianvoss

Hi! I'm using the notify/listen functionality of Postgres. The notify is executed in a DB trigger. It seems that after some time my program stops receiving notifications, but the stream does not fail with an error:

def notificationStream(
      channelName: String,
      pollingInterval: FiniteDuration
  ): Stream[F, PGNotification] = {
    val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
      for {
        _  <- Stream.resource(channel(channelName))
        _  <- ticks
        ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
        _  <- Stream.eval(logger.info(s"Received ${ns.size} notifications"))
        n  <- Stream.emits(ns)
      } yield n
    awakeEvery[F](pollingInterval).through(inner.transact(xa))
  }

Does somebody have a hint what could be wrong?

Rob Norris
@tpolecat
I have wondered about the reliability of this mechanism for a while. I don’t have an answer but would be interested if you find anything out elsewhere. JDBC users may have run into this.
Sebastian Voss
@sebastianvoss
thx @tpolecat, I will try to reproduce also with psql. Will report back.
Sebastian Voss
@sebastianvoss
@tpolecat , I found impossibl/pgjdbc-ng#517 which sounds similar. They are blaming it on GC. Could this be the right direction?
Rob Norris
@tpolecat
On my phone, I’ll have a look later. Thanks for digging!
Sebastian Voss
@sebastianvoss
A few more observations: in psql I can see a similar situation. After some time I get "The connection to the server was lost. Attempting reset: Succeeded." and need to execute listen <channel> again to continue receiving notifications. As a brute-force attempt I added Stream.eval(PHC.pgListen(channelName)) to the inner pipe. This seems to work, but I will continue to look for a smarter way.
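A sketch of that brute-force change, spliced into the inner pipe from the earlier snippet (channel, logger, channelName and the other names are the ones defined there):

val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks =>
  for {
    _  <- Stream.resource(channel(channelName))
    _  <- ticks
    _  <- Stream.eval(PHC.pgListen(channelName) *> HC.commit)  // re-issue LISTEN each tick, in case the session was reset
    ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit)
    _  <- Stream.eval(logger.info(s"Received ${ns.size} notifications"))
    n  <- Stream.emits(ns)
  } yield n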
Walter Chang
@weihsiu
Trying to use a Scala 3 opaque type with Put and Get. The code compiles, but querying Country is non-terminating.
opaque type Code = String
object Code:
  def apply(code: String): Code = code
extension (code: Code) def value: String = code
given Get[Code] = Get[String].tmap[Code](x => Code(x))
given Put[Code] = Put[String].tcontramap[Code](x => x.value)

case class Country(code: Code, name: String, pop: Int, gnp: Option[Double])
Rob Norris
@tpolecat
Hm seems like that should work.
bentucker
@bentucker

After updating a project to cats-effect 3.1.1 & doobie 1.0.0-M5, trying to lift IO to ConnectionIO with this helper

val liftToConnIO: FunctionK[IO, doobie.ConnectionIO] = LiftIO.liftK[doobie.ConnectionIO]

generates the error: could not find implicit value for parameter F: cats.effect.LiftIO[doobie.ConnectionIO]

I'm having this same problem too ...

I noticed the signature of LiftIO.liftK changed from liftK[F[_]: LiftIO]: IO ~> F to liftK[F[_]](implicit F: LiftIO[F]): IO ~> F
Davis Zanot
@dzanot
Working on this issue too :point_up: I think the cause is that Async no longer extends LiftIO, thus AsyncConnectionIO no longer satisfies the requirements of LiftIO.liftK... Which leads us to the question, is there a way to get a FunctionK[IO, ConnectionIO] with the new CE3 hierarchy? (ps I think myself and others came to this solution via https://stackoverflow.com/questions/59657203/doobie-lifting-arbitrary-effect-into-connectionio)
The ultimate goal of ours is to use our Logger[F] within the ConnectionIO context
Davis Zanot
@dzanot
Hmm, I may have found a solution via WeakAsync (for posterity, this was answered on Discord: https://discord.com/channels/632277896739946517/632727524434247691/851530913341898853)
Though I don't fully understand WeakAsync 🤔
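A hedged sketch of the WeakAsync route, assuming WeakAsync.liftK (giving a Resource[IO, IO ~> ConnectionIO]) is available on this milestone; xa and log are placeholders:

import cats.effect.IO
import doobie._
import doobie.implicits._

// Sketch only: assumes doobie's WeakAsync.liftK exists here; xa: Transactor[IO] and
// log: String => IO[Unit] stand in for a real transactor and Logger[IO].
def program(xa: Transactor[IO], log: String => IO[Unit]): IO[Int] =
  WeakAsync.liftK[IO, ConnectionIO].use { fk =>
    val cio: ConnectionIO[Int] =
      for {
        n <- sql"select 42".query[Int].unique
        _ <- fk(log(s"got $n"))        // lift the IO effect into ConnectionIO via the FunctionK
      } yield n
    cio.transact(xa)
  }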
danveer4686
@danveer4686
Hi team, the code below is throwing a compile error:
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts
import cats.effect.{ Resource, Blocker}
import zio.interop.catz._
import zio.Task

object SampleDoobie {

  def dbResource: Resource[Task, HikariTransactor[Task]] = {
    for {
      connectEC <- ExecutionContexts.fixedThreadPool[Task](20)
      xa        <- HikariTransactor.newHikariTransactor[Task](
        "org.postgresql.Driver", // driver classname
        "url", // connect URL
        "db_usr", // username
        "db_pass", // password
        connectEC,                              // await connection here
        Blocker.liftExecutionContext(connectEC) // transactEC // execute JDBC operations here
      )
    } yield xa
  }
}
Below is my sbt:
scalaVersion := "2.12.10"

val doobieVersion = "1.0.0-M3"
val calibanVersion= "0.10.1"
val zhttpVersion  = "1.0.0.0-RC17"
val zioInteropVersion = "3.1.1.0"

val libs = List(
  "org.tpolecat" %% "doobie-core"     % doobieVersion ,
  "org.tpolecat" %% "doobie-postgres" % doobieVersion ,
  "org.tpolecat" %% "doobie-h2"       % doobieVersion ,
  "org.tpolecat" %% "doobie-hikari"   % doobieVersion ,

  "com.github.ghostdogpr" %% "caliban" % calibanVersion,
  "com.github.ghostdogpr" %% "caliban-zio-http" %  calibanVersion,
  "io.d11" %% "zhttp" % zhttpVersion,
  "dev.zio" %% "zio-interop-cats" % zioInteropVersion
)

libraryDependencies ++= libs
Rob Norris
@tpolecat
What is the error?
danveer4686
@danveer4686
@tpolecat
error 1, in the creation of connectEC: No implicits found for parameter sf: Sync[Task]
error 2, on import cats.effect.Blocker
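For context, Blocker was removed in cats-effect 3, and the CE3-era doobie builders drop the Blocker argument accordingly. A hedged sketch of what the resource could look like on that series, assuming zio-interop-cats supplies the Async[Task] these builders need:

import cats.effect.Resource
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts
import zio.Task
import zio.interop.catz._

object SampleDoobie {

  // Sketch under the assumptions above: no Blocker parameter on the CE3 series,
  // and an Async[Task] instance coming from zio-interop-cats.
  def dbResource: Resource[Task, HikariTransactor[Task]] =
    for {
      connectEC <- ExecutionContexts.fixedThreadPool[Task](20)
      xa        <- HikariTransactor.newHikariTransactor[Task](
        "org.postgresql.Driver", // driver classname
        "url",                   // connect URL
        "db_usr",                // username
        "db_pass",               // password
        connectEC                // await connection on this pool
      )
    } yield xa
}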