飞翔飞翔
主页
  • 计算机基础

    • TCP/IP协议
    • Linux命令
  • 数据库

    • SQL教程
  • 编程语言

    • C语言
    • Python2
    • Python3
  • 数据格式

    • JSON教程
  • 工具

    • Markdown指南
  • Git

    • GitFlow
  • Quartz

    • Quartz教程
  • Java

    • Java设计模式
  • 缓存

    • Redis教程
联系
阿里云
主页
  • 计算机基础

    • TCP/IP协议
    • Linux命令
  • 数据库

    • SQL教程
  • 编程语言

    • C语言
    • Python2
    • Python3
  • 数据格式

    • JSON教程
  • 工具

    • Markdown指南
  • Git

    • GitFlow
  • Quartz

    • Quartz教程
  • Java

    • Java设计模式
  • 缓存

    • Redis教程
联系
阿里云
  • 学习路径
  • 第1章 认识Python

    • Python 历史与特点
    • Python 2 与 Python 3 的核心差异
    • 安装与运行 Python 2.7.18
    • 编码规范 PEP 8
  • 第2章 基础语法

    • 变量与对象
    • 数字类型
    • 字符串 str
    • Unicode 字符串
    • 运算符
    • 空值 None
  • 第3章 流程控制

    • if 条件语句
    • if-else 条件语句
    • if-elif-else 多分支
    • 条件表达式(三元运算符)
    • while 循环
    • for 循环
    • range 与 xrange
    • 循环控制:break、continue、pass
    • 循环 else 子句
  • 第4章 数据结构

    • 列表基础
    • 列表方法
    • 列表推导式
    • 元组
    • 字典基础
    • 字典方法
    • 字典循环技巧
    • 集合
    • 序列解包
    • 序列比较
  • 第5章 函数

    • 定义函数
    • 参数传递机制
    • 默认参数
    • 关键字参数
    • 可变参数
    • Lambda 表达式
    • 文档字符串
    • 函数对象
  • 第6章 模块与包

    • import 导入
    • 模块搜索路径
    • name 与主程序
    • 编译文件 .pyc 与 .pyo
    • 包结构
    • dir() 函数
  • 第7章 文件与IO

    • 打开与关闭文件
    • 文件读写方法
    • with 上下文管理器
    • 格式化输出:% 操作符
    • 格式化输出:str.format()
    • JSON 序列化
  • 第8章 面向对象

    • 类定义与实例化
    • init 构造方法
    • 类变量与实例变量
    • 方法调用与 self
    • 继承基础
    • 多重继承
    • 新式类与旧式类
    • 私有变量与名称改写
    • 属性装饰器 property
    • 类方法与静态方法
    • 魔术方法
    • 空类与数据记录
  • 第9章 异常处理

    • 异常类型
    • try-except
    • try-except-else-finally
    • 抛出异常 raise
    • 自定义异常
    • with 语句与上下文管理器
  • 第10章 迭代器与生成器

    • 迭代器协议
    • 生成器函数
    • 生成器表达式
    • itertools模块
  • 第11章 标准库精要

    • os模块
    • sys模块
    • datetime模块
    • re模块
    • json模块
    • collections模块
    • math与random模块
    • urllib2与网络请求
    • subprocess与命令执行
    • threading与并发
    • unittest与测试
    • 虚拟环境与包管理
  • 第12章 工程实践

    • 调试技巧
    • 性能分析
    • 文档与注释
    • 下一步学习

threading与并发

threading 模块提供基于线程的并发执行。Python 2.7 使用操作系统原生线程,但由于 GIL(Global Interpreter Lock,全局解释器锁)的存在,多线程不能实现真正的 CPU 并行。不过,线程在 I/O 密集型任务(网络请求、文件读写)中仍然非常有用。

创建线程

Thread 类:

import threading

def worker(name):
    print "Worker %s starting" % name
    # 执行任务
    print "Worker %s done" % name

# 创建线程
thread = threading.Thread(target=worker, args=("Alice",))
thread.start()          # 启动线程
thread.join()           # 等待线程结束

print "Main thread continues"

继承 Thread 类:

import threading

class MyThread(threading.Thread):
    def __init__(self, name):
        super(MyThread, self).__init__()
        self.name = name
    
    def run(self):
        print "Thread %s running" % self.name

thread = MyThread("Worker")
thread.start()
thread.join()

线程同步

Lock(互斥锁):

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:          # 获取锁,退出时自动释放
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print counter           # 1000000

没有锁时,counter += 1 不是原子操作,多线程竞争会导致结果小于预期。

RLock(可重入锁):

import threading

rlock = threading.RLock()

def outer():
    with rlock:
        print "Outer"
        inner()

def inner():
    with rlock:
        print "Inner"

outer()                 # 正常执行

同一线程可以多次获取 RLock,而 Lock 会死锁。

Semaphore(信号量):

import threading

# 最多允许 3 个线程同时执行
semaphore = threading.Semaphore(3)

def limited_worker():
    with semaphore:
        print "Working..."
        # 模拟耗时操作
        threading.Event().wait(1)

线程间通信

Event:

import threading

event = threading.Event()

def waiter():
    print "Waiting..."
    event.wait()          # 阻塞,直到事件被设置
    print "Event received!"

def setter():
    threading.Event().wait(2)
    print "Setting event"
    event.set()

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()

Queue:

import threading
import Queue

q = Queue.Queue()

def producer():
    for i in range(5):
        q.put(i)
        print "Produced:", i

def consumer():
    while True:
        item = q.get()
        if item is None:    # 结束信号
            break
        print "Consumed:", item
        q.task_done()

threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Queue 是线程安全的,内部自动处理锁。

线程池

Python 2.7 标准库没有内置线程池,可以用 concurrent.futures(Python 3.2+)或手动实现:

import threading
import Queue

class ThreadPool(object):
    def __init__(self, num_threads):
        self.tasks = Queue.Queue()
        for _ in range(num_threads):
            threading.Thread(target=self.worker, daemon=True).start()
    
    def worker(self):
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            except Exception as e:
                print "Task error:", e
            finally:
                self.tasks.task_done()
    
    def submit(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))
    
    def wait(self):
        self.tasks.join()

# 使用
pool = ThreadPool(4)
for i in range(10):
    pool.submit(print, "Task", i)
pool.wait()

GIL 的影响

Python 的 GIL 确保同一时刻只有一个线程执行 Python 字节码:

import threading
import time

def cpu_bound():
    count = 0
    for i in range(10000000):
        count += i
    return count

# 多线程(由于 GIL,不会更快)
start = time.time()
threads = [threading.Thread(target=cpu_bound) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print "Threads:", time.time() - start

# 单线程(可能更快)
start = time.time()
cpu_bound()
cpu_bound()
print "Sequential:", time.time() - start

对于 CPU 密集型任务,多线程不会加速,甚至因为线程切换开销而更慢。应使用多进程(multiprocessing 模块)或 C 扩展。

实际应用

并发下载:

import threading
import urllib2

urls = [
    "http://example.com/file1.txt",
    "http://example.com/file2.txt",
    "http://example.com/file3.txt",
]

results = {}
lock = threading.Lock()

def download(url):
    try:
        response = urllib2.urlopen(url, timeout=10)
        data = response.read()
        with lock:
            results[url] = data
    except Exception as e:
        with lock:
            results[url] = str(e)

threads = [threading.Thread(target=download, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

print "Downloaded %d files" % len(results)

定时任务:

import threading

def schedule(interval, func, *args, **kwargs):
    def wrapper():
        func(*args, **kwargs)
        threading.Timer(interval, wrapper).start()
    
    threading.Timer(interval, wrapper).start()

def heartbeat():
    print "Heartbeat"

schedule(5, heartbeat)      # 每 5 秒执行一次

守护线程

import threading
import time

def background_task():
    while True:
        print "Background running"
        time.sleep(1)

thread = threading.Thread(target=background_task, daemon=True)
thread.start()

print "Main thread done"
# 主线程结束时,守护线程自动终止

守护线程在程序退出时不会阻止进程终止,适合后台监控、日志等任务。

注意事项

  • 多线程共享数据需要同步(锁、Queue)
  • GIL 限制 CPU 并行,CPU 密集型用 multiprocessing
  • 线程不是免费的,创建和切换有开销
  • 避免死锁:锁的获取顺序要一致
  • Python 2.7 中 threading 是高级接口,底层 thread 模块已废弃
上一页
subprocess与命令执行
下一页
unittest与测试