    Balu krishnan
    @balucdlm_gitlab
    Is there any way to get the list of sequences defined in a metadata object? (like tables via meta_obj.tables and columns via meta_obj.table.c)
    Found that internally it uses a _sequence variable which is of type dict. Is there any better way to get the sequence list?
    Federico Caselli
    @CaselIT
    I'm not sure. If you like, you could open an issue to see if a formal API could be provided.
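    (For reference, a minimal sketch of the workaround discussed above; the attribute name is assumed here to be `_sequences` and, being internal, it is not part of the public API and may change between versions:)

    # Public API: table names defined on the MetaData object
    print(list(meta_obj.tables))

    # Internal dict mentioned above, mapping sequence name -> Sequence (assumed name: `_sequences`)
    for name, seq in meta_obj._sequences.items():
        print(name, seq)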
    Balu krishnan
    @balucdlm_gitlab
    ok
    BTall
    @btall
    Hi everyone,
    I would like to know if there is a way to retrieve the current session in the context of an ORM Event: after_update.
    Thank you for your time.
    Alex Grönholm
    @agronholm
    @btall object_session(target)
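    (A minimal sketch of that, assuming a mapped class MyModel; event.listens_for and object_session are the standard APIs involved:)

    from sqlalchemy import event
    from sqlalchemy.orm import object_session

    @event.listens_for(MyModel, "after_update")
    def receive_after_update(mapper, connection, target):
        # The Session that the just-updated instance belongs to
        session = object_session(target)
        print(session)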
    scalaenthu
    @scalaenthu

    I have two schemas representing a one-to-many relationship:

    class Parent(base):
        __tablename__ = "a"
        __table_args__ = {"schema": "parent"}
        id = Column("id", String, primary_key=True)
        child = relationship("Child", back_populates="parent")
    
    
    class Child(base):
        __tablename__ = "a"
        __table_args__ = {"schema": "child"}
        id = Column("a_id", Integer, primary_key=True)
        sid = Column(String, ForeignKey("a.parent.id"))
        parent = relationship(Parent, back_populates="child")
        type = Column(String)

    I want to fetch the rows of Child, via Parent, that have type = "abc":

        def get_data(self, type):
            return self.session.query(Parent.child).filter_by(
                Parent.child.any(Child.type == type)).all()

    it gives me (True, )

    When I do:

        def get_data(self, type):
            return self.session.query(Parent.child).filter_by(
                type=type).all()

    Entity '<class 'dependencies.a.Parent'>' has no property 'type'

    scalaenthu
    @scalaenthu

    When I do:

           return self.session.query(Parent).filter(
                Parent.child.any(Child.type == "abc")).all()

    It returns rows which have type other than abc as well

    Gord Thompson
    @gordthompson
    @scalaenthu - ForeignKey("a.parent.id") is potentially ambiguous depending on what the default schema is. Use ForeignKey(Parent.id) instead.
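    (A minimal sketch of that suggestion applied to the model from the question above; everything else is unchanged:)

    class Child(base):
        __tablename__ = "a"
        __table_args__ = {"schema": "child"}
        id = Column("a_id", Integer, primary_key=True)
        # Reference the parent Column object directly so the schema is unambiguous
        sid = Column(String, ForeignKey(Parent.id))
        parent = relationship(Parent, back_populates="child")
        type = Column(String)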
    TofuTogether
    @TofuTogether_twitter

    Very new to SQLAlchemy, so this might be a basic question. I want to add a column to a query, for instance "5 as hi". But whenever I try to add a column here, the query seems to return nothing. If I remove .add_columns("5 as hi") then it starts working.

    result = (
        db.query(Feed)
        .filter(User.status == 1)
        .add_columns("5 as hi")
        .offset(skip)
        .limit(limit)
        .all()
    )
    for row in result:
        print("id: ", row.id)

    Gord Thompson
    @gordthompson
    @TofuTogether_twitter - Try db.query(Feed, literal_column("5").label("hi"))
    TofuTogether
    @TofuTogether_twitter
    @gordthompson Thank you. I'm still getting AttributeError: 'result' object has no attribute 'id' on print("id: ", row.id). It works if I don't try to add a literal_column or use add_columns.
    Gord Thompson
    @gordthompson
    @TofuTogether_twitter - Try row.Feed.id
    TofuTogether
    @TofuTogether_twitter
    Thanks that worked :)
    TofuTogether
    @TofuTogether_twitter

    @gordthompson Thanks again. Is there a way I can make it like row.hi and row.id?

    Rather than having to do row.Feed.id and row.hi inside my loop? I was hoping 'hi' would be a part of Feed

    djangoReactGuy
    @SanskarSans
    I am new to SQLAlchemy. Can anyone help me implement this kind of filtering in SQLAlchemy, please?
    def _prepare_filter_expression(
        field_name: str,
        index: int,
        cursor: List[str],
        sorting_fields: List[str],
        sorting_direction: str,
    ) -> Tuple[Q, Dict[str, Union[str, bool]]]:
    
        field_expression: Dict[str, Union[str, bool]] = {}
        extra_expression = Q()
        for cursor_id, cursor_value in enumerate(cursor[:index]):
            field_expression[sorting_fields[cursor_id]] = cursor_value
    
        if sorting_direction == "gt":
            extra_expression |= Q(**{f"{field_name}__{sorting_direction}": cursor[index]})
            extra_expression |= Q(**{f"{field_name}__isnull": True})
        elif cursor[index] is not None:
            field_expression[f"{field_name}__{sorting_direction}"] = cursor[index]
        else:
            field_expression[f"{field_name}__isnull"] = False
    
        return extra_expression, field_expression
    
    
    def _prepare_filter(
        cursor: List[str], sorting_fields: List[str], sorting_direction: str
    ) -> Q:
        """Create filter arguments based on sorting fields.
    
        :param cursor: list of values that are passed from page_info, used for filtering.
        :param sorting_fields: list of fields that were used for sorting.
    :param sorting_direction: keyword direction ('lt', 'gt').
        :return: Q() in following format
            (OR: ('first_field__gt', 'first_value_form_cursor'),
                (AND: ('second_field__gt', 'second_value_form_cursor'),
                    ('first_field', 'first_value_form_cursor')),
                (AND: ('third_field__gt', 'third_value_form_cursor'),
                    ('second_field', 'second_value_form_cursor'),
                    ('first_field', 'first_value_form_cursor'))
            )
        """
        filter_kwargs = Q()
        for index, field_name in enumerate(sorting_fields):
            if cursor[index] is None and sorting_direction == "gt":
                continue
    
            extra_expression, field_expression = _prepare_filter_expression(
                field_name, index, cursor, sorting_fields, sorting_direction
            )
            filter_kwargs |= Q(extra_expression, **field_expression)
    
        return filter_kwargs
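    (For reference, a rough sketch of how the Q-object combination above could be expressed with SQLAlchemy, assuming a mapped class Item whose attributes match sorting_fields; or_/and_ play the role of Q's | operator and keyword-AND semantics:)

    from sqlalchemy import and_, or_

    def prepare_filter(model, cursor, sorting_fields, sorting_direction):
        # Builds: (f1 > v1) OR (f1 == v1 AND f2 > v2) OR (f1 == v1 AND f2 == v2 AND f3 > v3) ...
        clauses = []
        for index, field_name in enumerate(sorting_fields):
            column = getattr(model, field_name)
            equal_parts = [getattr(model, sorting_fields[i]) == cursor[i] for i in range(index)]
            comparison = column > cursor[index] if sorting_direction == "gt" else column < cursor[index]
            clauses.append(and_(*equal_parts, comparison))
        return or_(*clauses)

    # Hypothetical usage:
    # session.query(Item).filter(prepare_filter(Item, cursor, ["name", "id"], "gt")).all()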
    Gord Thompson
    @gordthompson

    @gordthompson Thanks again. Is there a way I can make it like row.hi and row.id?

    @TofuTogether_twitter - You can always select individual columns – i.e., attributes of the ORM object – instead of selecting the entire ORM object itself.
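    (A minimal sketch of that, reusing the literal_column suggestion from above and assuming Feed has an id column:)

    from sqlalchemy import literal_column

    result = (
        db.query(Feed.id, literal_column("5").label("hi"))
        .offset(skip)
        .limit(limit)
        .all()
    )
    for row in result:
        # Both attributes are available directly on each result row
        print("id:", row.id, "hi:", row.hi)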

    eg-b
    @eg-b
    Hello. My problem is that if the name of a table or a column begins with an underscore and is written in lower case, then SQLAlchemy gives an "invalid identifier" database error in the case of Oracle. Previously, I wrote all table names in uppercase and there were no problems, but now Postgres support has been added and it has problems with uppercase. How can I solve this?
    Gord Thompson
    @gordthompson
    @eg-b - If you can create a minimal reproducible example showing how name normalization fails for table/column names that begin with an underscore please create a GitHub issue to report it.
    Vinit Shah
    @vinitshah-consilience

    Hi, I posted this question on StackOverflow some time ago but haven't received feedback yet, so I was hoping for more luck here.

    I'm getting an error that looks like:

    sqlalchemy.exc.InvalidRequestError: Can't attach instance <ObjectType at 0x10592fe50>; another instance with key (
    <class '__main__.ObjectType'>, (1,), None) is already present in this session.

    when I try instantiating a new object that contains a relationship.

    I have an example below that reproduces the error.

    import sqlalchemy as sa
    from sqlalchemy import create_engine, inspect, Column, Integer, String, DateTime, ForeignKey
    from sqlalchemy.orm import sessionmaker, relationship, aliased, contains_eager, scoped_session
    from sqlalchemy.ext.declarative import declarative_base
    
    Base = declarative_base()
    
    class ObjectType(Base):
        __tablename__ = "objecttypes"
        id=Column(Integer, primary_key=True)
        name=Column(String)
    
        def __repr__(self):
            return "ObjectType({}, id={})".format(self.name, self.id)
    
    engine = create_engine('sqlite:///:memory:', echo=False)
    Base.metadata.create_all(engine) # We'll call this again later
    sessionFactory = sessionmaker(bind=engine, expire_on_commit=False)
    scopedSessionFactory = scoped_session(sessionFactory)
    
    def startScope():
        return scopedSessionFactory()
    
    def endScope():
        scopedSessionFactory().close()
        scopedSessionFactory.remove()
        return
    
    def addObjectTypes():
        """
        Add in all the object types to the db without using a session
        """
        values = ["('location')", "('locationinput')", "('gate')"]
        q =  """INSERT INTO objecttypes (name) VALUES {}""".format(",".join(values))
        engine.execute(q)
        return
    
    def buildObjectTypes():
        addObjectTypes()
        session = startScope()
        types = session.query(ObjectType).all()
        endScope()
        return dict([(objType.name, objType) for objType in types])
    
    # Holds all the types
    typeDict = buildObjectTypes()
    
    class Thing(Base):
        __tablename__ = "things"
        id=Column(Integer, primary_key=True)
        name=Column(String)
        object_type_id=Column(Integer, ForeignKey('objecttypes.id'))
        objectType=relationship(ObjectType)
        version=Column(Integer, nullable=False)
        timeCreated=Column(DateTime)
    
        __mapper_args__ = {
            'version_id_col': version,
            'polymorphic_on':object_type_id,
            'with_polymorphic':'*',
            'polymorphic_load':'inline',
        }
    
        def __repr__(self):
            return "{}, id={}, type={}, version={}".format(self.name, self.id, self.objectType, self.version)
    
    
    class Location(Thing):
        __tablename__ = "locations"
        id=Column(Integer, ForeignKey('things.id'),primary_key=True)
        __mapper_args__ = {
            'polymorphic_identity': typeDict['location'].id
        }
    
    class LocationInput(Thing):
        __tablename__ = "locationinputs"
        id=Column(Integer, ForeignKey('things.id'),primary_key=True)
        previousGateId=Column(Integer, ForeignKey('gates.id'))
        __mapper_args__ = {
            'polymorphic_identity': typeDict['locationinput'].id
        }
    
    class Gate(Thing):
        __tablename__ = "gates"
        id=Column(Integer, ForeignKey('things.id'),primary_key=True)
    
        originId=Column(Integer, ForeignKey('locations.id'))
        origin=relationship(Location, foreign_keys=[originId], backref="originGates")
    
        originInputId=Column(Integer, ForeignKey('locationinputs.id'))
        originInput=relationship(LocationInput, foreign_keys=[originInputId], backref="originInputGates")
    
        destinationId=Column(Integer, ForeignKey('locations.id'))
        destination=relationship(Location, foreign_keys=[destinationId], backref="destinationGates")
    
        destinationInputId=Column(Integer, ForeignKey('locationinputs.id'))
        destinationInput=relationship(LocationInput, foreign_keys=[destinationInputId], backref="destinationInputGates")
    
        __mapper_args__ = {
            'polymorphic_identity': typeDict['gate'].id
        }
    
    LocationInput.previousGate = relationship(Gate, foreign_keys=[LocationInput.previousGateId])
    Base.metadata.create_all(engine)
    
    def write(obj):
        session = scopedSessionFactory()
        return session.merge(obj)
    TofuTogether
    @TofuTogether_twitter
    How do I add a simple column to my query that evaluates to true or false, like db.query(user).add_columns((user.name == "tim").label("is_tim")).all()?
    TofuTogether
    @TofuTogether_twitter
    I can do it with a hybrid_method; if this is just as efficient I guess it is fine. Any thoughts on the efficiency of this? The name being passed is 'tim'. I could probably do this with a hybrid_property instead, but this is more just for the idea:
        @hybrid_method
        def is_tim(self, name):
            return self.name == name
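    (Assuming that hybrid_method lives on a User model, a sketch of using it as a labelled column; at class level it should render essentially the same comparison SQL as the inline .label() version above:)

    result = db.query(User, User.is_tim("tim").label("is_tim")).all()
    for row in result:
        print(row.User.name, row.is_tim)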
    Ravyouli
    @ravyouli
    Good morning! Does anyone know if using SQLAlchemy with SQL Server in asynchronous mode is stable? I just want to perform SELECT statements on a SQL Server 2019 engine. Today it's used from FastAPI, but we see our workers busy and users getting timeouts, so we'd like to switch to asynchronous DB calls. Thanks for the hints ;-) I found this, but I am not clear on whether SQL Server for SQLAlchemy in async mode is new/stable or has been out there for a long time and known to be robust: https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html
    Federico Caselli
    @CaselIT
    I don't know if SQL Server has an async DBAPI. Currently only asyncpg for Postgres is supported.
    Daniel Fortunov
    @asqui

    So I'm getting a mysterious TypeError: '<' not supported between instances of 'NoneType' and 'tuple' when my SQLAlchemy dialect is trying to check self.server_version_info and finding it is unexpectedly None.

    The root cause appears to be that server_version_info is initialised via a listener to the 'first_connect' event, but this is never firing because my engine ends up with _has_events == False.

    I'm creating my engine in a fairly standard way:
    create_engine('postgresql://', pool=pool, echo=echo)
    The only special thing is that I'm using a NullPool as this is used in unit testing.

    Why would this engine end up with events disabled, and is that even a supported scenario, given that it breaks dialects that depend on being initialised via the 'first_connect' event firing?

    I'm not an expert on sqlalchemy internals so have no concept of what's "right" and "wrong" in all this complexity, so would appreciate some direction from someone with more knowledge.
    Daniel Fortunov
    @asqui
    Actually, I think I might be barking up the wrong tree with engine._has_events -- I suspect this is not doing what I think it is, because I have another "working" example where the engine._has_events == False yet the 'first_connect' event is still getting dispatched and initialising the dialect correctly.
    Daniel Fortunov
    @asqui

    Ok getting warmer... (maybe)
    In the case that fails we are binding the session to a specific connection (for unit testing) like so:
    Session(bind=engine.connect(connection=_ConnectionFairy(cxn, None, echo)))
    This seems hacky and I suspect it is resulting in the problems -- binding the session to a connection prevents firing of the 'first_connect' event because the engine is not establishing a connection; it's already been given one!

    So what is the "right" way to construct a session and hand it a single connection to use? (The context is that we are in a unit test and want everything to execute on a single dbapi connection, which is constructed before sqlalchemy comes into play, and may be used to initialise appropriate database state for the test.)

    Federico Caselli
    @CaselIT
    Not sure you need it, but can't you just pass a custom creator to create_engine? See https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.creator
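    (A minimal sketch of that suggestion for the single-connection test case, assuming cxn is the pre-built DBAPI connection; StaticPool is used here so the engine keeps handing back that same connection:)

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    from sqlalchemy.pool import StaticPool

    engine = create_engine(
        "postgresql://",
        creator=lambda: cxn,   # always return the pre-existing DBAPI connection
        poolclass=StaticPool,  # never open a second connection
    )
    session = Session(bind=engine)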
    Daniel Fortunov
    @asqui

    YES! Ok, that seems a lot more sensible and brings in more of the SQLAlchemy engine machinery, including dialect initialisation. However, because the default state of DBAPI connections is autocommit off, you have the nasty side-effect that using the connection to execute any command starts a transaction for you.

    To counteract this, the DefaultDialect.initialize() method ends with self.do_rollback(connection.connection). So the good news is that the dialect is getting initialised; the bad news is that it is trying to roll back a connection that I don't want it to roll back (because it already has a transaction in progress, with test setup pre-constructed in that transaction).

    What is the lifetime of a dialect? It is owned by the engine right? Is there any way to initialise the dialect on a separate connection first, and then bring in my creator that always returns the same connection?
    Daniel Fortunov
    @asqui

    I managed to achieve this by creating an engine and ensuring it has connected by calling engine.connect() first, and then doing my Session(bind=engine.connect(connection=_ConnectionFairy(cxn, None, echo))) as before.

    Not ideal, but I can't see a way to initialise the dialect on cxn without interfering with a transaction that may already be in progress on that connection.

    Federico Caselli
    @CaselIT
    I'm still not sure why you need to use a single connection
    Charles Ross
    @chivalry

    This isn't a question about SQLAlchemy, but a question for those who use SQLAlchemy and also develop with Node. What ORM do you use there?

    Last year I built a web app using Flask, and SQLAlchemy was great. I loved that a table was mapped with a single class and the entire definition of the model was within the class. I've been looking for something similar in a project where I must use Node, and haven't found anything. The closest, I think, was TypeORM, but most of its documentation assumes the use of TypeScript, although it can work with vanilla JavaScript. I've also looked at Sequelize and Objection, but both seem to prevent the entire table from being defined in the class: Sequelize requires a call to Class.init, and Objection uses Knex as a completely separate migration layer for defining the schema.

    I realize I may be out of luck. Perhaps what I want doesn't exist. But if anyone has a suggestion, I'd much like to hear it.

    Sheesh Mohsin
    @sheeshmohsin

    In Postgres 11, when I'm trying to insert into a table which is mapped using the automap_base functionality of SQLAlchemy, it's not able to insert. I'm getting this error: sqlalchemy.orm.exc.FlushError: Instance has a NULL identity key. If this is an auto-generated value, check that the database table allows generation of new primary key values, and that the mapped Column object is configured to expect these generated values. Ensure also that this flush() is not occurring at an inappropriate time, such as within a load() event.

    As per my research across the internet and various SQLAlchemy threads and pull requests, this is happening because of the newly introduced GENERATED BY DEFAULT AS IDENTITY feature in Postgres. Any ideas how I can work around this? I'm stuck on this: I upgraded the database from 9.5 to 11 and now I'm facing this issue in code. Any help or hint would be helpful.

    Alex Grönholm
    @agronholm
    @sheeshmohsin you can always just use sqlacodegen to generate models instead of using automap
    Padam Sethia
    @highoncarbs
    I'm running into a sqlalchemy.exc.InvalidRequestError: This session is in 'committed' state error. I am using mixin classes to index and search SQLAlchemy models using Elasticsearch. Now I have two separate mixin classes for two of my models, and each has DB event listeners for before_commit & after_commit. The problem is that if changes are being made to one model, the event listener of the other model also triggers and throws the above error. Any solution here?
    # SearchableMixin
    db.event.listen(db.session, 'before_commit', SearchableMixin.before_commit)
    db.event.listen(db.session, 'after_commit', SearchableMixin.after_commit)
    
    #BasicSearchableMixin
    db.event.listen(db.session, 'before_commit', BasicSearchableMixin.bas_before_commit)
    db.event.listen(db.session, 'after_commit', BasicSearchableMixin.bas_after_commit)
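    (One common way to avoid that, sketched below: have each listener record only the objects belonging to its own mixin in before_commit, so a commit that touches unrelated models is a no-op for it. The Elasticsearch indexing itself is left as a placeholder:)

    class SearchableMixin:
        @classmethod
        def before_commit(cls, session):
            # Remember only the pending changes that belong to this mixin
            session._search_changes = [
                obj
                for obj in list(session.new) + list(session.dirty) + list(session.deleted)
                if isinstance(obj, SearchableMixin)
            ]

        @classmethod
        def after_commit(cls, session):
            for obj in getattr(session, "_search_changes", []):
                pass  # reindex obj in Elasticsearch here
            session._search_changes = []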
    BTall
    @btall
    Hello everyone,
    Would one of you be able to tell me how to call the MySQL encrypt() function for a field in the model?
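    (One generic way to call an arbitrary SQL function such as MySQL's encrypt() is the func construct; a minimal sketch, with the model and column names as placeholders:)

    from sqlalchemy import func

    # The expression is rendered as encrypt(...) in the emitted INSERT
    user = User(name="alice", password=func.encrypt("secret", "salt"))
    session.add(user)
    session.commit()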
    Sheesh Mohsin
    @sheeshmohsin
    @agronholm Thanks Alex
    Balu krishnan
    @balucdlm_gitlab

    Is there any way to indicate a table to select from and not select any columns from other tables, even if the select params contain other tables' columns?

    I tried this query but it didn't work:

    query = select([A.c.id, B.c.id]).select_from(A)
    print(query)
    
    # SELECT "A".id, "B".id FROM A, B
    Federico Caselli
    @CaselIT
    SELECT "A".id, "B".id FROM A does not seem correct sql. What are you trying to do? I think can use literal_column to render "B".id
    Yang Bo
    @Bryant-Yang
    Hi, is there any plan to make composites initialisable with kwargs? Right now the composite_values method returns a tuple, and the composite class can't use kwargs to init.
    Balu krishnan
    @balucdlm_gitlab

    @CaselIT

    mdm=# select A.id, B.id from A; 
    ERROR:  missing FROM-clause entry for table "B"

    Like this

    query = select([A.c.id, B.c.id]).select_from(A)
    result= query.execute()

    does not raise any error; instead it executes the query

    SELECT "A".id, "B".id FROM A, B

    I need to raise that error whenever the select parameters are not from the FROM table or a joined table.

    Federico Caselli
    @CaselIT
    I think you can check if there are multiple from clauses
    I'm not sure how much of it is in the documentation, but I think you can check sqlalchemy/sqlalchemy#4737 and the related PRs / commits.
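    (A sketch of that check on the statement constructed above, reusing A and B; froms is the accessor that 1.3/1.4-style Select objects expose:)

    query = select([A.c.id, B.c.id]).select_from(A)
    if len(query.froms) > 1:
        raise ValueError("select references tables outside the intended FROM list: {}".format(query.froms))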
    Balu krishnan
    @balucdlm_gitlab
    Thanks. It’s helpful.