有没有办法在Postgresql中选择未锁定的行?我有一个多线程应用程序将执行:
Select... order by id desc limit 1 for update
在桌子上.
如果多个线程运行此查询,它们都会尝试拉回同一行.
一个获取行锁,另一个获取然后在第一个更新行后失败.我真正喜欢的是第二个线程获取匹配该WHERE
子句的第一行并且尚未锁定.
为了澄清,我希望每个线程在执行select之后立即更新第一个可用行.
因此,如果有行ID: 1,2,3,4
,第一个线程将进入,选择行ID=4
并立即更新它.
如果在该事务期间第二个线程出现,我希望它能获得行ID=3
并立即更新该行.
对于Share,也不会完成此操作,nowait
因为该WHERE
子句将与锁定的行匹配(ID=4 in my example)
.基本上我喜欢的是这个WHERE
条款中的"并没有锁定" .
Users ----------------------------------------- ID | Name | flags ----------------------------------------- 1 | bob | 0 2 | fred | 1 3 | tom | 0 4 | ed | 0
如果查询是" Select ID from users where flags = 0 order by ID desc limit 1
"并且当返回一行时,接下来就是" Update Users set flags = 1 where ID = 0
",那么我希望第一个线程用来获取行,ID 4
而下一个用于获取行ID 3
.
如果我将" For Update
" 附加到select,那么第一个线程获取该行,第二个线程阻塞然后返回任何内容,因为一旦第一个事务提交该WHERE
子句不再满足.
如果我不使用" For Update
"那么我需要在后续更新(WHERE flags = 0)上添加WHERE子句,因此只有一个线程可以更新该行.
第二个线程将选择与第一个线程相同的行,但第二个线程的更新将失败.
无论哪种方式,第二个线程都无法获得一行并进行更新,因为我无法让数据库将第4行提供给第一个线程,而第3个连接则将第3行提供给第二个线程.
此功能SELECT ... SKIP LOCKED
正在Postgres 9.5中实现.http://www.depesz.com/2014/10/10/waiting-for-9-5-implement-skip-locked-for-row-level-locks/
No No NOOO :-)
我知道作者的意思.我有类似的情况,我想出了一个很好的解决方案.首先,我将从描述我的情况开始.我有一个表i,我存储了必须在特定时间发送的消息.PG不支持函数的时序执行,因此我们必须使用守护进程(或cron).我使用自定义编写的脚本打开几个并行进程.每个进程选择一组必须以+1秒/ -1秒的精度发送的消息.表本身是使用新消息动态更新的.
因此每个进程都需要下载一组行.这组行不能被其他进程下载,因为它会造成很多混乱(有些人会在只收到一个时收到几条消息).这就是我们需要锁定行的原因.使用锁下载一组消息的查询:
FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE FOR UPDATE LOOP -- DO SMTH END LOOP;
每0.5秒启动一次具有此查询的进程.因此,这将导致下一个查询等待第一个锁解锁行.这种方法造成了巨大的延误 即使我们使用NOWAIT,查询也会导致我们不想要的异常,因为表中可能有新消息必须发送.如果仅使用FOR SHARE,查询将正确执行,但仍然需要花费大量时间来创建大量延迟.
为了使它工作,我们做了一点魔术:
更改查询:
FOR messages in select * from public.messages where sendTime >= CURRENT_TIMESTAMP - '1 SECOND'::INTERVAL AND sendTime <= CURRENT_TIMESTAMP + '1 SECOND'::INTERVAL AND sent is FALSE AND is_locked(msg_id) IS FALSE FOR SHARE LOOP -- DO SMTH END LOOP;
神秘的函数'is_locked(msg_id)'看起来像这样:
CREATE OR REPLACE FUNCTION is_locked(integer) RETURNS BOOLEAN AS $$ DECLARE id integer; checkout_id integer; is_it boolean; BEGIN checkout_id := $1; is_it := FALSE; BEGIN -- we use FOR UPDATE to attempt a lock and NOWAIT to get the error immediately id := msg_id FROM public.messages WHERE msg_id = checkout_id FOR UPDATE NOWAIT; EXCEPTION WHEN lock_not_available THEN is_it := TRUE; END; RETURN is_it; END; $$ LANGUAGE 'plpgsql' VOLATILE COST 100;
当然,我们可以自定义此功能,以便在数据库中的任何表上工作.在我看来,最好为一个表创建一个检查功能.向此函数添加更多内容可能会使其变慢.无论如何我需要更长时间来检查这个子句,所以不需要让它更慢.对我来说,这是完整的解决方案,它完美无缺.
现在,当我的50个进程并行运行时,每个进程都有一组唯一的新消息要发送.一旦发送,我只需用sent = TRUE更新行,再也不会再回到它.
我希望这个解决方案也适合你(作者).如果您有任何疑问,请告诉我:-)
哦,如果这对你有用,请告诉我.
我使用这样的东西:
select * into l_sms from sms where prefix_id = l_prefix_id and invoice_id is null and pg_try_advisory_lock(sms_id) order by suffix limit 1;
并且不要忘记调用pg_advisory_unlock
如果您要实现队列,请看一下PGQ,它已经解决了这个问题和其他问题。http://wiki.postgresql.org/wiki/PGQ_Tutorial