    Raz Cohen

    Hey all,
    I'm creating an async session using sessionmaker and for some reason my sessions to the db are not getting closed properly.
    This is the engine I'm passing to the sessionmaker:

    engine = create_async_engine(
        "postgresql+asyncpg://...",  # URL and options truncated in the original message
    )
    Also, I'm using the session inside a middleware with a context manager, so the session should be closed when the middleware returns:

    async def db_session_committer_middleware(request: Request, call_next):
        async with make_session_factory()() as session:
            request.state.async_session = session
            response = Response("Internal server error", status_code=500)
            # the two try: lines below were dropped from the paste
            try:
                response = await call_next(request)
            except Exception:
                await session.rollback()
            try:
                await session.commit()
            except Exception:
                get_logger().exception("Post-request commit failed:")
            return response

    As you can see, the connections remain open after the request has completed (see the screenshot below).

    Tried to follow this comment (https://github.com/sqlalchemy/sqlalchemy/issues/8145#issuecomment-1178689770), but couldn't resolve the issue.

    Steven Beeching
    I've created a basic version of my issue: pastebin. If you run the document create twice (from a clean db), the second time it inserts the counterparty again rather than updating it.
    Alex Grönholm
    @RazcoDev I think you're confusing sessions with connections
    sqlalchemy pools connections by default, so unless you're using NullPool, those connections are just released to the pool rather than closed when the sessions are closed
    also, why are you creating multiple session factories?
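    (A minimal sketch of the NullPool option mentioned above; the URL is a placeholder, not from the conversation. With the default pool, connections staying open after a request is expected behaviour:)

    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.pool import NullPool

    # With NullPool, closing a session's connection really closes it
    # instead of returning it to the pool.
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/db",
        poolclass=NullPool,
    )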
    Hello there
    Hey there, a few months ago I was asking about poor performance generating a complex query. I still haven't been able to reproduce it outside my actual codebase. It involves a lot of different entities and recursive CTEs. It's very strange: session.execute(query) takes 2 seconds to build the query (before execution), while str(query) takes 200ms.
    I've taken a look at the profiler again

    And I'm back at the original issue

       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       416141    0.027    0.000    0.027    0.000 annotation.py:216(__hash__)
       301121    0.036    0.000    0.036    0.000 {built-in method builtins.isinstance}
       199574    0.008    0.000    0.008    0.000 {built-in method builtins.id}
     185614/1    0.354    0.000    0.501    0.501 traversals.py:159(_gen_cache_key)

    The problem is the massive amount of work in _gen_cache_key. It's absolutely huge: one call at the top, then nearly 200'000 recursive calls building the CacheKey named tuple. That's 0.5s of pure Python.

    The AST is quite large, but I don't have close to 200'000 elements in my query, so I'm a little confused as to why this got so big.

    I think last time I took a look at this, I tried to introspect the cache key and I'd end up crashing my interpreter trying to print its contents.
    What does the number of calls to _gen_cache_key depend on?
    This isn't a major issue for me anymore since I managed to sidestep it with lambda statements. However, I don't want to generate cache entries ahead of time if I can make the initial generation faster. So I thought I'd revisit this issue.
    Is there a way I can break down a lambda statement into sub lambda statements easily?
    I know there's an example with simple where clauses
    Just wondering if it's possible with CTEs
    Is it possible to do a union_all with lambdas?
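    (For reference, the "simple where clauses" example mentioned above follows the documented pattern for composing lambda statements; User here is a stand-in mapped class:)

    from sqlalchemy import lambda_stmt, select

    def my_stmt(id_: int):
        # the statement structure is cached; only the bound parameter changes between calls
        stmt = lambda_stmt(lambda: select(User))
        stmt += lambda s: s.where(User.id == id_)
        return stmt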

    If I try this

    result = []

    for type_, table in self._table_map.items():
        cols = list(
            itertools.chain(*[self.alias_to_cols(e) for e in alias_set.query_entites()])
        )
        # the select(...) wrapper and the .where(...) placement are inferred;
        # the paste dropped several closing parentheses
        query = select(
            *cols,
            *self.base_trade_cols(table, type_),
        ).where(
            alias_set.canonical_version.internal_id == table.c.internal_id,
            alias_set.common_metadata.internal_id == table.c.internal_id,
        )
        expires_into = aliased(ExpiresInto, name="expires_into_upper")  # used in lines elided from the paste
        query = add_metadata_for_common_metadata(query, alias_set)
        if limit:
            query = query.limit(limit)
        result.append(lambda_stmt(lambda: PatchedSelectStatementGrouping(query)))
    return lambda_stmt(lambda: union_all(*result).subquery('pjoin'))


    ArgumentError: SELECT construct for inclusion in a UNION or other set construct expected, got StatementLambdaElement(<code object <lambda> at 0x10889bc90, file "/Users/rollokonig-brock/Developer/services/base/polymorphic_generator.py", line 198>).
    Do I have to reduce over the result list to get that effect?
    Alex Grönholm

    This part looks overly complicated:

            cols = list(
                itertools.chain(*[self.alias_to_cols(e) for e in alias_set.query_entites()])
            )
    Why not just do cols = [col for e in alias_set.query_entites() for col in self.alias_to_cols(e)]?

    Yeah, that works, but it's functionally equivalent code
    I don't like nested fors in expressions; it's harder to delineate between the loops. But 🤷‍♂️
    Federico Caselli
    @sonthonaxrk it's unlikely you will get support here for such a complex issue. Please update the existing issue, or open a new one if the old one has been removed (we can reopen it if it was closed).
    I've added an issue
    Minh Dao
    Hi. I have a task to fetch all data from an unknown database. For now I'm focusing on the popular dialects: postgresql, mysql, sqlserver. I'm using SQLAlchemy to execute raw SELECT queries. When there are lots of rows, I don't know whether the result set is loaded lazily as I iterate over it. For example:
    from sqlalchemy import text  # plain-string execute is deprecated in 1.4

    with engine.begin() as connection:
        result = connection.execute(text('select * from "table_1"'))
        if result:
            for row in result:
                ...  # row handling elided in the original paste
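    (Whether rows are fetched lazily depends on the driver; most DBAPIs buffer the whole result client-side by default. A minimal sketch of explicitly requesting server-side streaming, offered as an assumption about the setup rather than the asker's actual code:)

    from sqlalchemy import text

    with engine.connect() as connection:
        result = connection.execution_options(stream_results=True).execute(
            text('select * from "table_1"')
        )
        for row in result:
            ...  # rows are now fetched in batches as iteration proceeds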
    Safa Alfulaij

    Hi all.

    I am trying to use the dataclass-way of writing my table classes.
    I am stuck writing a mixin that uses ForeignKey.


    class BaseMixin:
        created_by_id: uuid.UUID = dataclasses.field(
            init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
        )
        created_by: "User" = dataclasses.field(
            metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
        )
        updated_by_id: uuid.UUID = dataclasses.field(
            init=False, metadata={"sa": lambda: Column(ForeignKey("user.id"))}
        )
        updated_by: "User" = dataclasses.field(
            metadata={"sa": lambda: relationship("User", foreign_keys=[...])},
        )

    How do I access the _id fields in foreign_keys?
    Using the normal as_declarative approach, I have access to cls in the class method:

        @declared_attr
        def created_by(cls):
            # body reconstructed from context; the paste was truncated here
            return relationship("User", foreign_keys=[cls.created_by_id])

    Hello there - I am having some problems connecting to PostgreSQL with multiple hosts.

    1. I followed the documentation and suggestion here - https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#specifying-multiple-fallback-hosts
    2. When I use the multiple-hosts option as documented with psycopg2, it gives me this error: psycopg2.OperationalError: could not translate host name to address

    I used the postgresql parameter target_session_attrs=primary so that it finds the primary node for writing and reading, and otherwise cycles through the list of available hosts. However, I am getting the same error.

    Could anyone help?
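    (A minimal sketch of the multi-host URL format from the linked docs; host names and credentials are placeholders:)

    from sqlalchemy import create_engine

    # each host= query parameter adds a fallback host; target_session_attrs
    # is passed through to libpq/psycopg2
    engine = create_engine(
        "postgresql+psycopg2://user:pass@/dbname"
        "?host=db1.example.com:5432&host=db2.example.com:5432"
        "&target_session_attrs=primary"
    )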

    I'm trying to perform an upsert to MySQL using on_duplicate_key_update... how do I do this for bulk data? Does it accept bulk datasets in the call, or do I need to loop through the dataset manually?
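    (A minimal sketch under assumed table and column names: conn.execute() accepts a list of parameter dicts, and the update clause can reference stmt.inserted so it applies per row:)

    from sqlalchemy.dialects.mysql import insert

    stmt = insert(my_table)  # my_table is a stand-in Table object
    stmt = stmt.on_duplicate_key_update(value=stmt.inserted.value)
    with engine.begin() as conn:
        # executemany-style: one statement, many parameter sets
        conn.execute(stmt, [
            {"id": 1, "value": "a"},
            {"id": 2, "value": "b"},
        ])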
    Dear community, could you please help me clarify one thing... Is it possible to load an entire database from YAML using the SQLAthanor lib? I've been through the documentation but I'm still confused by it. This seems like it should be a basic feature, but I can only find things like "load a model from..." or "update a model from...". Maybe the documentation is just lacking...
    Is it possible to build an undirected, many-to-many relationship with only one table and an association table? Like a UserModel where two Users can be friends: if User A is a friend of User B, then User B also has User A as a friend. I can only do it directed - like follower/following - but not undirected so far. For more info, here's the full question: https://stackoverflow.com/questions/73099995/undirected-many-to-many-relationship-with-one-table
    Federico Caselli
    Not sure. I suggest you open a discussion
    Gotcha, thanks! :)
    Montana Burr, Software Engineer

    So, I have a bit of an interesting problem. For reasons I won't get into here, I want to store binary data in a database. When a client sends me a value, my program should check whether that value, encoded in binary data, is stored in the database.

    Here is somewhat of an MCVE:

    from http.server import BaseHTTPRequestHandler, HTTPServer
    import json

    from sqlalchemy import BINARY, Column, create_engine, func, select
    from sqlalchemy.orm import declarative_base, Session

    Base = declarative_base()
    engine = create_engine("sqlite://")  # URL truncated in the original paste; sqlite as a stand-in

    class Token(Base):
        __tablename__ = "Tokens"
        token = Column(BINARY(64), primary_key=True, nullable=False)

    Base.metadata.create_all(engine)

    with Session(bind=engine) as session:
        # add token (body elided in the paste; presumably the static token below)
        session.add(Token(token="$2b$2t.e".encode("utf8")))
        session.commit()

    class Server(BaseHTTPRequestHandler):
        def do_POST(self):
            input_length = int(self.headers.get("Content-Length"))
            input = self.rfile.read(input_length)
            input_json = json.loads(input)
            # check if passed token exists in table
            with engine.connect() as conn:
                resultset = conn.execute(
                    select(func.count()).where(
                        Token.token == input_json["token"].encode("utf8")
                    )
                )
                result_count = resultset.first()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.end_headers()
            if result_count[0] > 0:
                # found the token
                self.wfile.write(json.dumps({"success": True}).encode("utf8"))
            else:
                # token not found
                self.wfile.write(json.dumps({"success": False}).encode("utf8"))

    if __name__ == "__main__":
        web_server = HTTPServer(("", 8080), Server)
        print("Server started at {0}:{1}".format("", 8080))
        try:
            web_server.serve_forever()
        except KeyboardInterrupt:
            pass
        print("Server stopped.")
    Oh, I forgot the problem:
    When the client passes "$2b$2t.e" to the server, the server doesn't see that the value is in the database.
    Alex Grönholm
    @moonman239_twitter where are you saving the new token in the database?
    I see you do that in one place at the beginning – I suppose you only save a static token in the example?
    so does resultset always come back with a result of 0?
    Montana Burr, Software Engineer
    @agronholm yeah, only a static token in the example. It would seem that SQLAlchemy has a hard time finding binary values in databases; I have a more complex program that uses LargeBinary columns and I get the same problem.
    Alex Grönholm
    sqlalchemy is hardly the issue there
    are you sure the problem isn't that you're using a fixed length binary field but only using a fraction of that space?
    I've never used those column types myself
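    (For context, a minimal illustration of the fixed-length theory; the padding behaviour described is MySQL's for BINARY(n) columns, and the token value is taken from the example above:)

    token = "$2b$2t.e".encode("utf8")           # 8 bytes, as sent by the client
    stored = token.ljust(64, b"\x00")           # MySQL zero-pads values stored in BINARY(64)
    print(token == stored)                      # False: comparing the raw token misses
    print(token.ljust(64, b"\x00") == stored)   # True once padded to the column length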
    Montana Burr, Software Engineer
    @agronholm maybe?
    Alex Grönholm
    have you tried finding the token with a raw SQL query?
    Montana Burr, Software Engineer
    So far, the only solution that I know of is to store the token as TEXT instead of BINARY.
    Gord Thompson
    @moonman239_twitter re: "It would seem that SQLAlchemy has a hard time finding binary values in databases" — Not as such:
    import sqlalchemy as sa

    engine = sa.create_engine("mysql+pymysql://scott:tiger@")  # host/db truncated in the paste

    table_name = "moonman239"
    with engine.begin() as conn:
        conn.exec_driver_sql(f"DROP TABLE IF EXISTS {table_name}")
        conn.exec_driver_sql(f"CREATE TABLE {table_name} (token MEDIUMBLOB)")
        conn.exec_driver_sql(
            f"INSERT INTO {table_name} (token) VALUES (X'476F7264')"
        )

    tbl = sa.Table(table_name, sa.MetaData(), autoload_with=engine)
    engine.echo = True
    with engine.begin() as conn:
        result = conn.execute(sa.select(tbl).where(tbl.c.token == b"\x47\x6f\x72\x64")).one()
        print(result)  # (b'Gord',)
        """SQL emitted:
        SELECT moonman239.token
        FROM moonman239
        WHERE moonman239.token = %(token_1)s
        [generated in 0.00077s] {'token_1': b'Gord'}
        """