Feel free to hang out here; note that some of the "chat" happens on libera.chat #sqlalchemy
Hey all,
I'm creating an async session using sessionmaker, and for some reason my sessions to the db are not getting closed properly.
This is the engine I'm passing to the sessionmaker:
engine = create_async_engine(
    settings.pg_dsn,
    pool_size=100,
    max_overflow=400,
    json_serializer=_pydantic_json_serializer,
)
Also, I'm using the session inside a middleware with a context manager, so the session should be closed when the middleware returns:
async def db_session_committer_middleware(request: Request, call_next):
    async with make_session_factory()() as session:
        request.state.async_session = session
        response = Response("Internal server error", status_code=500)
        try:
            response = await call_next(request)
        except Exception:
            await session.rollback()
            raise
        try:
            await session.commit()
        except Exception:
            get_logger().exception("Post-request commit failed:")
            raise
        return response
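(Not shown here, but worth checking: if make_session_factory() builds a new engine, and therefore a new connection pool, on every request, each request will leave its own pooled connections behind. A minimal sketch of creating the engine and factory once and reusing them per request, with the commit/rollback handling elided; the names mirror the snippet above and are otherwise assumptions:)
```
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# Created once at import/startup time, not per request.
engine = create_async_engine(settings.pg_dsn, pool_size=100, max_overflow=400)
session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def db_session_committer_middleware(request, call_next):
    # Closing the session on exit releases its connection back to this one pool.
    async with session_factory() as session:
        request.state.async_session = session
        return await call_next(request)
```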
As you can see, the connections remain open after the request has completed (in the screenshot below).
Tried to follow this comment (https://github.com/sqlalchemy/sqlalchemy/issues/8145#issuecomment-1178689770), but couldn't resolve the issue.
Unless you're using NullPool, those connections are just released to the pool rather than closed when the sessions are closed.
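If the goal is for the OS-level connections to actually go away when each session closes, one option is to run the engine without pooling; a minimal sketch, assuming the same DSN and serializer as the engine above:
```
from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import create_async_engine

# NullPool does no pooling: a connection is opened on checkout and truly
# closed on checkin, so nothing lingers after the session is closed.
engine = create_async_engine(
    settings.pg_dsn,
    poolclass=NullPool,
    json_serializer=_pydantic_json_serializer,
)
```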
session.execute(query) takes 2 seconds to build the query (before execution), while str(query) takes 200ms.
And I'm back at the original issue
ncalls     tottime  percall  cumtime  percall  filename:lineno(function)
416141       0.027    0.000    0.027    0.000  annotation.py:216(__hash__)
301121       0.036    0.000    0.036    0.000  {built-in method builtins.isinstance}
199574       0.008    0.000    0.008    0.000  {built-in method builtins.id}
185614/1     0.354    0.000    0.501    0.501  traversals.py:159(_gen_cache_key)
The problem is a massive amount of time spent in _gen_cache_key. It's absolutely huge: one call, then nearly 200,000 recursive calls building the CacheKey named tuple. That's 0.5s of pure Python.
The AST is quite large, but I don't have close to 200,000 elements in my query, so I'm a little confused as to why this got so big.
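One way to confirm it's cache-key generation rather than compilation itself: run the statement once with the compiled cache disabled and compare. A rough sketch, assuming a sync engine is available for the test:
```
import time

with engine.connect() as conn:
    # With the compiled cache disabled, no cache key is generated at all.
    start = time.perf_counter()
    conn.execution_options(compiled_cache=None).execute(query)
    print("cache disabled:", time.perf_counter() - start)

    # Normal path: the cache key is generated (and reused on later runs).
    start = time.perf_counter()
    conn.execute(query)
    print("cache enabled:", time.perf_counter() - start)
```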
If I try this
```
result = []
for type_, table in self._table_map.items():
    cols = list(
        itertools.chain(*[self.alias_to_cols(e) for e in alias_set.query_entites()])
    )
    query = (
        sql.select(
            *self.base_trade_cols(table, type_),
            *cols
        )
        .join(
            alias_set.canonical_version,
            alias_set.canonical_version.internal_id == table.c.internal_id
        )
        .join(
            alias_set.common_metadata,
            alias_set.common_metadata.internal_id == table.c.internal_id
        )
    )
    expires_into = aliased(ExpiresInto, name="expires_into_upper")
    query = add_metadata_for_common_metadata(
        query, alias_set
    )
    if limit:
        query = query.limit(limit)
    result.append(
        lambda_stmt(lambda: PatchedSelectStatementGrouping(query))
    )
return lambda_stmt(lambda: union_all(*result).subquery('pjoin'))
```
ArgumentError: SELECT construct for inclusion in a UNION or other set construct expected, got StatementLambdaElement(<code object <lambda> at 0x10889bc90, file "/Users/rollokonig-brock/Developer/services/base/polymorphic_generator.py", line 198>).
Should I use reduce over the result list to get that effect?
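union_all() expects plain SELECT constructs, so StatementLambdaElement objects can't be fed into it. One way out (a sketch, not necessarily how the surrounding generator is meant to work) is to keep the per-table queries as ordinary Select objects and only wrap the finished union, if at all:
```
from sqlalchemy import union_all

selects = []
for type_, table in self._table_map.items():
    # build_select() is a hypothetical helper standing in for the
    # select/join/limit logic shown above.
    selects.append(build_select(type_, table))

# union_all() already folds the whole list, so no reduce() is needed here.
pjoin = union_all(*selects).subquery("pjoin")
```
Also worth noting: a lambda: query created inside a loop closes over the variable query, so every StatementLambdaElement appended to result ends up seeing the last query built.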
I'm working with postgresql, mysql, and sqlserver. I'm using SQLAlchemy to execute raw SELECT queries. When there are lots of rows, I don't know whether the result set is lazily loaded as I iterate over it and extract the rows. For example:
with engine.begin() as connection:
    result = connection.execute('select * from "table_1"')
    if result:
        for row in result:
            do_something(row)
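Many DBAPIs (psycopg2 and pymysql among them) buffer the entire result client-side when execute() returns, regardless of how you iterate afterwards. If streaming is the goal, the stream_results execution option asks for a server-side cursor where the driver supports one; a sketch assuming SQLAlchemy 1.4+:
```
from sqlalchemy import text

with engine.connect() as connection:
    # stream_results requests a server-side cursor (e.g. a psycopg2 named
    # cursor), so rows are fetched in batches instead of all at once.
    result = connection.execution_options(stream_results=True).execute(
        text('select * from "table_1"')
    )
    for batch in result.partitions(1000):  # fetch 1000 rows per round trip
        for row in batch:
            do_something(row)
```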
Hi all.
I am trying to use the dataclass way of writing my table classes.
I am stuck writing a mixin that uses ForeignKey.
Code:
@dataclasses.dataclass
class BaseMixin:
    created_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )
    created_by: "User" = dataclasses.field(
        init=False,
        metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
    )
    updated_by_id: uuid.UUID = dataclasses.field(
        init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
    )
    updated_by: "User" = dataclasses.field(
        init=False,
        metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
    )
How do I access the _id fields in foreign_keys?
Using normal as_declarative, I have access to cls in the class method:
@declared_attr
def created_by(cls):
    return relationship(
        "User",
        foreign_keys=[cls.created_by_id],
    )
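One option (a sketch, not tested against your models) is to declare those attributes through @declared_attr on the mixin, mirroring the as_declarative pattern above, and use the string form of foreign_keys so the column is looked up on the concrete class once it exists:
```
from sqlalchemy import Column, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declared_attr, relationship

class AuditMixin:
    @declared_attr
    def created_by_id(cls):
        return Column(UUID(as_uuid=True), ForeignKey("user.id"))

    @declared_attr
    def created_by(cls):
        # String form: evaluated against the class registry later, once
        # <ConcreteClass>.created_by_id actually exists on the mapped class.
        return relationship("User", foreign_keys=f"[{cls.__name__}.created_by_id]")

    @declared_attr
    def updated_by_id(cls):
        return Column(UUID(as_uuid=True), ForeignKey("user.id"))

    @declared_attr
    def updated_by(cls):
        return relationship("User", foreign_keys=f"[{cls.__name__}.updated_by_id]")
```
The UUID column type here is an assumption (PostgreSQL); swap in whatever type your user.id actually uses.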
Hello there - I am having some problems connecting to a multi-host PostgreSQL setup.
I used the postgresql parameter target_session_attrs=primary so that it finds the primary node for writing and reading, and otherwise cycles through the list of available hosts. However, I am getting the same error.
Could anyone help?
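For reference, with psycopg2 the host list and target_session_attrs normally go straight into the URL's query string and libpq handles the host selection; a sketch with made-up hostnames:
```
from sqlalchemy import create_engine

# libpq tries the hosts in order and, with target_session_attrs=primary,
# only keeps a connection to whichever one is currently the primary.
engine = create_engine(
    "postgresql+psycopg2://user:password@/mydb"
    "?host=db1.example.com:5432&host=db2.example.com:5432"
    "&target_session_attrs=primary"
)
```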
So, I have a bit of an interesting problem. For reasons I won't get into here, I want to store binary data in a database. When a client sends me a value, my program should check whether that value, encoded in binary data, is stored in the database.
Here is somewhat of an MCVE:
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from sqlalchemy import BINARY, Column, Date, ForeignKey, LargeBinary, create_engine, func, select
import sqlalchemy
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.orm import relationship, Session
from sqlalchemy import insert

Base = declarative_base()
engine = create_engine(
    "mysql+pymysql://user:password@localhost/api_database")


class Token(Base):
    __tablename__ = "Tokens"
    token = Column(BINARY(64), primary_key=True, nullable=False)


Base.metadata.create_all(engine)

with Session(bind=engine) as session:
    # add token
    session.add(Token(token="$2b$2t.e".encode("utf8")))
    session.commit()


class Server(BaseHTTPRequestHandler):
    def do_POST(self):
        input_length = int(self.headers.get("Content-Length"))
        input = self.rfile.read(input_length)
        input_json = json.loads(input)
        # check if passed token exists in table
        resultset = engine.execute(
            select(func.count(Token.token)).where(
                Token.token == input_json["token"].encode("utf8")))
        result_count = resultset.first()
        if (result_count[0] > 0):
            # found the token
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(
                json.dumps({
                    "success": True
                }).encode("iso-8859-1"))
        else:
            # token not found
            self.send_response(400)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            self.wfile.write(
                json.dumps({
                    "success": False
                }).encode("iso-8859-1"))


if __name__ == "__main__":
    web_server = HTTPServer(("0.0.0.0", 8080), Server)
    print("Server started at {0}:{1}".format("0.0.0.0", 8080))
    try:
        web_server.serve_forever()
    except KeyboardInterrupt:
        pass
    web_server.server_close()
    print("Server stopped.")
Why does resultset always come back with a count of 0?
import sqlalchemy as sa

engine = sa.create_engine("mysql+pymysql://scott:tiger@127.0.0.1:3307/mydb")

table_name = "moonman239"
with engine.begin() as conn:
    conn.exec_driver_sql(f"DROP TABLE IF EXISTS {table_name}")
    conn.exec_driver_sql(f"CREATE TABLE {table_name} (token MEDIUMBLOB)")
    conn.exec_driver_sql(
        f"INSERT INTO {table_name} (token) VALUES (X'476F7264')"
    )

tbl = sa.Table(table_name, sa.MetaData(), autoload_with=engine)

engine.echo = True
with engine.begin() as conn:
    result = conn.execute(
        sa.select(tbl).where(tbl.c.token == b"\x47\x6f\x72\x64")
    ).one()
    print(result)  # (b'Gord',)

"""SQL emitted:
SELECT moonman239.token
FROM moonman239
WHERE moonman239.token = %(token_1)s
[generated in 0.00077s] {'token_1': b'Gord'}
"""