Drew Boardman
@drewboardman
weird, this is how it's written in my dependencies
val doobieV              = "1.0.0-M5"

    "org.tpolecat"       %% "doobie-hikari"       % doobieV, //Streaming JDBC w/ Hikari support
    "org.tpolecat"       %% "doobie-postgres"     % doobieV, //Doobie Postgres support (replace if needed)
    "org.tpolecat"       %% "doobie-refined"      % doobieV //Doobie Postgres support (replace if needed)
Rob Norris
@tpolecat
did you reload your project after changing it?
Drew Boardman
@drewboardman
yeah i'm not sure why these errors popped up like this. I had to update a library that changed orgs and the errors went away
Greg Fisher
@gnfisher

I am processing data in streams. I have a Stream[ConnectionIO, (A, B)]. I am trying to save the As and Bs to the database. A B can't be saved unless it can find the local id of its A, so the As are processed first. This is the step I have, and it is not working: the As save, but the Bs do not. I am not sure if it's a misunderstanding of streams and evaluation (maybe inside the flatMap the two new streams are being evaluated concurrently, so a B can't find its local A to save?), or if there is something with doobie, transactions, etc. Perhaps I am doing something very wrong.

This is the code snippet, if anyone can point me in the right direction to debug this further. processA and processB are Pipe[ConnectionIO, A, C]:

    input.chunks.flatMap { chunk =>
      Stream.eval(Stream.chunk(chunk).map(_.a).through(processA).compile.drain) >>
        Stream.chunk(chunk).map(_.b).through(processB)
    }
It's not really a tuple (A, B) but a value, hence the map step there... but you get the idea.
If run in isolation both process steps work as expected, but when doing things this way, only processA has the expected results.
Fabio Labella
@SystemFw
I didn't fully get your reply to my reply in the fs2 channel :)
Greg Fisher
@gnfisher
:wave: The "why do we have pipes"?
It's how it's been; it's the pattern I followed. I don't have a better explanation, unfortunately. This is all pushing past where I have any sort of confidence in what I know and understand.
What would an alternative to a pipe look like? The pipe for parts is just an abstraction that wraps doobie's updateManyWithGeneratedKeys. What else might it be if not a pipe? Or, why does using a pipe here cause trouble? Someone else mentioned refactoring the pipes to Chunk[A] => IO[Chunk[A]] might help, but I'm not quite sure what they mean by that or where the change would happen. I think they thought that perhaps having these all in ConnectionIO could be a source of issues?
(also, happy to continue chatting in fs2 if this fits better there)
Grzegorz walen
@GrzegorzWalen_twitter
Hi,
I'm using Quill + Doobie to access the db. What is the DoobieContext equivalent of QuillContext's translate, or is there any other option to enforce logging the queries with parameter values? I'm at a dead end; any help appreciated.
Dmitry Polienko
@nigredo-tori

@gnfisher, with chunks the above can be simplified to

input.evalMapChunk { chunk =>
  processA(chunk.map(_.a)) >> processB(chunk.map(_.b))
}

Here processA: Chunk[A] => ConnectionIO[Chunk[B]] and so on. It's a little easier to reason about than streams.
Upd: removed the .chunks call.

Also less opportunity to confuse .eval and .eval_.
Greg Fisher
@gnfisher
Thanks @nigredo-tori, but I am still not sure why my harder-to-read version doesn't behave as expected.
I will try that refactor to see if it makes a difference.
Oh, I am on 2.0.1 of fs2, so I don't have evalMapChunk :(
Dmitry Polienko
@nigredo-tori
foo.evalMapChunk(bar) is foo.chunks.evalMap(bar).flatMap(Stream.chunk(_)).
Greg Fisher
@gnfisher
Would that example result in processA and processB running concurrently?
Dmitry Polienko
@nigredo-tori
No.
Greg Fisher
@gnfisher
I didn't think so. But that was the only explanation I could think of for why the processB records didn't persist to the DB while the As did. And when they are run in isolation, they each work as expected.
Fabio Labella
@SystemFw
are you sure you want >> and not ++?
Greg Fisher
@gnfisher
I tried ++ (by compiling and draining both) but that didn't work either. Although now all the cases I tried are blurring together, so I can go back and try again.
but processA does not return anything that processB takes as input
Fabio Labella
@SystemFw
yeah
Greg Fisher
@gnfisher
I feel very dumb these past days! :sweat:
Fabio Labella
@SystemFw
if the stream returned by processA is empty
I need to go, but text me later and I can explain >> vs ++ to you
Greg Fisher
@gnfisher
Ok, thank you so much for all your help throughout this saga
Fabio Labella
@SystemFw
it's not hard but if you don't clearly know the difference it can be confusing, nothing to feel dumb about :)
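[Editor's note: a minimal sketch of the difference on pure fs2 streams, assuming fs2 on the classpath. `>>` is `flatMap(_ => ...)`, so the second stream is re-run once per element of the first, and an empty first stream discards the second entirely; `++` is plain concatenation.]

```scala
import fs2.Stream

object SeqVsAppend extends App {
  // ++ concatenates: everything from the first stream, then the second
  val appended = (Stream(1, 2) ++ Stream(9)).toList   // List(1, 2, 9)

  // >> is flatMap(_ => ...): the second stream runs once per element of the first
  val sequenced = (Stream(1, 2) >> Stream(9)).toList  // List(9, 9)

  // If the first stream is empty, >> never runs the second at all
  val dropped = (Stream.empty >> Stream(9)).toList    // List()

  println((appended, sequenced, dropped))
}
```

This is why `Stream.eval(x) >> rest` works (Stream.eval emits exactly one element), but sequencing after a stream that might be empty silently drops everything that follows.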
Kristian Nordal
@nor

Hi, I'm having an issue with a mssql query. I try to fetch all ~25k rows from a table, and get this:

Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSReader throwInvalidTDS
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93 got unexpected value in TDS response at offset:7992
Jun 04, 2021 9:05:24 AM com.microsoft.sqlserver.jdbc.TDSParser throwUnexpectedTokenException
SEVERE: ConnectionID:3 ClientConnectionId: 3c8eb486-fe5f-4e72-b267-69e79e780b93: getNextResult: Encountered unexpected TDS_FEATURE_EXT_UTF8SUPPORT (0x0A)

which to me looks like an encoding issue, but when I paginate the query (e.g. order by foo offset 0 rows fetch next 5000 rows only) I can query all the pages fine.
I'm using doobie, but this might be a pure jdbc issue?

jatcwang
@jatcwang:matrix.org
@nor: yep looks like it. Give raw jdbc a go and see if you get the same issue
Roberto Leibman
@rleibman

ok.... here's a question.... I have a bunch of ConnectionIOs strung together in a for comprehension.... halfway through these I need to save something to the file system and, based on where it ends up, update the database. Because I'm using ZIO, this is somewhat what I have:

def save(saveMe: MyComplicatedObject, saveToFileSystem: MyComplicatedObject => ZIO[A,B,File]): ConnectionIO[MyComplicatedObject] = {
  for {
     res1 <- savePart1(saveMe) //returns ConnectionIO[MyComplicatedObject]
     res2 <- savePart2(res1) //returns ConnectionIO[MyComplicatedObject]
     res3 <- saveToFileSystem(res2) // <<<<< here... how to convert ZIO to ConnectionIO
     res4 <- savePart3(res3) //returns ConnectionIO[MyComplicatedObject]
  } yield res4
}

As you can see, the basic question is: how do I convert a ZIO to a ConnectionIO? I know I can use .pure to get a ConnectionIO out of a normal value, but I doubt .pure on a ZIO would give me what I want.

Rob Norris
@tpolecat
You probably don’t want to do that because if the transaction rolls back the filesystem operation will persist.
But in any case if you use cats-effect IO you can say myIO.to[ConnectionIO]. It may or may not work with ZIO, I don’t know.
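[Editor's note: a sketch of what Rob's suggestion looks like, assuming a cats-effect-2-era doobie where ConnectionIO has a LiftIO instance; the helper name is illustrative. For ZIO, the effect would first need to be converted to a cats-effect IO via zio-interop-cats.]

```scala
import cats.effect.IO
import doobie.ConnectionIO
import doobie.implicits._

// Hypothetical helper: lift a cats-effect IO into ConnectionIO.
// IO#to[F] resolves via the LiftIO[ConnectionIO] instance doobie provides (CE2 API).
def liftToConnectionIO[A](io: IO[A]): ConnectionIO[A] =
  io.to[ConnectionIO]
```

Note the caveat above still applies: the lifted effect runs inside the transaction boundary, but a rollback cannot undo its side effects.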
Roberto Leibman
@rleibman
Yeah, I thought of that as well.... it's a personal project, I don't care this time around if there are a few extra files lying around. I'll see if that works (it doesn't compile yet; I think I need to figure out how to convert it to a cats-effect IO with the interop library).
vonchav
@voonchav_gitlab
@rleibman I assume you already have import zio.interop.catz._ in this module. So when you define savePart1, savePart2, and savePart3, they should return zio.Task[?] instead of ConnectionIO. Same goes for this save method. The interop should perform the magic and convert ConnectionIO to zio.Task automatically.
Roberto Leibman
@rleibman
@voonchav_gitlab yes, maybe, but I think I want them to stay as ConnectionIO, because eventually I want 'em all to run in a transact, which is why I want to convert the ZIO to a ConnectionIO (and yes, they'll convert to ZIO later)
vonchav
@voonchav_gitlab
Hi @rleibman, in that case, for the example you gave, saveToFileSystem itself should return ConnectionIO instead of ZIO. Perhaps in that method, you are using a lib or other custom code that returns a ZIO. In that case, I'm not aware of a lib that can convert from ZIO to ConnectionIO. You might have to evaluate the ZIO effect right there and then wrap the result into a ConnectionIO.
I think I understand what you're trying to do in the example. But in reality, having a non-DB operation as part of a DB transaction is a bit weird, because I don't think a DB rollback can undo the change made in saveToFileSystem. In the (distributed or not) saga pattern, you would specify compensation actions in the event of a rollback, for all participating systems. I don't think ConnectionIO has something like that, because I assume it just relies on the underlying RDBMS engine's rollback mechanics. I'm interested in a solution though.
vonchav
@voonchav_gitlab
I skimmed through ZIO's interop-cats source code and I didn't spot routines going from ZIO to Cats IO. I might be wrong though. You should post this question on ZIO discord. Like I mentioned, you can always call unsafeRun on a ZIO effect and lift the result into Cats IO if you choose to do so.
heksenlied
@heksenlied:matrix.org
@rleibman: In our project, we are using https://github.com/gaelrenoux/tranzactio to lift the ConnectionIO to ZIO. You might want to look into that.
Roberto Leibman
@rleibman
That sounds awesome, I'll definitely try that.
Alfonso Nishikawa
@alfonsonishikawa

Hello!
I'm new around here. I have a question: let's say I have a stream of Integers from a query, and I want to save all of them in a single transaction, but without converting them to a List. Is it possible? :/

def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
  ???
}

I really miss more documentation and examples on fs2.Stream and doobie together :_(

Alfonso Nishikawa
@alfonsonishikawa

Is something like this correct? Would it be memory-safe?

// for simplicity, insertValIntoOtherTable: Int => ConnectionIO[Unit]

def saveAll(intStream: fs2.Stream[ConnectionIO, Int]): IO[Unit] = {
    intStream.compile.fold(connection.unit)(
      (accConnectionIO, intVal) => {
        accConnectionIO *> insertValIntoOtherTable(intVal)
      }
    ).flatten.transact(transactor)
}

That last flatten... :/
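[Editor's note: a more direct sketch, assuming the same hypothetical signatures as above (insertValIntoOtherTable: Int => ConnectionIO[Unit], a Transactor in scope, and saveAll as an illustrative name). evalMap runs each insert as part of the one ConnectionIO program, compile.drain turns the stream into that single program, and transact wraps it in one transaction; nothing is accumulated, so it is memory-safe.]

```scala
import cats.effect.IO
import doobie._
import doobie.implicits._

// Hypothetical insert, matching the question above
def insertValIntoOtherTable(i: Int): ConnectionIO[Unit] = ???

def saveAll(intStream: fs2.Stream[ConnectionIO, Int])(xa: Transactor[IO]): IO[Unit] =
  intStream
    .evalMap(insertValIntoOtherTable) // run each insert inside the stream
    .compile
    .drain                            // one ConnectionIO[Unit] program
    .transact(xa)                     // one transaction
```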

Ian Agius
@ianagius
Hi,
I'm trying to write an INSERT ... ON DUPLICATE KEY UPDATE query in doobie. Usually, when doing an INSERT only, I use withUniqueGeneratedKeys[Int]("id") to get the auto-generated ID of the inserted row. I tried doing the same for the INSERT ... ON DUPLICATE KEY UPDATE query, but when a query updates instead of inserting, it returns the following error: Expected ResultSet exhaustion, but more rows were available. I think this occurs because on UPDATE, MySQL reports 2 rows affected to indicate that there was an update instead of an insert. Does this error happen because withUniqueGeneratedKeys expects only 1 row to be affected? I 'fixed' it by using update.run instead, but I would still like to get the original id in the update case. Do you have any recommendations on how to go around it, please?
Rob Norris
@tpolecat
MySQL may not support updates returning keys.
I don't know, sorry. If you can figure out how you would do it with JDBC we can figure out how it would look in doobie.
Ian Agius
@ianagius
I will research a bit tomorrow and if I find something I will share. Thanks a lot for your time and prompt response.
Anton Solovyev
@Rosteelton
Hi! Can I get Meta automatically for opaque type in scala 3? (without writing Meta[Int].timap(identity)(identity))
daenyth
@daenyth:matrix.org
If it doesn't happen out of the box, it seems reasonable that the feature should be added - open a github ticket?
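[Editor's note: in the meantime, the usual workaround is to put a manual given in the opaque type's companion, so the timap boilerplate is written once. A sketch, assuming doobie's Meta and an illustrative UserId type:]

```scala
import doobie.Meta

object ids:
  opaque type UserId = Int

  object UserId:
    def apply(i: Int): UserId = i
    extension (id: UserId) def value: Int = id

    // Inside the companion the opaque alias is transparent,
    // so identity converts both ways without boxing.
    given Meta[UserId] = Meta[Int].timap(identity)(identity)
```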