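threading and Concurrency
The threading module provides thread-based concurrent execution. In Python 2.7 (CPython), threads are native operating-system threads, but because of the GIL (Global Interpreter Lock) multiple threads cannot run Python code on several CPU cores at the same time. Threads are still very useful for I/O-bound tasks such as network requests and file reads and writes, because blocking I/O calls release the GIL while they wait.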
Creating Threads
Using the Thread class:
import threading

def worker(name):
    print "Worker %s starting" % name
    # do the actual work here
    print "Worker %s done" % name

# Create the thread
thread = threading.Thread(target=worker, args=("Alice",))
thread.start()  # start the thread
thread.join()   # wait for the thread to finish
print "Main thread continues"
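join() also accepts a timeout in seconds. Because join() always returns None, the only way to tell whether it timed out is to check is_alive() afterwards. A minimal sketch (it uses print_function so it runs on both Python 2.7 and 3; the event exists only to keep the worker busy on demand):

```python
from __future__ import print_function
import threading

release = threading.Event()

def wait_for_release():
    # Blocks until the main thread sets the event
    release.wait()

t = threading.Thread(target=wait_for_release)
t.start()

t.join(0.1)                    # join() with a timeout always returns None...
still_running = t.is_alive()   # ...so check is_alive() to see if it timed out
print("still running after timeout:", still_running)

release.set()                  # let the worker finish
t.join()
finished = not t.is_alive()
print("finished:", finished)
```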
Subclassing Thread:
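import threading

class MyThread(threading.Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()
        self.name = name  # Thread.name is a settable property holding the thread's name

    def run(self):
        # run() is what executes in the new thread once start() is called
        print "Thread %s running" % self.name

thread = MyThread("Worker")
thread.start()
thread.join()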
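threading discards the return value of run() (and of a target function), so a common pattern is to store the result on the thread object. A minimal sketch, assuming a hypothetical ResultThread helper that is not part of the threading module; it uses print_function so it runs on both 2.7 and 3:

```python
from __future__ import print_function
import threading

class ResultThread(threading.Thread):
    """Hypothetical helper: run a callable and keep its return value."""

    def __init__(self, func, *args):
        super(ResultThread, self).__init__()
        self.func = func
        self.args = args
        self.result = None
        self.error = None

    def run(self):
        # run()'s own return value is ignored by threading, so store it instead
        try:
            self.result = self.func(*self.args)
        except Exception as e:
            self.error = e  # keep the exception so the caller can inspect it

t = ResultThread(sum, [1, 2, 3])
t.start()
t.join()
print(t.result)  # 6
```

Read result only after join() returns; before that the worker may still be writing it.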
Thread Synchronization
Lock (mutex):
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in xrange(100000):
        with lock:  # acquire the lock; it is released automatically when the block exits
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print counter  # 1000000
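Without the lock, counter += 1 is not atomic: it reads the value, adds 1, and writes the result back. A thread switch between those steps lets threads overwrite each other's updates, so the final count can come out lower than expected.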
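The with lock: statement is shorthand for calling acquire() and then release() in a finally clause. A sketch of the same counter written out explicitly (smaller counts so it runs quickly; print_function keeps it runnable on 2.7 and 3):

```python
from __future__ import print_function
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        lock.acquire()       # blocks until the lock is free
        try:
            counter += 1     # read-modify-write done while holding the lock
        finally:
            lock.release()   # always released, even if the body raises

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```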
RLock (reentrant lock):
import threading

rlock = threading.RLock()

def outer():
    with rlock:
        print "Outer"
        inner()  # re-acquires rlock from the same thread

def inner():
    with rlock:
        print "Inner"

outer()  # runs normally
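The same thread can acquire an RLock several times (and must release it the same number of times), whereas a plain Lock acquired a second time by the same thread blocks forever, i.e. deadlocks.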
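The difference can be observed without actually deadlocking by using a non-blocking acquire: acquire(False) returns immediately with a false value instead of waiting. A short sketch (print_function for 2.7/3 compatibility; bool() is used because 2.7's RLock returns 1 rather than True):

```python
from __future__ import print_function
import threading

lock = threading.Lock()
rlock = threading.RLock()

# acquire(False) is a non-blocking attempt: it returns a false value instead of waiting
lock.acquire()
lock_again = lock.acquire(False)    # same thread, plain Lock: a blocking call would deadlock
lock.release()

rlock.acquire()
rlock_again = rlock.acquire(False)  # same thread, RLock: succeeds, recursion count becomes 2
rlock.release()
rlock.release()                     # one release per successful acquire

print("Lock re-acquire:", bool(lock_again))
print("RLock re-acquire:", bool(rlock_again))
```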
Semaphore:
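import threading
import time

# Allow at most 3 threads inside the block at the same time
semaphore = threading.Semaphore(3)

def limited_worker():
    with semaphore:
        print "Working..."
        time.sleep(1)  # simulate a slow operation

threads = [threading.Thread(target=limited_worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()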
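To check that the semaphore really caps concurrency, the sketch below records the peak number of workers inside the with block. The counters active and max_active are illustrative additions guarded by their own lock; print_function keeps it runnable on 2.7 and 3:

```python
from __future__ import print_function
import threading
import time

semaphore = threading.Semaphore(3)
state_lock = threading.Lock()
active = 0
max_active = 0

def limited_worker():
    global active, max_active
    with semaphore:
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)          # simulated work while holding a semaphore slot
        with state_lock:
            active -= 1

threads = [threading.Thread(target=limited_worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent workers:", max_active)
```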
Inter-thread Communication
Event:
import threading
import time

event = threading.Event()

def waiter():
    print "Waiting..."
    event.wait()  # block until the event is set
    print "Event received!"

def setter():
    time.sleep(2)
    print "Setting event"
    event.set()

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()
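Event.wait() also takes a timeout. Since Python 2.7 it returns the event's internal flag, so a false result means the wait timed out. A small sketch (the Timer is only a convenient way to set the flag from another thread; print_function for 2.7/3 compatibility):

```python
from __future__ import print_function
import threading

event = threading.Event()

# wait(timeout) returns the internal flag, so False means it timed out
got_it_early = event.wait(0.05)

threading.Timer(0.05, event.set).start()   # set the flag from another thread shortly
got_it_later = event.wait(5)

print("before set:", got_it_early, "after set:", got_it_later, "is_set:", event.is_set())
event.clear()   # reset the flag so the event can be waited on again
```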
Queue:
import threading
import Queue

q = Queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print "Produced:", i
    q.put(None)  # sentinel: tells the consumer there is no more data

def consumer():
    while True:
        item = q.get()
        if item is None:  # sentinel received: stop
            q.task_done()
            break
        print "Consumed:", item
        q.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
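Queue.Queue (the module is renamed queue in Python 3) is thread-safe: it handles all locking internally, so producers and consumers need no extra locks around put() and get().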
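With several consumers, each one needs its own sentinel, and q.join() waits until task_done() has been called once for every get(). A sketch assuming three consumers and a bounded queue (the squaring is placeholder work); the import fallback lets it run on both 2.7 and 3:

```python
from __future__ import print_function
import threading

try:
    import Queue as queue   # Python 2.7 module name
except ImportError:
    import queue            # renamed to queue in Python 3

NUM_CONSUMERS = 3
q = queue.Queue(maxsize=10)   # bounded: put() blocks while the queue is full
results = []
results_lock = threading.Lock()

def consumer():
    while True:
        item = q.get()
        try:
            if item is None:          # sentinel: this consumer should exit
                return
            with results_lock:
                results.append(item * item)
        finally:
            q.task_done()             # one task_done() per get(), sentinels included

workers = [threading.Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for w in workers:
    w.start()

for i in range(20):
    q.put(i)
for _ in range(NUM_CONSUMERS):
    q.put(None)                       # one sentinel per consumer

q.join()                              # wait until every item has been marked done
for w in workers:
    w.join()
print(sorted(results))
```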
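Thread Pools
The Python 2.7 standard library has no concurrent.futures (it was added in Python 3.2; the futures backport on PyPI provides it for 2.7). multiprocessing.dummy.Pool offers a thread-backed pool with the multiprocessing.Pool API, or you can implement a simple pool by hand: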
from __future__ import print_function  # needed to pass print as a function below
import threading
import Queue

class ThreadPool(object):
    def __init__(self, num_threads):
        self.tasks = Queue.Queue()
        for _ in range(num_threads):
            t = threading.Thread(target=self.worker)
            t.daemon = True  # Thread(daemon=...) is Python 3.3+; set the attribute in 2.7
            t.start()

    def worker(self):
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            except Exception as e:
                print("Task error:", e)
            finally:
                self.tasks.task_done()

    def submit(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait(self):
        self.tasks.join()  # block until every submitted task has called task_done()

# Usage
pool = ThreadPool(4)
for i in range(10):
    pool.submit(print, "Task", i)
pool.wait()
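For simple fan-out work, multiprocessing.dummy.Pool (available in 2.7 and 3) gives a thread pool without writing the worker loop yourself. A minimal sketch with a placeholder square() function; close() and join() are called explicitly because Pool does not support the with statement in 2.7:

```python
from __future__ import print_function
from multiprocessing.dummy import Pool   # thread-backed Pool, same API as multiprocessing.Pool

def square(x):
    return x * x

pool = Pool(4)                     # 4 worker threads
try:
    squares = pool.map(square, range(10))   # results come back in input order
finally:
    pool.close()                   # accept no more tasks
    pool.join()                    # wait for the worker threads to exit
print(squares)
```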
Effects of the GIL
CPython's GIL ensures that only one thread executes Python bytecode at any given moment:
import threading
import time

def cpu_bound():
    count = 0
    for i in xrange(10000000):
        count += i
    return count

# Multithreaded (no faster, because of the GIL)
start = time.time()
threads = [threading.Thread(target=cpu_bound) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "Threads:", time.time() - start

# Sequential (may actually be faster)
start = time.time()
cpu_bound()
cpu_bound()
print "Sequential:", time.time() - start
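For CPU-bound tasks, multithreading brings no speedup and can even be slower because of thread-switching and GIL-contention overhead. Use multiple processes (the multiprocessing module) or C extensions that release the GIL instead.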
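By contrast, blocking calls release the GIL, so I/O-bound threads do overlap. In this sketch time.sleep() stands in for a network or disk wait (an assumption for illustration): five 0.2 s sleeps would take about 1 s sequentially, but in threads they finish in roughly 0.2 s.

```python
from __future__ import print_function
import threading
import time

def io_bound():
    time.sleep(0.2)   # blocking call that releases the GIL, standing in for real I/O

start = time.time()
threads = [threading.Thread(target=io_bound) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
print("5 sleeps in threads took %.2f s" % elapsed)
```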
Practical Examples
Concurrent downloads:
import threading
import urllib2

urls = [
    "http://example.com/file1.txt",
    "http://example.com/file2.txt",
    "http://example.com/file3.txt",
]
results = {}
lock = threading.Lock()

def download(url):
    try:
        response = urllib2.urlopen(url, timeout=10)
        data = response.read()
        with lock:
            results[url] = data
    except Exception as e:
        with lock:
            results[url] = str(e)  # store the error message instead of the data

threads = [threading.Thread(target=download, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "Processed %d URLs" % len(results)  # includes failed downloads
Scheduled tasks:
import threading

def schedule(interval, func, *args, **kwargs):
    def wrapper():
        func(*args, **kwargs)
        # re-arm: start a new one-shot Timer for the next run
        threading.Timer(interval, wrapper).start()
    threading.Timer(interval, wrapper).start()

def heartbeat():
    print "Heartbeat"

schedule(5, heartbeat)  # runs every 5 seconds (plus the task's own run time) and never stops
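The schedule() above creates a new Timer for every run and offers no way to stop it. A sketch of a stoppable alternative, assuming a hypothetical RepeatingTimer class (not part of threading) that uses one thread and Event.wait(interval) as an interruptible sleep; like the original, the period is the interval plus the task's run time:

```python
from __future__ import print_function
import threading
import time

class RepeatingTimer(threading.Thread):
    """Hypothetical helper: call func every `interval` seconds until stop() is called."""

    def __init__(self, interval, func, *args, **kwargs):
        super(RepeatingTimer, self).__init__()
        self.daemon = True                 # do not keep the process alive on exit
        self.interval = interval
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.stop_event = threading.Event()

    def run(self):
        # wait() returns False on timeout (time to fire) and True once stop() sets the flag
        while not self.stop_event.wait(self.interval):
            self.func(*self.args, **self.kwargs)

    def stop(self):
        self.stop_event.set()

ticks = []
timer = RepeatingTimer(0.05, ticks.append, "tick")
timer.start()
time.sleep(0.3)
timer.stop()
timer.join()
print(len(ticks), "ticks")
```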
Daemon Threads
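import threading
import time

def background_task():
    while True:
        print "Background running"
        time.sleep(1)

thread = threading.Thread(target=background_task)
thread.daemon = True  # must be set before start(); Thread(daemon=True) is Python 3.3+ only
thread.start()
print "Main thread done"
# When the main thread finishes, the daemon thread is stopped automatically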
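Daemon threads do not prevent the process from exiting. They are stopped abruptly at shutdown, so resources they hold (open files, database transactions) may not be released properly. They suit background monitoring, logging, and similar tasks.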
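The daemon flag is inherited from the creating thread (the main thread is not a daemon, so threads default to non-daemon) and can only be changed before start(); changing it on a running thread raises RuntimeError. A short sketch (print_function for 2.7/3 compatibility):

```python
from __future__ import print_function
import threading
import time

def background_task():
    while True:
        time.sleep(1)

t = threading.Thread(target=background_task)
t.daemon = True            # must be set before start()
t.start()

try:
    t.daemon = False       # changing the flag on a running thread is not allowed
    changed_after_start = True
except RuntimeError:
    changed_after_start = False

print("daemon:", t.daemon, "changed after start:", changed_after_start)
```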
Caveats
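- Data shared between threads must be synchronized (locks, Queue)
- The GIL prevents CPU parallelism; use multiprocessing for CPU-bound work
- Threads are not free: creating them and switching between them has a cost
- To avoid deadlocks, always acquire multiple locks in the same order
- In Python 2.7, threading is the high-level interface; the low-level thread module is not meant for direct use (it is renamed _thread in Python 3)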
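The lock-ordering rule can be sketched with two opposite transfers between accounts. The Account class and transfer() function are hypothetical illustrations; each transfer sorts the two accounts by id and locks the lower id first, so neither thread can hold one lock while waiting for the other (the sketch assumes src and dst are different accounts):

```python
from __future__ import print_function
import threading

class Account(object):
    """Hypothetical account with its own lock, for illustration only."""
    def __init__(self, account_id, balance):
        self.account_id = account_id
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src, dst, amount):
    # Always lock the account with the smaller id first, so two opposite
    # transfers never each hold one lock while waiting for the other.
    # Assumes src is not dst (a plain Lock cannot be acquired twice).
    first, second = sorted([src, dst], key=lambda acc: acc.account_id)
    with first.lock:
        with second.lock:
            src.balance -= amount
            dst.balance += amount

a = Account(1, 1000)
b = Account(2, 1000)

def move_many(src, dst):
    for _ in range(1000):
        transfer(src, dst, 1)

threads = [threading.Thread(target=move_many, args=(a, b)),
           threading.Thread(target=move_many, args=(b, a))]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(a.balance, b.balance)
```

Locking in argument order instead (src first, then dst) could deadlock here, because the two threads pass the accounts in opposite orders.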