Hello. I'm trying to use the FSM package transitions with a class that inherits from a declarative Base. When I write my own __init__() like so:
```python
Base = declarative_base()

class Instance(Base):
    instanceState = Column(String(255), nullable=False)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.machine = Machine(
            model=self,
            states=Instance.states,
            initial=self.instanceState)
        self.machine.add_transitions(self.instance_transitions)
```
This works fine when I call:
inst = Instance(**kwargs)
meaning all of the transitions FSM functions and attributes are defined. But when I get an instance of Instance through a sqlalchemy query, __init__() is not called, and thus the state machine is not initialized.
Is there a method I can extend in my Instance subclass that will be called when query().get() is called?
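(For anyone landing here later: SQLAlchemy has a hook for exactly this, orm.reconstructor, which marks a method to be invoked after an instance is loaded from the database, where __init__ is bypassed. A minimal sketch with the Machine setup reduced to a flag; the transitions wiring from above would go in the same method:)

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, reconstructor, sessionmaker

Base = declarative_base()

class Instance(Base):
    __tablename__ = "instance"
    id = Column(Integer, primary_key=True)
    instanceState = Column(String(255), nullable=False)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.init_machine()  # normal construction path

    @reconstructor
    def init_machine(self):
        # also called when the ORM loads a row and bypasses __init__;
        # this is where the transitions Machine would be built
        self.machine_ready = True
```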
I am having such a terrible time with upserts on SQLite. It really should be the easiest thing, but I can't tell whether the problem is the documentation or me.
Traditionally, you'd call the ORM with MyObject(col1='a', col2='b'), then add to the session and commit. I found some StackOverflow answers suggesting that session.add already handled this at a basic level (which would be fine for my use case), but this doesn't seem to be the case.
The upsert syntax in the docs requires you to create statement-level calls to on_conflict_do_update. But those statements can't be added to the session: UnmappedInstanceError: Class 'sqlalchemy.dialects.sqlite.dml.Insert' is not mapped. If I try to execute() the statement directly, I get an UnboundExecutionError, despite having bound the engine in the sessionmaker beforehand.
Can someone please provide a complete and no-nonsense example of an insert and subsequent upsert using a declarative Base? I've been up against a wall for the better part of a day and it really seems like it should be much easier than this. Thank you
Update: I got it working with the on_conflict_do_update() call. But I still maintain this is way more complicated than it needs to be. There should be some way of telling the declarative Base what the upsert rules are, so that one can simply call session.add(MyModel(a=1, b=2)); session.commit() and have it upsert if needed for simple rulesets.
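(In case it helps anyone searching later, here is a complete sketch of an insert followed by an upsert against a declarative Base on SQLite; model and column names are made up, and it assumes SQLAlchemy 1.4+ and SQLite >= 3.24. The key point is that the Insert construct goes through session.execute(), never session.add():)

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.dialects.sqlite import insert

Base = declarative_base()

class MyModel(Base):
    __tablename__ = "my_model"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    # plain insert via the ORM
    session.add(MyModel(id=1, name="first"))
    session.commit()

    # upsert: the statement is not a mapped instance, so it is
    # executed directly rather than added to the session
    stmt = insert(MyModel).values(id=1, name="second")
    stmt = stmt.on_conflict_do_update(
        index_elements=[MyModel.id],
        set_={"name": stmt.excluded.name},
    )
    session.execute(stmt)
    session.commit()
```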
Hello, I need some help.
I have two models: flask_sample.user.model.User and flask_sample.analysis.model.Analysis. There is a relationship between them:
On the User model:
analysis = db.relationship("flask_sample.analysis.model.Analysis", back_populates="user", lazy="dynamic")
On the Analysis model:
```python
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
user = db.relationship("flask_sample.user.model.User", back_populates="analysis")
```
The code creates all the tables correctly, but whenever I use these models, it raises this error:
sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'mapped class User->users'. Original exception was: entity
I have tried several suggestions from StackOverflow, but none of them work.
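(A guess at the cause, for anyone hitting the same error: relationship() strings are resolved against the declarative class registry, which knows classes by name, not by module path. Referencing the bare class names usually fixes it; a minimal sketch in plain SQLAlchemy, with table names assumed:)

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # refer to the mapped class by its name, not its module path
    analysis = relationship("Analysis", back_populates="user")

class Analysis(Base):
    __tablename__ = "analysis"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    user = relationship("User", back_populates="analysis")
```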
```sql
UPDATE TableA x
SET name = 'New Guy'
FROM TableA y
WHERE x.id = y.id
  AND x.id = 3
RETURNING y.id AS old_id, y.name AS old_name,
          x.id, x.name;
```
Would it be strange if I said I have two sequential migrations, and after running alembic upgrade the whole process hangs on the last one?
But when I manually run the first migration and then the second, it works.
Am I right in understanding that consecutive migrations are run inside a single transaction?
Because when the second one hangs and I SIGTERM the process, it looks like the first one wasn't committed either.
Looks really weird to me. Perhaps I'm missing something (PostgreSQL).
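(Not an answer so much as a pointer: on PostgreSQL, Alembic by default wraps the whole `alembic upgrade` run in one transaction, which matches what you describe, since killing the hung second migration rolls back the first as well. Per-migration commits can be opted into in env.py; a sketch of the relevant fragment, with the surrounding online-mode boilerplate assumed:)

```python
# env.py (online mode): commit after each migration script
# instead of one transaction for the entire upgrade
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    transaction_per_migration=True,
)
```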
Good morning! I have a SQLAlchemy problem that you might find interesting. The core of it is representing a sparse matrix of data using the ORM. Consider the following setup:
```python
Base = declarative_base()

class Row(Base):
    __tablename__ = "row"
    id = Column(Integer, index=True, primary_key=True)
    organization_id = Column(Integer, ForeignKey("organization.id"), nullable=False)

class Column(Base):
    __tablename__ = "column"
    id = Column(Integer, index=True, primary_key=True)
    type = Column(String(length=32), nullable=False)
    name = Column(String(length=32), nullable=False)
    organization_id = Column(Integer, ForeignKey("organization.id"), nullable=False)

class Element(Base):
    __tablename__ = "element"
    row_id = Column(Integer, ForeignKey("row.id"), nullable=False, primary_key=True)
    row = relationship("Row", backref="elements")
    column_id = Column(Integer, ForeignKey("column.id"), nullable=False, primary_key=True)
    column = relationship("Column", backref="elements")
    type = association_proxy("column", "type")
    name = association_proxy("column", "name")
    # data is non-nullable because, if data is null in the matrix,
    # the element row should just be deleted
    data = Column(String, nullable=False)
```
For a given Row, I'd like to get the non-sparse list of elements that compose that row. So far, I have been able to get the non-null elements as a relationship attribute using the following relationship:
```python
RelatedElement = aliased(
    Element,
    select([Element])
    .select_from(
        join(
            Row,
            Column,
            Row.organization_id == Column.organization_id,
            full=True,
            isouter=True,
        ).join(
            Element,
            and_(
                Row.id == Element.row_id,
                Column.id == Element.column_id,
            ),
            full=True,
        )
    )
    .alias(),
)

Row.elements = relationship(
    RelatedElement,
    primaryjoin=Row.id == RelatedElement.row_id,
    viewonly=True,
)
```
The purpose of the full outer join was to create a cross product between rows and columns. Unfortunately, when I test this relationship, I don't get the cross product. For example:
```python
r = Row(organization_id=1)
c1 = Column(type="integer", name="Count", organization_id=1)
c2 = Column(type="string", name="Name", organization_id=1)
db.add_all([r, c1, c2])
db.commit()
for obj in [r, c1, c2]:
    db.refresh(obj)

e = Element(row=r, column=c1, data="1")
db.add(e)
db.commit()
db.refresh(e)
db.refresh(r)

# now, we'd like to get all the elements related to r:
r.elements
> [Element(data="1", column=c1)]
# but what I really want is:
r.elements
> [Element(data="1", column=c1), Element(data=null, column=c2)]
```
I used the documentation here to get this far, but to no avail.
Any direction would be greatly appreciated!
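(Not a relationship-based answer, but one possible direction: build the dense view as a query, outer-joining the organization's columns against the row's elements. A self-contained sketch with the models trimmed down, and the mapped class renamed to Col only to avoid shadowing sqlalchemy.Column in a standalone script:)

```python
from sqlalchemy import (Column, ForeignKey, Integer, String, and_,
                        create_engine)
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Row(Base):
    __tablename__ = "row"
    id = Column(Integer, primary_key=True)
    organization_id = Column(Integer, nullable=False)

class Col(Base):
    __tablename__ = "col"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    organization_id = Column(Integer, nullable=False)

class Element(Base):
    __tablename__ = "element"
    row_id = Column(Integer, ForeignKey("row.id"), primary_key=True)
    column_id = Column(Integer, ForeignKey("col.id"), primary_key=True)
    data = Column(String, nullable=False)

def dense_elements(session, row):
    """Every column of the row's organization, paired with the row's
    element for that column, or None where no element exists."""
    return (
        session.query(Col, Element)
        .outerjoin(
            Element,
            and_(
                Element.column_id == Col.id,
                Element.row_id == row.id,
            ),
        )
        .filter(Col.organization_id == row.organization_id)
        .all()
    )
```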
What's the reason for using a discriminator in joined table inheritance? The pattern itself already creates another table, which could be counted as the type.
While a polymorphic discriminator expression is not strictly necessary, it is required if polymorphic loading is desired. Establishing a simple column on the base table is the easiest way to achieve this, however very sophisticated inheritance mappings may even configure a SQL expression such as a CASE statement as the polymorphic discriminator.
I see it's not necessary. But I'm curious: why is it useful at all, isn't it redundant?
There are probably a lot of answers to that question, but the one that has always stood out to me: the polymorphic machinery lets SQLAlchemy create a system of table inheritance when the database doesn't natively support it.
People tend to implement non-normalized data models and anti-patterns with the polymorphic stuff though. It's common for someone to ask a question about fixing polymorphic inheritance, and the correct answer is changing their data model.
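(To make the "why" concrete: without a discriminator, a query against the base class can't tell which subclass each row is; with polymorphic_on, a plain base-class query hands back instances of the right subclasses. A minimal joined-inheritance sketch with made-up names:)

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Person(Base):
    __tablename__ = "person"
    id = Column(Integer, primary_key=True)
    type = Column(String(20))  # the discriminator column
    __mapper_args__ = {
        "polymorphic_identity": "person",
        "polymorphic_on": type,
    }

class Engineer(Person):
    __tablename__ = "engineer"
    id = Column(Integer, ForeignKey("person.id"), primary_key=True)
    language = Column(String(20))
    __mapper_args__ = {"polymorphic_identity": "engineer"}
```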
Is there any solution for this situation? Let's consider 3 tables:
books, American authors, and British authors.
Each book has a foreign key to its author, which can be in either the American table or the British one.
How can I implement such foreign key condition in SQLAlchemy?
I'd like to have a single column to handle the link.
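(One common workaround, assuming the goal is a single real FK column: a foreign key can only point at one table, so fold the two author tables into one with a nationality discriminator via single table inheritance. A sketch with made-up names:)

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class Author(Base):
    __tablename__ = "author"
    id = Column(Integer, primary_key=True)
    nationality = Column(String(16), nullable=False)  # discriminator
    __mapper_args__ = {
        "polymorphic_on": nationality,
        "polymorphic_identity": "author",
    }

# single table inheritance: no __tablename__ on the subclasses
class AmericanAuthor(Author):
    __mapper_args__ = {"polymorphic_identity": "american"}

class BritishAuthor(Author):
    __mapper_args__ = {"polymorphic_identity": "british"}

class Book(Base):
    __tablename__ = "book"
    id = Column(Integer, primary_key=True)
    # one real foreign key, regardless of author nationality
    author_id = Column(Integer, ForeignKey("author.id"), nullable=False)
    author = relationship("Author")
```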
Howdy - I'm having trouble getting a hybrid_property to work. I'm working with flask-sqlalchemy and I have two models, DataExport and Team, where a DataExport belongs to exactly one Team.
I'm trying to define a hybrid_property that allows me to filter for "DataExport.is_enabled == True and Team.disabled == False".
```python
@hybrid_property
def enabled(self):
    return self.is_enabled == True and not self.team.disabled == False
```
It looks like it's picking up the first part and filtering on is_enabled, but it ignores the second part and doesn't join in Team.
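(A likely cause, sketched below: in a hybrid, Python's `and`/`not` can't produce SQL, so the class-level side needs a separate expression built with and_() plus an explicit link to Team, here via a correlated subquery. Model and column names are assumed from the description:)

```python
from sqlalchemy import (Boolean, Column, ForeignKey, Integer, and_,
                        create_engine, select)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()

class Team(Base):
    __tablename__ = "team"
    id = Column(Integer, primary_key=True)
    disabled = Column(Boolean, nullable=False, default=False)

class DataExport(Base):
    __tablename__ = "data_export"
    id = Column(Integer, primary_key=True)
    is_enabled = Column(Boolean, nullable=False, default=True)
    team_id = Column(Integer, ForeignKey("team.id"), nullable=False)
    team = relationship("Team")

    @hybrid_property
    def enabled(self):
        # instance level: plain Python works fine
        return self.is_enabled and not self.team.disabled

    @enabled.expression
    def enabled(cls):
        # query level: Python's `and`/`not` can't build SQL, so spell
        # it out with and_() and a correlated subquery against Team
        return and_(
            cls.is_enabled == True,
            select(Team.disabled)
            .where(Team.id == cls.team_id)
            .scalar_subquery()
            == False,
        )
```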
Hello, I'm having problems writing a query across two related tables. The Station table holds a bunch of metadata for each station and is unique, while TimeCheck records calls to each station and whether they were successful. The tricky bit is that the stations have two data-collection intervals, 10m and 60m. I'm trying to count the number of successful calls, filter for a single interval, and then group the counts by station id. This data will then be used to calculate a percentage of successful calls.
```python
q = (
    session.query(
        TimeCheck.station_id,
        Station.interval,
        func.count(
            case([(TimeCheck.in_communication == True, 1)], else_=None)
        ),
    )
    .filter(Station.interval == 10)
    .group_by(TimeCheck.station_id)
    .all()
)
```
The result is that the count func counts every single row, whether it contains true or not.
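(For reference: COUNT() already skips NULLs, so the case() trick is sound; the usual culprit for "counts every row" is that Station and TimeCheck were never joined, producing a cross product. A self-contained sketch with an explicit join, models assumed from the description:)

```python
from sqlalchemy import (Boolean, Column, ForeignKey, Integer, case,
                        create_engine, func)
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Station(Base):
    __tablename__ = "station"
    id = Column(Integer, primary_key=True)
    interval = Column(Integer, nullable=False)

class TimeCheck(Base):
    __tablename__ = "time_check"
    id = Column(Integer, primary_key=True)
    station_id = Column(Integer, ForeignKey("station.id"), nullable=False)
    in_communication = Column(Boolean, nullable=False)

def successful_calls(session):
    # COUNT() ignores NULLs, so the case() only needs a "then" branch;
    # the explicit join keeps Station and TimeCheck rows aligned
    return (
        session.query(
            TimeCheck.station_id,
            func.count(case((TimeCheck.in_communication == True, 1))),
        )
        .join(Station, Station.id == TimeCheck.station_id)
        .filter(Station.interval == 10)
        .group_by(TimeCheck.station_id)
        .all()
    )
```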
httpcore.ConnectError: [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1091)