Concurrency Cheat Sheet

| Approach | Best for | Data passing | Strengths | Pitfalls |
| --- | --- | --- | --- | --- |
| threading | I/O-wait-heavy tasks | Shared memory; pass references directly | Lightweight, fast to start | GIL makes CPU-bound work fake-parallel; thread safety is a headache |
| multiprocessing | CPU-bound computation | Pickle serialization; expensive | True parallelism; each process gets its own GIL | Memory usage multiplies; slow startup; hard to debug |
| concurrent.futures | Batches of homogeneous tasks | Depends on the thread or process backend | Clean API with built-in timeout support | Feature set is basic; no complex scheduling |
| asyncio | Massive I/O concurrency | Passed directly between coroutines | One thread doing the work of ten thousand | async is contagious; split ecosystem; debugging is a nightmare |
| joblib | numpy-heavy computation | Specially optimized serialization | Built-in caching; lovely progress bars | Basically built for data science |
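One nuance in the table: with concurrent.futures the thread and process backends sit behind the same Executor interface, so switching is a one-line change. A minimal sketch (the `work` function is a made-up stand-in):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def work(n):
    time.sleep(0.01)  # stand-in for real I/O or computation
    return n * n

if __name__ == '__main__':
    # Same map() call; only the executor class changes
    with ThreadPoolExecutor(max_workers=8) as ex:
        print(list(ex.map(work, range(8))))   # threads: good for I/O
    with ProcessPoolExecutor(max_workers=4) as ex:
        print(list(ex.map(work, range(8))))   # processes: good for CPU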

Threading

The Truth About the GIL

import sys
import threading
import time

# When does the GIL actually get released?
def check_gil_release():
    # Blocking I/O releases the GIL
    time.sleep(0.001)              # ✓ released
    open('file.txt', 'w').close()  # ✓ released during the I/O syscall
    # requests.get(url)            # ✓ released too (socket I/O; needs `requests`)

    # Pure-Python computation holds it the whole time
    sum(range(1000000))           # ✗ not released
    [i**2 for i in range(1000)]   # ✗ not released

# Since Python 3.2 the GIL switch is time-based, not per-bytecode:
# a running thread is asked to yield every sys.getswitchinterval() seconds
print(sys.getswitchinterval())  # 0.005 by default
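The switch interval is a tunable knob via sys.setswitchinterval. An illustrative experiment, assuming nothing beyond the standard library (absolute numbers vary by machine and Python version):

import sys
import threading
import time

def spin(n):
    # Pure-Python busy loop: holds the GIL while it runs
    while n:
        n -= 1

def time_two_threads():
    start = time.perf_counter()
    ts = [threading.Thread(target=spin, args=(5_000_000,)) for _ in range(2)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()
    return time.perf_counter() - start

sys.setswitchinterval(0.005)  # the default
default_time = time_two_threads()
sys.setswitchinterval(0.1)    # force far fewer GIL handoffs
longer_time = time_two_threads()
print(f"5ms interval: {default_time:.3f}s, 100ms interval: {longer_time:.3f}s")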

Thread-Safe Data Structures

import threading
from queue import Queue, LifoQueue, PriorityQueue
from collections import deque
import time

# The thread-safe queue family
safe_queue = Queue()        # FIFO
safe_stack = LifoQueue()    # LIFO
safe_pq = PriorityQueue()   # priority queue

# Some deque operations are atomic in CPython (but not all!)
safe_deque = deque()  # append(), appendleft(), pop(), popleft() are atomic

# Rolling your own thread-safe counter
class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1
            return self._value

    def value(self):
        with self._lock:
            return self._value

# Measuring GIL switching overhead empirically
def measure_gil_overhead():
    """Measure the cost of thread switching"""
    def cpu_bound(n):
        return sum(i**2 for i in range(n))

    n = 1000000

    # Single-threaded
    start = time.perf_counter()
    for _ in range(4):
        cpu_bound(n)
    single = time.perf_counter() - start

    # Multithreaded (4 threads)
    start = time.perf_counter()
    threads = [threading.Thread(target=cpu_bound, args=(n,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    multi = time.perf_counter() - start

    print(f"单线程: {single:.3f}s")
    print(f"多线程: {multi:.3f}s")
    print(f"多线程反而慢了: {multi/single:.2f}x")  # 通常是1.5-2x慢

Thread Pool Gotchas

from concurrent.futures import ThreadPoolExecutor
import threading

# The thread-reuse trap in pools
thread_locals = threading.local()

def task_with_state(task_id):
    # Thread-local storage is reused across tasks in a pool!
    if not hasattr(thread_locals, 'count'):
        thread_locals.count = 0
    thread_locals.count += 1

    thread_id = threading.current_thread().ident
    print(f"Task {task_id} on thread {thread_id}, count={thread_locals.count}")
    return thread_locals.count

# The same thread serves multiple tasks
with ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(task_with_state, range(10)))
# Watch each thread's count keep climbing across tasks!
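If per-thread state is what you actually want, ThreadPoolExecutor (Python 3.7+) takes initializer/initargs so each pool thread can set it up explicitly before serving tasks. A minimal sketch:

import threading
from concurrent.futures import ThreadPoolExecutor

thread_locals = threading.local()

def init_worker():
    # Runs exactly once per pool thread, before it picks up any task
    thread_locals.count = 0
    thread_locals.session = object()  # e.g. a per-thread connection (stand-in)

def task(task_id):
    thread_locals.count += 1
    return (threading.current_thread().name, thread_locals.count)

with ThreadPoolExecutor(max_workers=2, initializer=init_worker) as executor:
    print(list(executor.map(task, range(6))))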

Multiprocessing

Measuring Serialization Overhead

import multiprocessing as mp
import pickle
import numpy as np
import time

def measure_pickle_overhead():
    """测量不同数据类型的序列化开销"""
    test_data = {
        'small_list': list(range(100)),
        'large_list': list(range(1000000)),
        'numpy_array': np.random.rand(1000000),
        'nested_dict': {i: {j: j**2 for j in range(100)} for i in range(100)},
    }

    for name, data in test_data.items():
        start = time.perf_counter()
        pickled = pickle.dumps(data)
        pickle_time = time.perf_counter() - start

        start = time.perf_counter()
        unpickled = pickle.loads(pickled)
        unpickle_time = time.perf_counter() - start

        size_mb = len(pickled) / 1024 / 1024
        print(f"{name:15} | Size: {size_mb:6.2f}MB | "
              f"Pickle: {pickle_time*1000:6.2f}ms | "
              f"Unpickle: {unpickle_time*1000:6.2f}ms")

# Shared memory sidesteps serialization entirely (Python 3.8+)
from multiprocessing import shared_memory

# Module-level so the pool can pickle a reference to it
def worker(shm_name, shape, dtype):
    # Attach to the existing shared memory block from the child process
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    result = np.sum(arr)  # operate on the data directly, no serialization
    existing_shm.close()
    return result

def use_shared_memory():
    # Create the shared memory block and copy the data in once
    arr = np.random.rand(10000000)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    shared_arr = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_arr[:] = arr[:]

    # Only the name and metadata cross the process boundary, not the data
    with mp.Pool() as pool:
        result = pool.apply_async(worker, (shm.name, arr.shape, arr.dtype))
        print(result.get())

    shm.close()
    shm.unlink()

Process Pool Memory Issues

import os
import psutil
import multiprocessing as mp

def memory_hungry_task(n):
    """模拟内存密集型任务"""
    process = psutil.Process(os.getpid())

    # Allocate a big chunk of memory
    big_list = list(range(n))

    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"Process {os.getpid()} using {memory_mb:.2f}MB")

    return sum(big_list)

# Watch the memory usage
def monitor_memory_usage():
    n = 10000000

    # Main-process memory before
    main_process = psutil.Process()
    main_memory_before = main_process.memory_info().rss / 1024 / 1024

    # Spin up 4 processes; each allocates its own copy of the data
    with mp.Pool(4) as pool:
        results = pool.map(memory_hungry_task, [n] * 4)

    main_memory_after = main_process.memory_info().rss / 1024 / 1024
    print(f"Main process: {main_memory_before:.2f}MB -> {main_memory_after:.2f}MB")

Concurrent.futures in Detail

Making the Most of Future Objects

from concurrent.futures import ThreadPoolExecutor, Future, as_completed, wait, FIRST_COMPLETED
import time
import random

def unpredictable_task(task_id):
    """Simulate a task with an unpredictable completion time"""
    sleep_time = random.uniform(0.1, 2.0)
    time.sleep(sleep_time)
    if random.random() < 0.3:  # fails 30% of the time
        raise Exception(f"Task {task_id} failed")
    return f"Task {task_id} completed in {sleep_time:.2f}s"

# Different waiting strategies
def future_patterns():
    with ThreadPoolExecutor(max_workers=5) as executor:
        # submit() returns a Future immediately
        futures = [executor.submit(unpredictable_task, i) for i in range(10)]

        # Option 1: as_completed - handle results in completion order
        print("=== as_completed ===")
        for future in as_completed(futures, timeout=5):
            try:
                result = future.result()
                print(f"✓ {result}")
            except Exception as e:
                print(f"✗ {e}")

        # Option 2: wait - block until a specific condition is met
        futures = [executor.submit(unpredictable_task, i) for i in range(5)]
        done, not_done = wait(futures, timeout=1, return_when=FIRST_COMPLETED)
        print("\n=== wait ===")
        print(f"Completed: {len(done)}, Pending: {len(not_done)}")

        # Option 3: call result() directly and block
        future = executor.submit(unpredictable_task, 99)
        try:
            result = future.result(timeout=3)
            print(f"\n=== direct result ===\n✓ {result}")
        except Exception as e:
            print(f"\n=== direct result ===\n✗ {e}")

# The callback mechanism
def with_callbacks():
    def task_done_callback(future):
        try:
            result = future.result()
            print(f"Callback: Task completed with {result}")
        except Exception as e:
            print(f"Callback: Task failed with {e}")

    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(unpredictable_task, 1)
        future.add_done_callback(task_done_callback)

        # The main thread is free to do other work
        print("Main thread continues...")
        time.sleep(3)

Asyncio Deep Dive

The Event Loop Under the Hood

import asyncio
import selectors
import socket

# Under the hood, asyncio is just select/epoll/kqueue
def peek_under_hood():
    """See what the event loop is actually doing"""
    loop = asyncio.new_event_loop()

    # Grab the underlying selector
    selector = loop._selector  # private attribute - keep this out of production
    print(f"Using selector: {type(selector)}")  # EpollSelector on Linux

    # List the registered file descriptors
    # (get_map() maps file objects to SelectorKey entries)
    for fileobj, key in selector.get_map().items():
        print(f"FD: {key.fd}, Events: {key.events}")

# A custom event loop for debugging (subclass the concrete SelectorEventLoop;
# BaseEventLoop on its own is not runnable)
class DebugEventLoop(asyncio.SelectorEventLoop):
    def _run_once(self):  # private hook - may change between Python versions
        print("Event loop tick")
        super()._run_once()

# Coroutine scheduling
async def inspect_scheduling():
    """Watch how coroutines get scheduled"""
    async def task(name, delay):
        print(f"{name} started")
        await asyncio.sleep(delay)
        print(f"{name} finished")
        return name

    # Create the tasks without awaiting them
    tasks = [
        asyncio.create_task(task("A", 0.5)),
        asyncio.create_task(task("B", 0.3)),
        asyncio.create_task(task("C", 0.4))
    ]

    # The tasks are already running!
    await asyncio.sleep(0.1)
    print("Main coroutine continues while tasks run")

    # Now await them all
    results = await asyncio.gather(*tasks)
    return results
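Rather than poking at _selector or _run_once, the supported way to watch the loop is asyncio's debug mode, which flags blocking callbacks and never-awaited coroutines. A small sketch:

import asyncio
import time

async def blocks_the_loop():
    time.sleep(0.2)  # a blocking call inside a coroutine - debug mode flags this

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # warn when a callback blocks > 100 ms
    await blocks_the_loop()

# debug=True enables slow-callback warnings and coroutine origin tracking
asyncio.run(main(), debug=True)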

Async Context Managers and Iterators

class AsyncResource:
    """异步资源管理器"""
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(0.5)
        self.resource = "Resource acquired"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(0.5)
        self.resource = None

class AsyncDataStream:
    """异步迭代器"""
    def __init__(self, max_items=5):
        self.max_items = max_items
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.max_items:
            raise StopAsyncIteration

        await asyncio.sleep(0.2)  # simulate fetching data asynchronously
        self.current += 1
        return f"Data chunk {self.current}"

async def use_async_patterns():
    # Async context manager
    async with AsyncResource() as resource:
        print(f"Using {resource.resource}")

    # Async iterator
    async for chunk in AsyncDataStream():
        print(f"Processing {chunk}")

Concurrency Control

import asyncio
from asyncio import Semaphore, Lock, Event, Condition

class RateLimiter:
    """Token-bucket rate limiter"""
    def __init__(self, rate, per):
        self.rate = rate        # tokens added per `per` seconds
        self.per = per
        self.allowance = rate
        self.last_check = None  # set lazily once a loop is running

    async def acquire(self):
        loop = asyncio.get_running_loop()
        current = loop.time()
        if self.last_check is None:
            self.last_check = current
        time_passed = current - self.last_check
        self.last_check = current
        self.allowance += time_passed * (self.rate / self.per)

        if self.allowance > self.rate:
            self.allowance = self.rate

        if self.allowance < 1.0:
            sleep_time = (1.0 - self.allowance) * (self.per / self.rate)
            await asyncio.sleep(sleep_time)
            self.allowance = 0.0
        else:
            self.allowance -= 1.0

async def controlled_concurrency():
    # A semaphore caps concurrency
    sem = Semaphore(3)

    async def limited_task(id):
        async with sem:
            print(f"Task {id} running")
            await asyncio.sleep(1)

    # Only 3 tasks run at any one time
    await asyncio.gather(*[limited_task(i) for i in range(10)])

    # Rate limiting
    limiter = RateLimiter(rate=5, per=1)  # 5 requests per second

    async def rate_limited_task(id):
        await limiter.acquire()
        print(f"Request {id} sent at {asyncio.get_running_loop().time():.2f}")

    await asyncio.gather(*[rate_limited_task(i) for i in range(10)])
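Caps on concurrency pair naturally with caps on latency: asyncio.wait_for cancels an awaitable that exceeds its time budget. A minimal sketch:

import asyncio

async def guarded_call():
    try:
        # Cancelled (and TimeoutError raised) if it takes longer than 0.5s
        return await asyncio.wait_for(asyncio.sleep(1, result="done"), timeout=0.5)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(guarded_call()))  # "timed out"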

Joblib Tips and Tricks

Caching

from joblib import Memory
import numpy as np
import time

# Disk-backed cache (joblib.Memory persists results to a directory, not RAM)
memory = Memory('./cache_dir', verbose=1)

@memory.cache
def expensive_computation(n):
    """昂贵的计算,结果会被缓存"""
    print(f"Computing for n={n}...")
    time.sleep(2)
    return np.random.rand(n, n)

# First call: actually computes
result1 = expensive_computation(1000)
# Second call: served straight from the cache
result2 = expensive_computation(1000)

# Wipe the cache
memory.clear(warn=False)

# Selective caching
@memory.cache(ignore=['verbose'])
def flexible_function(data, verbose=False):
    """Changing `verbose` does not invalidate the cache"""
    if verbose:
        print("Processing...")
    return data.mean()

Memory-Mapped Array Optimization

from joblib import Parallel, delayed, dump, load
import numpy as np

def process_large_array():
    """Memory-friendly processing of very large arrays"""
    # A memory-mapped array is paged in on demand, never loaded wholesale
    large_array = np.memmap('large_array.dat', dtype='float32',
                            mode='w+', shape=(100000, 1000))
    large_array[:] = np.random.rand(100000, 1000)

    def process_chunk(arr, start, end):
        # Touch only one slice - easy on memory
        return arr[start:end].mean()

    # Parallel processing; each worker only pages in the slice it needs
    chunk_size = 10000
    results = Parallel(n_jobs=4)(
        delayed(process_chunk)(large_array, i, i+chunk_size)
        for i in range(0, len(large_array), chunk_size)
    )

    return results

# Efficient data persistence
def efficient_persistence():
    data = {'arrays': [np.random.rand(1000, 1000) for _ in range(10)]}

    # joblib's dump/load beats plain pickle on large numpy arrays
    dump(data, 'data.joblib', compress=3)  # compression level 0-9
    loaded_data = load('data.joblib')
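The progress reporting the cheat sheet praises comes from Parallel's verbose parameter: any value above 0 prints batch-completion progress to stderr, and higher values print more often. A sketch (`step` is a made-up task):

from joblib import Parallel, delayed
import time

def step(i):
    time.sleep(0.1)
    return i

# verbose=10 prints progress lines as batches complete
results = Parallel(n_jobs=4, verbose=10)(delayed(step)(i) for i in range(40))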

Performance Shoot-Out

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from joblib import Parallel, delayed

# Module-level so ProcessPoolExecutor can pickle them
def io_task(n):
    """I/O-bound task"""
    time.sleep(0.01)
    return n ** 2

def cpu_task(n):
    """CPU-bound task"""
    return sum(i ** 2 for i in range(n))

def benchmark_all():
    """Head-to-head performance comparison"""
    task_count = 100

    results = {}

    # Serial baseline
    start = time.perf_counter()
    [io_task(i) for i in range(task_count)]
    results['Serial I/O'] = time.perf_counter() - start

    # Thread pool
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=10) as executor:
        list(executor.map(io_task, range(task_count)))
    results['ThreadPool I/O'] = time.perf_counter() - start

    # Process pool
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_task, [10000] * 20))
    results['ProcessPool CPU'] = time.perf_counter() - start

    # Joblib
    start = time.perf_counter()
    Parallel(n_jobs=4)(delayed(cpu_task)(10000) for _ in range(20))
    results['Joblib CPU'] = time.perf_counter() - start

    for name, duration in results.items():
        print(f"{name:20} {duration:.3f}s")

Pitfalls from the Trenches

Mixing asyncio and threading

# Pitfall: asyncio.get_event_loop() only auto-creates a loop on the main thread
def wrong_way():
    def thread_target():
        # RuntimeError: there is no current event loop in this thread
        loop = asyncio.get_event_loop()
        loop.run_until_complete(some_async_function())

    thread = threading.Thread(target=thread_target)
    thread.start()

# Correct: give the worker thread an event loop of its own
def right_way():
    def thread_target():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(some_async_function())
        loop.close()

    thread = threading.Thread(target=thread_target)
    thread.start()
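Going the other way (handing work from a plain thread to a loop that is already running elsewhere) has a supported tool: asyncio.run_coroutine_threadsafe. A minimal sketch:

import asyncio
import threading

async def some_async_function():
    await asyncio.sleep(0.1)
    return "done"

# Run a loop forever in a dedicated thread
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

# From the main thread: schedule a coroutine onto that loop
future = asyncio.run_coroutine_threadsafe(some_async_function(), loop)
print(future.result(timeout=1))  # a concurrent.futures.Future, so .result() blocks

loop.call_soon_threadsafe(loop.stop)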

multiprocessing Gotchas on Windows

# On Windows (and any spawn-based platform) this guard is mandatory
if __name__ == '__main__':
    # multiprocessing code goes here
    pass

# Without it the child re-imports the module and tries to spawn again
# (modern Python aborts with a RuntimeError rather than recursing forever)
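You can surface this class of bug on Linux too by forcing the spawn start method, which is the default on Windows and macOS. A sketch:

import multiprocessing as mp

def child():
    print("child running")

if __name__ == '__main__':
    # 'spawn' re-imports the module in the child, so anything
    # outside this guard runs again there
    mp.set_start_method('spawn')
    p = mp.Process(target=child)
    p.start()
    p.join()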

Deadlock Detection

import threading
import time

def detect_deadlock():
    lock1 = threading.Lock()
    lock2 = threading.Lock()

    def thread1():
        with lock1:
            time.sleep(0.1)
            with lock2:
                pass

    def thread2():
        with lock2:
            time.sleep(0.1)
            with lock1:  # deadlock!
                pass

    # Use timeouts so a blocked acquire can't hang forever
    def safe_thread1():
        if lock1.acquire(timeout=1):
            try:
                if lock2.acquire(timeout=1):
                    try:
                        pass
                    finally:
                        lock2.release()
            finally:
                lock1.release()
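Timeouts recover from a deadlock after the fact; acquiring locks in a single global order prevents the cycle outright. A sketch of that fix for the example above (ordering by id() is one arbitrary but consistent choice):

import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def ordered(*locks):
    # A fixed global order means no cyclic waits, hence no deadlock
    return sorted(locks, key=id)

def thread1():
    a, b = ordered(lock1, lock2)
    with a:
        with b:
            pass  # critical section

def thread2():
    a, b = ordered(lock2, lock1)  # sorts to the same order as thread1
    with a:
        with b:
            pass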

Closing Thoughts

  • The GIL is a design decision, not a defect: it keeps CPython's memory management thread-safe, at the cost of CPU-bound parallelism

  • async is no silver bullet: code complexity rises, debugging gets harder, and the ecosystem splits in two

  • More processes is not automatically better: account for memory and startup overhead

  • Shared state is dangerous: avoid sharing where you can; where you can't, use queues

  • Performance is not the only metric: maintainability and debuggability matter just as much

Recent lessons from production:

  1. Web services: asyncio (FastAPI)

  2. Data processing: joblib

  3. Simple concurrency: concurrent.futures

  4. Real-time systems: avoid Python entirely (the GIL is a hard limit)