是否可以在不设置/检查任何标志/信号量/等的情况下终止正在运行的线程?
在Python和任何语言中突然杀死一个线程通常是一种糟糕的模式.想想以下情况:
线程正在持有必须正确关闭的关键资源
线程已经创建了几个必须被杀死的其他线程.
如果你负担得起它(如果你正在管理自己的线程),处理这个问题的好方法是有一个exit_request标志,每个线程定期检查它是否有时间退出.
例如:
import threading class StoppableThread(threading.Thread): """Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition.""" def __init__(self, *args, **kwargs): super(StoppableThread, self).__init__(*args, **kwargs) self._stop_event = threading.Event() def stop(self): self._stop_event.set() def stopped(self): return self._stop_event.is_set()
在此代码中,您应该在线程上调用stop(),并等待线程使用join()正确退出.线程应定期检查停止标志.
但是有些情况下你真的需要杀死一个线程.例如,当您包装一个忙于长时间调用的外部库并且您想要中断它时.
以下代码允许(有一些限制)在Python线程中引发异常:
def _async_raise(tid, exctype): '''Raises an exception in the threads with id tid''' if not inspect.isclass(exctype): raise TypeError("Only types can be raised (not instances)") res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: # "if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect" ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None) raise SystemError("PyThreadState_SetAsyncExc failed") class ThreadWithExc(threading.Thread): '''A thread class that supports raising exception in the thread from another thread. ''' def _get_my_tid(self): """determines this (self's) thread id CAREFUL : this function is executed in the context of the caller thread, to get the identity of the thread represented by this instance. """ if not self.isAlive(): raise threading.ThreadError("the thread is not active") # do we have it cached? if hasattr(self, "_thread_id"): return self._thread_id # no, look for it in the _active dict for tid, tobj in threading._active.items(): if tobj is self: self._thread_id = tid return tid # TODO: in python 2.6, there's a simpler way to do : self.ident raise AssertionError("could not determine the thread's id") def raiseExc(self, exctype): """Raises the given exception type in the context of this thread. If the thread is busy in a system call (time.sleep(), socket.accept(), ...), the exception is simply ignored. If you are sure that your exception should terminate the thread, one way to ensure that it works is: t = ThreadWithExc( ... ) ... t.raiseExc( SomeException ) while t.isAlive(): time.sleep( 0.1 ) t.raiseExc( SomeException ) If the exception is to be caught by the thread, you need a way to check that your thread has caught it. CAREFUL : this function is executed in the context of the caller thread, to raise an excpetion in the context of the thread represented by this instance. """ _async_raise( self._get_my_tid(), exctype )
(基于Tomer Filiba的Killable Threads.关于返回值的引用stop()
似乎来自旧版本的Python.)
如文档中所述,这不是一个神奇的子弹,因为如果线程在Python解释器之外繁忙,它将不会捕获中断.
此代码的良好使用模式是让线程捕获特定异常并执行清理.这样,您可以中断任务并仍然进行适当的清理.
没有官方API可以做到这一点,没有.
您需要使用平台API来终止线程,例如pthread_kill或TerminateThread.您可以通过pythonwin或ctypes访问此类API.
请注意,这本质上是不安全的.它可能会导致无法收集的垃圾(来自堆栈帧的局部变量变成垃圾),并且如果被杀死的线程在被杀死时具有GIL,则可能导致死锁.
一个multiprocessing.Process
CANp.terminate()
在我想杀死一个线程但不想使用flags/locks/signals/semaphores/events /的情况下,我将线程提升为完整的进程.对于仅使用几个线程的代码,开销并不是那么糟糕.
例如,这可以轻松终止执行阻塞I/O的帮助程序"线程"
转换很简单:在相关的代码替换所有threading.Thread
与multiprocessing.Process
所有queue.Queue
带multiprocessing.Queue
,并添加所需的呼叫p.terminate()
到要杀死其子你的父进程p
Python文档
如果您尝试终止整个程序,可以将该线程设置为"守护程序".请参阅 Thread.daemon
这是基于thread2 - killable线程(Python配方)
您需要调用PyThreadState_SetasyncExc(),它只能通过ctypes获得.
这仅在Python 2.7.3上进行了测试,但它可能适用于其他最近的2.x版本.
import ctypes def terminate_thread(thread): """Terminates a python thread from another thread. :param thread: a threading.Thread instance """ if not thread.isAlive(): return exc = ctypes.py_object(SystemExit) res = ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(thread.ident), exc) if res == 0: raise ValueError("nonexistent thread id") elif res > 1: # """if it returns a number greater than one, you're in trouble, # and you should call it again with exc=NULL to revert the effect""" ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) raise SystemError("PyThreadState_SetAsyncExc failed")
正如其他人所提到的,常规是设置一个停止标志.对于轻量级的东西(没有Thread的子类,没有全局变量),lambda回调是一个选项.(注意括号中的if stop()
.)
import threading import time def do_work(id, stop): print("I am thread", id) while True: print("I am thread {} doing something".format(id)) if stop(): print(" Exiting loop.") break print("Thread {}, signing off".format(id)) def main(): stop_threads = False workers = [] for id in range(0,3): tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads)) workers.append(tmp) tmp.start() time.sleep(3) print('main: done sleeping; time to stop the threads.') stop_threads = True for worker in workers: worker.join() print('Finis.') if __name__ == '__main__': main()
print()
使用pr()
始终刷新(sys.stdout.flush()
)的函数替换可以提高shell输出的精度.
(仅在Windows/Eclipse/Python3.3上测试)
你不应该在没有合作的情况下强行杀死一个线程.
杀死一个线程会删除尝试/最终阻止设置的任何保证,这样你就可以锁定锁,打开文件等.
唯一一次你可以争辩强行杀死线程是一个好主意是快速杀死程序,但绝不是单线程.
在Python中,你根本无法直接杀死一个线程.
如果你真的不需要一个Thread(!),你可以做的,而不是使用线程包,就是使用 多处理包.在这里,要杀死进程,您只需调用该方法:
yourProcess.terminate() # kill the process!
Python将终止你的进程(在Unix上通过SIGTERM信号,而在Windows上通过TerminateProcess()
调用).使用队列或管道时要注意使用它!(它可能会破坏队列/管道中的数据)
需要注意的是,multiprocessing.Event
与multiprocessing.Semaphore
中的相同的方式工作,准确threading.Event
和threading.Semaphore
分别.事实上,第一个是后者的克隆.
如果您真的需要使用Thread,则无法直接杀死它.但是,您可以使用"守护程序线程".事实上,在Python中,Thread可以被标记为守护进程:
yourThread.daemon = True # set the Thread as a "daemon thread"
当没有剩下活着的非守护程序线程时,主程序将退出.换句话说,当您的主线程(当然是非守护程序线程)将完成其操作时,即使仍有一些守护程序线程正在运行,程序也将退出.
请注意,必须daemon
在start()
调用方法之前设置Thread !
当然,你可以而且应该使用daemon
它multiprocessing
.这里,当主进程退出时,它会尝试终止其所有守护进程子进程.
最后,请注意sys.exit()
并且os.kill()
不是选择.
您可以通过在将退出线程的线程中安装trace来终止线程.有关可能的实现,请参阅附件链接.
用Python杀死一个线程
如果你不杀死一个线程会更好.一种方法可能是在线程的循环中引入一个"try"块,并在你想要停止线程时抛出一个异常(例如一个break/return/...停止你的for/while/...).我在我的应用程序上使用过这个功能......
绝对可以实现一个Thread.stop
方法,如以下示例代码所示:
import sys import threading import time class StopThread(StopIteration): pass threading.SystemExit = SystemExit, StopThread class Thread2(threading.Thread): def stop(self): self.__stop = True def _bootstrap(self): if threading._trace_hook is not None: raise ValueError('Cannot run thread with tracing!') self.__stop = False sys.settrace(self.__trace) super()._bootstrap() def __trace(self, frame, event, arg): if self.__stop: raise StopThread() return self.__trace class Thread3(threading.Thread): def _bootstrap(self, stop_thread=False): def stop(): nonlocal stop_thread stop_thread = True self.stop = stop def tracer(*_): if stop_thread: raise StopThread() return tracer sys.settrace(tracer) super()._bootstrap() ############################################################################### def main(): test1 = Thread2(target=printer) test1.start() time.sleep(1) test1.stop() test1.join() test2 = Thread2(target=speed_test) test2.start() time.sleep(1) test2.stop() test2.join() test3 = Thread3(target=speed_test) test3.start() time.sleep(1) test3.stop() test3.join() def printer(): while True: print(time.time() % 1) time.sleep(0.1) def speed_test(count=0): try: while True: count += 1 except StopThread: print('Count =', count) if __name__ == '__main__': main()
的Thread3
类似乎运行代码比快大约33%的Thread2
类.
from ctypes import * pthread = cdll.LoadLibrary("libpthread-2.15.so") pthread.pthread_cancel(c_ulong(t.ident))
t是你的Thread
对象.
阅读python源代码(Modules/threadmodule.c
和Python/thread_pthread.h
)你可以看到它Thread.ident
是一个pthread_t
类型,所以你可以做任何事情都pthread
可以在python中使用libpthread
.
可以使用以下解决方法杀死线程:
kill_threads = False def doSomething(): global kill_threads while True: if kill_threads: thread.exit() ...... ...... thread.start_new_thread(doSomething, ())
这甚至可以用于终止从主线程终止其代码在另一个模块中编写的线程。我们可以在该模块中声明一个全局变量,并使用它终止该模块中产生的线程。
我通常使用它在程序出口处终止所有线程。这可能不是终止线程的理想方法,但可能会有所帮助。
如果你是显式调用time.sleep()
为你的线程(比如查询一些外部的服务)的一部分,在菲利普的方法的改进是使用了超时的event
的wait()
方法,无论你sleep()
例如:
import threading class KillableThread(threading.Thread): def __init__(self, sleep_interval=1): super().__init__() self._kill = threading.Event() self._interval = sleep_interval def run(self): while True: print("Do Something") # If no kill signal is set, sleep for the interval, # If kill signal comes in while sleeping, immediately # wake up and handle is_killed = self._kill.wait(self._interval) if is_killed: break print("Killing Thread") def kill(self): self._kill.set()
然后运行
t = KillableThread(sleep_interval=5) t.start() # Every 5 seconds it prints: #: Do Something t.kill() #: Killing Thread
使用wait()
而不是sleep()
ing并定期检查事件的优点是您可以在更长的睡眠间隔内进行编程,线程几乎立即停止(原本应为sleep()
ing时),并且我认为处理退出的代码明显更简单。
我玩这个游戏很晚,但是我一直在努力解决类似的问题,以下内容似乎可以为我很好地解决问题,并且可以在守护子线程退出时让我进行一些基本的线程状态检查和清理:
import threading import time import atexit def do_work(): i = 0 @atexit.register def goodbye(): print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" % (i, threading.currentThread().ident)) while True: print i i += 1 time.sleep(1) t = threading.Thread(target=do_work) t.daemon = True t.start() def after_timeout(): print "KILL MAIN THREAD: %s" % threading.currentThread().ident raise SystemExit threading.Timer(2, after_timeout).start()
产量:
0 1 KILL MAIN THREAD: 140013208254208 'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]