我目前正在分析维基百科转储文件; 我正在使用python从中提取一堆数据并将其持久化到PostgreSQL数据库中.我总是试图让这个文件变得更快(18GB).为了与PostgreSQL接口,我使用的是psycopg2,但是这个模块似乎模仿了许多其他类似的DBAPI.
无论如何,我有一个关于cursor.executemany(命令,值)的问题; 在我看来,每1000个值左右执行一次executemany比为这500万个值中的每一个调用cursor.execute(命令%值)更好(请确认或纠正我!).
但是,你看,我正在使用executemany将1000行插入到具有UNIQUE完整性约束的表中; 这个约束事先没有在python中验证过,因为这要么一直要求SELECT(这似乎适得其反)或要求我获得超过3 GB的RAM.所有这一切都说,当我的脚本试图通过捕获psycopg2.DatabaseError来插入已存在的行时,我指望Postgres警告我.
当我的脚本检测到这样的非UNIQUE INSERT时,它的connection.rollback()(每次都会产生1000行,并且使得executemany变得毫无价值),然后逐个INSERT所有值.
由于psycopg2的记录很少(因为有很多很棒的模块......),我找不到一个有效且有效的解决方法.我已经将每个executemany INSERTed的值从1000减少到100,以减少每个executemany非UNIQUE INSERT的可能性,但我很确定他们只是告诉psycopg2忽略这些exece或告诉游标继续executemany.
基本上,这似乎是一种解决方案如此容易和流行的问题,我所能做的就是要求了解它.
再次感谢!
只需使用psql\copy命令将所有数据复制到临时表中,或使用psycopg cursor.copy_in()方法.然后:
insert into mytable select * from ( select distinct * from scratch ) uniq where not exists ( select 1 from mytable where mytable.mykey = uniq.mykey );
这将比任何插入组合更快地进行重复数据删除和运行.
-DG
我遇到了同样的问题,并在这里搜索了很多天来收集大量的提示,以形成一个完整的解决方案.即使问题已经过时,我希望这对其他人有用.
1)忘记删除索引/约束并稍后重新创建它们的好处,好处是边缘或更糟.
2)executemany比执行更好,因为它为你准备了prepare语句.您可以使用如下命令获得相同的结果,以获得300%的速度:
# To run only once: sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS INSERT INTO myBigTable (idNumber, date_obs, result, user) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));""" curPG.execute(sqlCmd) cptInsert = 0 # To let you commit from time to time #... inside the big loop: curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord) allreadyExists = (curPG.rowcount < 1) if not allreadyExists: cptInsert += 1 if cptInsert % 10000 == 0: conPG.commit()
此伪表示例对(idNumber,date_obs,user)具有唯一约束.
3)最好的解决方案是使用COPY_FROM和TRIGGER来管理BEFORE INSERT中的唯一键.这给了我36倍的速度.我开始使用500记录/秒的普通插入.并且通过"复制",我获得了超过18,000条记录/秒.使用Psycopg2的Python示例代码:
ioResult = StringIO.StringIO() #To use a virtual file as a buffer cptInsert = 0 # To let you commit from time to time - Memory has limitations #... inside the big loop: print >> ioResult, "\t".join(map(str, myNewRecord)) cptInsert += 1 if cptInsert % 10000 == 0: ioResult = flushCopyBuffer(ioResult, curPG) #... after the loop: ioResult = flushCopyBuffer(ioResult, curPG) def flushCopyBuffer(bufferFile, cursorObj): bufferFile.seek(0) # Little detail where lures the deamon... cursorObj.copy_from(bufferFile, 'myBigTable', columns=('idNumber', 'date_obs', 'value', 'user')) cursorObj.connection.commit() bufferFile.close() bufferFile = StringIO.StringIO() return bufferFile
这就是Python部分.现在Postgresql触发器没有异常psycopg2.IntegrityError然后所有COPY命令的记录被拒绝:
CREATE OR REPLACE FUNCTION chk_exists() RETURNS trigger AS $BODY$ DECLARE curRec RECORD; BEGIN -- Check if record's key already exists or is empty (file's last line is) IF NEW.idNumber IS NULL THEN RETURN NULL; END IF; SELECT INTO curRec * FROM myBigTable WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user); IF NOT FOUND THEN -- OK keep it RETURN NEW; ELSE RETURN NULL; -- Oups throw it or update the current record END IF; END; $BODY$ LANGUAGE plpgsql;
现在将此函数链接到表的触发器:
CREATE TRIGGER chk_exists_before_insert BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();
这似乎是很多工作,但Postgresql是一个非常快速的野兽,因为它不必一遍又一遍地解释SQL.玩得开心.