我想使用sqlalchemy核心使用postgresql 9.5添加的"新"功能进行upsert.虽然它已经实现,但我对语法很困惑,我无法适应我的需求.以下是我希望能够做的示例代码:
from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base): __tablename__ = 'test' a_id = Column('id',Integer, primary_key=True) a = Column("a",Integer) engine = create_engine('postgres://name:password@localhost/test') User().metadata.create_all(engine) meta = MetaData(engine) meta.reflect() table = Table('test', meta, autoload=True) conn = engine.connect() from sqlalchemy.dialects.postgresql import insert as psql_insert stmt = psql_insert(table).values({table.c['id']: bindparam('id'), table.c['a']: bindparam('a')}) stmt = stmt.on_conflict_do_update( index_elements=[table.c['id']], set_={'a': bindparam('a')}) list_of_dictionary = [{'id':1, 'a':1, }, {'id':2, 'a':2,}] conn.execute(stmt, list_of_dictionary)
我基本上想要插入大量的行,如果已经有一个id,我想用我初始想要插入的值来更新它.但sqlalchemy给我这个错误:
CompileError: bindparam() name 'a' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement. Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_a').
虽然这是一个已知问题(请参阅https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o),但我没有找到任何正确的答案,不需要修改list_of_dictionary的键或者列的名称.
我想知道是否有一种方法来构建stmt,以便具有一致的行为,而不依赖于变量list_of_dictionary的键是否是插入表的列的名称(我的代码在那些中没有错误地工作)例).
这对我有用:
from sqlalchemy import create_engine from sqlalchemy import MetaData, Table from sqlalchemy.dialects import postgresql from sqlalchemy.inspection import inspect def upsert(engine, schema, table_name, records=[]): metadata = MetaData(schema=schema) metadata.bind = engine table = Table(table_name, metadata, schema=schema, autoload=True) # get list of fields making up primary key primary_keys = [key.name for key in inspect(table).primary_key] # assemble base statement stmt = postgresql.insert(table).values(records) # define dict of non-primary keys for updating update_dict = { c.name: c for c in stmt.excluded if not c.primary_key } # cover case when all columns in table comprise a primary key # in which case, upsert is identical to 'on conflict do nothing. if update_dict == {}: warnings.warn('no updateable columns found for table') # we still wanna insert without errors insert_ignore(table_name, records) return None # assemble new statement with 'on conflict do update' clause update_stmt = stmt.on_conflict_do_update( index_elements=primary_keys, set_=update_dict, ) # execute with engine.connect() as conn: result = conn.execute(update_stmt) return result