Feel free to hang out here; note that some of the "chat" happens on libera.chat #sqlalchemy
If I try this
```
result = []
for type_, table in self._table_map.items():
    cols = list(
        itertools.chain(*[self.alias_to_cols(e) for e in alias_set.query_entites()])
    )
    query = (
        sql.select(
            *self.base_trade_cols(table, type_),
            *cols
        )
        .join(
            alias_set.canonical_version,
            alias_set.canonical_version.internal_id == table.c.internal_id
        )
        .join(
            alias_set.common_metadata,
            alias_set.common_metadata.internal_id == table.c.internal_id
        )
    )
    expires_into = aliased(ExpiresInto, name="expires_into_upper")
    query = add_metadata_for_common_metadata(
        query, alias_set
    )
    if limit:
        query = query.limit(limit)
    result.append(
        lambda_stmt(lambda: PatchedSelectStatementGrouping(query))
    )
return lambda_stmt(lambda: union_all(*result).subquery('pjoin'))
```
ArgumentError: SELECT construct for inclusion in a UNION or other set construct expected, got StatementLambdaElement(<code object <lambda> at 0x10889bc90, file "/Users/rollokonig-brock/Developer/services/base/polymorphic_generator.py", line 198>).
Should I `reduce` over the result list to get that effect?
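(Not your exact models, just a generic sketch of the direction I'd try: `union_all()` wants plain SELECT constructs, so build ordinary `select()` objects in the loop and only wrap the final combined statement, if at all. The tables below are placeholders.)
```
import sqlalchemy as sa
from sqlalchemy import union_all

metadata = sa.MetaData()
# placeholder tables standing in for the values of self._table_map
t1 = sa.Table("t1", metadata, sa.Column("internal_id", sa.Integer), sa.Column("kind", sa.String))
t2 = sa.Table("t2", metadata, sa.Column("internal_id", sa.Integer), sa.Column("kind", sa.String))

# collect plain Select constructs, not StatementLambdaElements
selects = [sa.select(t.c.internal_id, t.c.kind) for t in (t1, t2)]

# union_all() accepts any number of selects, so no reduce() is needed
pjoin = union_all(*selects).subquery("pjoin")
print(sa.select(pjoin))
```
If you still want lambda caching, wrapping only the final combined statement in a single `lambda_stmt` (rather than each element) avoids handing `StatementLambdaElement` objects to `union_all()`, which is what the ArgumentError is complaining about.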
I'm working with postgresql, mysql, and sqlserver. I'm using SQLAlchemy to execute raw SELECT queries. When there are lots of rows, I don't know whether they are lazily loaded as I iterate over and extract the result set. For e.g.:
```
with engine.begin() as connection:
    result = connection.execute('select * from "table_1"')
    if result:
        for row in result:
            do_something(row)
```
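(Not an authoritative answer, but a sketch of how I'd make the iteration stream, assuming SQLAlchemy 1.4+ and a driver that supports server-side cursors; by default many DBAPIs buffer the whole result client-side before you ever iterate. The URL and `do_something` are placeholders.)
```
import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/db")  # placeholder URL

def do_something(row):
    print(row)

with engine.connect() as connection:
    # stream_results=True asks for a server-side cursor so rows are
    # fetched in chunks as you iterate, instead of all at once
    result = connection.execution_options(stream_results=True).execute(
        sa.text('select * from "table_1"')
    )
    for partition in result.partitions(1000):  # fetch 1000 rows at a time
        for row in partition:
            do_something(row)
```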
Hi all.
I am trying to use the dataclass way of writing my table classes.
I am stuck writing a mixin that uses ForeignKey.
Code:
```
@dataclasses.dataclass
class BaseMixin:
    created_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )
    created_by: "User" = dataclasses.field(
        init=False,
        metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
    )
    updated_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )
    updated_by: "User" = dataclasses.field(
        init=False,
        metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
    )
```
How do I access the `_id` fields in `foreign_keys`?
Using normal `as_declarative`, I have access to `cls` in the class method:
```
@declared_attr
def created_by(cls):
    return relationship(
        "User",
        foreign_keys=[cls.created_by_id],
    )
```
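(Untested suggestion, not the documented answer: you could keep the `*_id` columns as dataclass fields but move the relationships onto the mixin as `@declared_attr` methods, so `cls` is available exactly as in the `as_declarative` version; `relationship()` also accepts a callable for `foreign_keys`, resolved at mapper configuration time. A sketch, assuming an existing `User` model:)
```
import dataclasses
import uuid

from sqlalchemy import Column, ForeignKey
from sqlalchemy.orm import declared_attr, relationship


@dataclasses.dataclass
class BaseMixin:
    created_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )
    updated_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )

    # declared_attr gives access to cls; the lambda defers resolution of the
    # column until the mapper is configured
    @declared_attr
    def created_by(cls):
        return relationship("User", foreign_keys=lambda: [cls.created_by_id])

    @declared_attr
    def updated_by(cls):
        return relationship("User", foreign_keys=lambda: [cls.updated_by_id])
```
The relationships are then not dataclass fields at all, which may or may not be acceptable for your use case; I haven't verified how this interacts with the dataclass field ordering.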
Hello there - I am having some problems connecting to a multi-host PostgreSQL setup.
I used the postgresql parameter target_session_attrs=primary
so that it finds the primary node for writing and reading, otherwise cycling through the list of available hosts. However, I keep getting the same error.
Could anyone help?
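(Hard to say without the actual error text, but for comparison, here is a sketch of the multi-host URL form I'd expect to work with the psycopg2 driver on a recent SQLAlchemy; host names and credentials are placeholders. Also, if I recall correctly, `target_session_attrs=primary` needs libpq 14+, while `read-write` works on older libpq versions.)
```
import sqlalchemy as sa

# libpq-style fallback hosts are expressed as repeated "host" query parameters;
# target_session_attrs is passed straight through to libpq
engine = sa.create_engine(
    "postgresql+psycopg2://user:password@/mydb"
    "?host=db-node-1:5432&host=db-node-2:5432&host=db-node-3:5432"
    "&target_session_attrs=primary"
)

with engine.connect() as conn:
    # False means we landed on the primary, True means a standby
    print(conn.exec_driver_sql("select pg_is_in_recovery()").scalar())
```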
So, I have a bit of an interesting problem. For reasons I won't get into here, I want to store binary data in a database. When a client sends me a value, my program should check whether that value, encoded in binary data, is stored in the database.
Here is somewhat of an MCVE:
```
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from sqlalchemy import BINARY, Column, Date, ForeignKey, LargeBinary, create_engine, func, select
import sqlalchemy
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.orm import relationship, Session
from sqlalchemy import insert

Base = declarative_base()
engine = create_engine(
    "mysql+pymysql://user:password@localhost/api_database")


class Token(Base):
    __tablename__ = "Tokens"
    token = Column(BINARY(64), primary_key=True, nullable=False)


Base.metadata.create_all(engine)

with Session(bind=engine) as session:
    # add token
    session.add(Token(token="$2b$2t.e".encode("utf8")))
    session.commit()


class Server(BaseHTTPRequestHandler):
    def do_POST(self):
        input_length = int(self.headers.get("Content-Length"))
        input = self.rfile.read(input_length)
        input_json = json.loads(input)
        # check if passed token exists in table
        resultset = engine.execute(
            select(func.count(Token.token)).where(
                Token.token == input_json["token"].encode("utf8")))
        result_count = resultset.first()
        if (result_count[0] > 0):
            # found the token
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(
                json.dumps({
                    "success": True
                }).encode("iso-8859-1"))
        else:
            # token not found
            self.send_response(400)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(
                json.dumps({
                    "success": False
                }).encode("iso-8859-1"))


if __name__ == "__main__":
    web_server = HTTPServer(("0.0.0.0", 8080), Server)
    print("Server started at {0}:{1}".format("0.0.0.0", 8080))
    try:
        web_server.serve_forever()
    except KeyboardInterrupt:
        pass
    web_server.server_close()
    print("Server stopped.")
```
```
import sqlalchemy as sa

engine = sa.create_engine("mysql+pymysql://scott:tiger@127.0.0.1:3307/mydb")

table_name = "moonman239"
with engine.begin() as conn:
    conn.exec_driver_sql(f"DROP TABLE IF EXISTS {table_name}")
    conn.exec_driver_sql(f"CREATE TABLE {table_name} (token MEDIUMBLOB)")
    conn.exec_driver_sql(
        f"INSERT INTO {table_name} (token) VALUES (X'476F7264')"
    )

tbl = sa.Table(table_name, sa.MetaData(), autoload_with=engine)

engine.echo = True
with engine.begin() as conn:
    result = conn.execute(sa.select(tbl).where(tbl.c.token == b"\x47\x6f\x72\x64")).one()
    print(result)  # (b'Gord',)

"""SQL emitted:
SELECT moonman239.token
FROM moonman239
WHERE moonman239.token = %(token_1)s
[generated in 0.00077s] {'token_1': b'Gord'}
"""
```
@moonman239_twitter - BINARY(64) columns are right-padded with nulls, so you'd need to pad out your search term:
```
token = b"\x47\x6f\x72\x64"
token = token + (b"\x00" * (64 - len(token)))
result = conn.execute(sa.select(tbl).where(tbl.c.token == token)).one()
```
VARBINARY might be a better choice.
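(To illustrate the VARBINARY suggestion, a small sketch against a hypothetical table, not the original schema: VARBINARY stores the bytes without null-padding, so equality lookups work on the raw token value.)
```
from sqlalchemy import Column, VARBINARY, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Token(Base):
    __tablename__ = "Tokens"
    token = Column(VARBINARY(64), primary_key=True, nullable=False)

engine = create_engine("mysql+pymysql://user:password@localhost/api_database")  # placeholder URL
Base.metadata.create_all(engine)

with Session(bind=engine) as session:
    session.add(Token(token=b"$2b$2t.e"))
    session.commit()
    # exact match, no padding of the search term required
    count = session.execute(
        select(func.count(Token.token)).where(Token.token == b"$2b$2t.e")
    ).scalar_one()
    print(count)  # 1
```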
hey, y'all … I'm trying to use the async API, but `register_composites()` is throwing an error with a `postgresql+asyncpg` engine.
Down the stack it complains: `TypeError: AsyncAdapt_asyncpg_connection.cursor() got an unexpected keyword argument 'cursor_factory'`.
I've tried `sqlalchemy_utils.register_composites(conn.sync_connection)`, `with conn.sync_engine.begin() as sync_conn: …`, and `sqlalchemy_utils.register_composites(await conn.get_raw_connection())` without any luck.
The engine was created with `create_async_engine(…, future=True)`.
Traceback below; I left out filenames from my private repo:
```
….py:110: in create_configuration
    async with app.db.begin() as conn:
../../homebrew/Cellar/python@3.10/3.10.4/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py:199: in __aenter__
    return await anext(self.gen)
app/db.py:80: in begin
    await conn.run_sync(sqlalchemy_utils.register_composites)
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py:546: in run_sync
    return await greenlet_spawn(fn, conn, *arg, **kw)
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:115: in greenlet_spawn
    result = context.switch(*args, **kwargs)
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/sqlalchemy_utils/types/pg_composite.py:325: in register_composites
    register_psycopg2_composite(
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/sqlalchemy_utils/types/pg_composite.py:271: in register_psycopg2_composite
    psycopg2.extras.register_composite(
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/psycopg2/extras.py:1136: in register_composite
    caster = factory._from_db(name, conn_or_curs)
../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/psycopg2/extras.py:1073: in _from_db
    conn, curs = _solve_conn_curs(conn_or_curs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

conn_or_curs = <AdaptedConnection <asyncpg.connection.Connection object at 0x1068e6a40>>

    def _solve_conn_curs(conn_or_curs):
        """Return the connection and a DBAPI cursor from a connection or cursor."""
        if conn_or_curs is None:
            raise psycopg2.ProgrammingError("no connection or cursor provided")

        if hasattr(conn_or_curs, 'execute'):
            conn = conn_or_curs.connection
            curs = conn.cursor(cursor_factory=_cursor)
        else:
            conn = conn_or_curs
>           curs = conn.cursor(cursor_factory=_cursor)
E           TypeError: AsyncAdapt_asyncpg_connection.cursor() got an unexpected keyword argument 'cursor_factory'

../../../.local/share/virtualenvs/sqlalchemy-asyncio-8jUFBPyQ/lib/python3.10/site-packages/psycopg2/extras.py:787: TypeError
=================== short test summary info ===================
ERROR app/catalog_search_service/test/test_config_api.py::test_update_config - TypeError: AsyncAdapt_asyncpg_connection.cursor() got an unexpected keyword argument 'cursor_factory'
```
f"postgresql+asyncpg://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
async
calls
sqlalchemy_utils
module ... it calls psycopg2 stuff underneath
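(Not a fix for `register_composites()` itself; as far as I can tell sqlalchemy_utils' pg_composite support is written against psycopg2's cursor API, so it can't work through the asyncpg adapter. One alternative, sketched below with placeholder names and assuming SQLAlchemy 1.4.24+ for `driver_connection`, is to register codecs directly with asyncpg on the raw driver connection; asyncpg already decodes composite types into Record objects on its own.)
```
import json
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")  # placeholder URL

async def setup_codecs() -> None:
    async with engine.connect() as conn:
        raw = await conn.get_raw_connection()   # SQLAlchemy's adapted DBAPI connection
        asyncpg_conn = raw.driver_connection    # the underlying asyncpg.Connection
        # example: have asyncpg decode json columns into Python objects;
        # this is per-connection, so in practice you'd hook it into pool setup
        await asyncpg_conn.set_type_codec(
            "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
        )
```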