当前位置:  开发笔记 > 编程语言 > 正文

Python-PostgreSQL psycopg2接口 - > executemany

如何解决《Python-PostgreSQLpsycopg2接口->executemany》经验,为你挑选了2个好方法。

我目前正在分析维基百科转储文件; 我正在使用python从中提取一堆数据并将其持久化到PostgreSQL数据库中.我总是试图让这个文件变得更快(18GB).为了与PostgreSQL接口,我使用的是psycopg2,但是这个模块似乎模仿了许多其他类似的DBAPI.

无论如何,我有一个关于cursor.executemany(命令,值)的问题; 在我看来,每1000个值左右执行一次executemany比为这500万个值中的每一个调用cursor.execute(命令%值)更好(请确认或纠正我!).

但是,你看,我正在使用executemany将1000行插入到具有UNIQUE完整性约束的表中; 这个约束事先没有在python中验证过,因为这要么一直要求SELECT(这似乎适得其反)或要求我获得超过3 GB的RAM.所有这一切都说,当我的脚本试图通过捕获psycopg2.DatabaseError来插入已存在的行时,我指望Postgres警告我.

当我的脚本检测到这样的非UNIQUE INSERT时,它的connection.rollback()(每次都会产生1000行,并且使得executemany变得毫无价值),然后逐个INSERT所有值.

由于psycopg2的记录很少(因为有很多很棒的模块......),我找不到一个有效且有效的解决方法.我已经将每个executemany INSERTed的值从1000减少到100,以减少每个executemany非UNIQUE INSERT的可能性,但我很确定他们只是告诉psycopg2忽略这些exece或告诉游标继续executemany.

基本上,这似乎是一种解决方案如此容易和流行的问题,我所能做的就是要求了解它.

再次感谢!



1> 小智..:

只需使用psql\copy命令将所有数据复制到临时表中,或使用psycopg cursor.copy_in()方法.然后:

insert into mytable
select * from (
    select distinct * 
    from scratch
) uniq
where not exists (
    select 1 
    from mytable 
    where mytable.mykey = uniq.mykey
);

这将比任何插入组合更快地进行重复数据删除和运行.

-DG



2> Le Droid..:

我遇到了同样的问题,并在这里搜索了很多天来收集大量的提示,以形成一个完整的解决方案.即使问题已经过时,我希望这对其他人有用.

1)忘记删除索引/约束并稍后重新创建它们的好处,好处是边缘或更糟.

2)executemany比执行更好,因为它为你准备了prepare语句.您可以使用如下命令获得相同的结果,以获得300%的速度:

# To run only once:
sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS
   INSERT INTO myBigTable (idNumber, date_obs, result, user)
     SELECT $1, $2, $3, $4 WHERE NOT EXISTS
     (SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));"""
curPG.execute(sqlCmd)
cptInsert = 0   # To let you commit from time to time

#... inside the big loop:
curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord)
allreadyExists = (curPG.rowcount < 1)
if not allreadyExists:
   cptInsert += 1
   if cptInsert % 10000 == 0:
      conPG.commit()

此伪表示例对(idNumber,date_obs,user)具有唯一约束.

3)最好的解决方案是使用COPY_FROM和TRIGGER来管理BEFORE INSERT中的唯一键.这给了我36倍的速度.我开始使用500记录/秒的普通插入.并且通过"复制",我获得了超过18,000条记录/秒.使用Psycopg2的Python示例代码:

ioResult = StringIO.StringIO() #To use a virtual file as a buffer
cptInsert = 0 # To let you commit from time to time - Memory has limitations
#... inside the big loop:
   print >> ioResult, "\t".join(map(str, myNewRecord))
   cptInsert += 1
   if cptInsert % 10000 == 0:
      ioResult = flushCopyBuffer(ioResult, curPG)
#... after the loop:
ioResult = flushCopyBuffer(ioResult, curPG)

def flushCopyBuffer(bufferFile, cursorObj):
   bufferFile.seek(0)   # Little detail where lures the deamon...
   cursorObj.copy_from(bufferFile, 'myBigTable',
      columns=('idNumber', 'date_obs', 'value', 'user'))
   cursorObj.connection.commit()
   bufferFile.close()
   bufferFile = StringIO.StringIO()
   return bufferFile

这就是Python部分.现在Postgresql触发器没有异常psycopg2.IntegrityError然后所有COPY命令的记录被拒绝:

CREATE OR REPLACE FUNCTION chk_exists()
  RETURNS trigger AS $BODY$
DECLARE
    curRec RECORD;
BEGIN
   -- Check if record's key already exists or is empty (file's last line is)
   IF NEW.idNumber IS NULL THEN
      RETURN NULL;
   END IF;
   SELECT INTO curRec * FROM myBigTable
      WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user);
   IF NOT FOUND THEN -- OK keep it
      RETURN NEW;
   ELSE    
      RETURN NULL; -- Oups throw it or update the current record
   END IF;
END;
$BODY$ LANGUAGE plpgsql;

现在将此函数链接到表的触发器:

CREATE TRIGGER chk_exists_before_insert
   BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();

这似乎是很多工作,但Postgresql是一个非常快速的野兽,因为它不必一遍又一遍地解释SQL.玩得开心.

推荐阅读
重庆制造漫画社
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有