并发理论与实践[Python]
2025-02-25 23:37:50 # Python

1. 并发与并行

并发(Concurrency):指多个任务在同一时间段内交替执行,从宏观上看像是“同时”进行,但实际上是通过快速切换任务来实现的,例如有两个任务,task1正在被cpu调度,task2尽管没有被调度,但在进行I/O操作,这两个任务是并发进行的。

并行(Parallelism):指多个任务在同一时刻真正同时执行,需要多核CPU的支持,被不同CPU核心调度。

1.1 进程、线程与协程

  • 进程:操作系统资源分配的基本单位,拥有独立的内存空间。

    • 程序、数据、进程控制块PCB(堆栈)
  • 线程:操作系统调度的基本单位,共享进程的内存空间。

    • 新建(New):线程刚被创建。
    • 就绪(Runnable):线程等待CPU调度。
    • 运行(Running):线程正在执行。
    • 阻塞(Blocked):线程等待I/O操作或其他资源。
    • 终止(Terminated):线程执行完毕。
  • 协程:用户态的轻量级线程,由程序控制调度,无需操作系统介入。

1.2 I/O密集型 vs CPU密集型

  • I/O密集型:任务主要时间花在I/O操作上,如网络请求、文件读写。
  • CPU密集型:任务主要时间花在CPU计算上,如数学运算、图像处理。

1.3 Python线程与Java线程的区别

  • Python
    • 有GIL全局互斥锁,线程由调度由解释器协作式切换,线程在执行Python字节码前必须获取GIL,执行完成后释放GIL,因GIL存在,多线无法利用多核CPU的并行能力。
    • I/O密集型任务可通过多线程+异步I/O绕过GIL限制。
  • Java
    • 线程直接由操作系统调度,支持真正的多核并行

1.4 Linux的线程、进程调度

O (1) 调度器(2.5 - 2.6.23 内核版本), 维护队列数组,每个队列数组包含 140 个优先级队列,对应 140 个不同的优先级,活动队列(active queue)和过期队列(expired queue),通过位图快速查找非空优先级队列

CFS(2.6.23 内核版本至今),红黑树

Linux 操作系统普通任务实时任务通过实时调度器调度)默认通过**完全公平调度器(CFS)**管理进程和线程的调度,确保每个任务都能获得合理的运行机会,避免因任务优先级差异或运行时间过长导致的饿死现象,从而提高系统的整体响应性和公平性。

核心机制

  1. 虚拟运行时间:
    vruntime 是任务已获得的 CPU 时间经过优先级加权后的值。调度器通过选择 vruntime 最小的任务来执行,确保运行时间较少的任务优先获得 CPU 时间,从而实现公平调度。
  2. 红黑树
    所有可运行任务按 vruntime 排序存储在红黑树中,保证快速查找 vruntime 最小的任务。红黑树的自平衡特性使得插入、删除和查找操作的效率很高,为调度器的高效运行提供了支持。
  3. 动态时间片
    根据系统中任务的数量和优先级动态分配时间片。任务数量越多,时间片越小,保证每个任务都能更频繁地获得 CPU 时间;任务优先级越高,时间片越长,但其 vruntime 增长得更慢,进一步确保公平性。

1.4.1 线程->轻量级进程 LWP

在 Linux 中,线程被视为轻量级进程(Lightweight Process, LWP),与进程一视同仁,均由task_struct描述的,只不过对于LWP而言,同一线程组的线程共享内存管理、文件系统相关的描述符。

task_struct简化定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
struct task_struct {
// 进程/线程标识
pid_t pid; // 进程 ID
pid_t tgid; // 线程组 ID(进程 ID)
pid_t ppid; // 父进程 ID
uid_t uid; // 用户 ID
gid_t gid; // 组 ID

// 调度信息
int prio; // 动态优先级
int static_prio; // 静态优先级
u64 vruntime; // 虚拟运行时间
struct sched_entity se; // 调度实体

// 内存管理
struct mm_struct *mm; // 内存描述符
pgd_t *pgd; // 页表

// 文件系统
struct files_struct *files; // 文件描述符表
struct fs_struct *fs; // 文件系统信息

// 信号处理
struct signal_struct *signal; // 信号处理函数
sigset_t blocked; // 信号掩码
struct sigpending pending; // 挂起信号

// 进程状态
volatile long state; // 进程状态
int exit_code; // 退出状态码

// 上下文信息
struct pt_regs *regs; // CPU 寄存器
unsigned long sp; // 栈指针

// 资源使用
u64 utime; // 用户态 CPU 时间
u64 stime; // 内核态 CPU 时间
unsigned long rss; // 物理内存使用

// 线程信息
struct list_head thread_group; // 线程组
struct thread_struct thread; // 线程局部存储
};

1.4.2 从task_struct看为什么LWP上下文切换开销小?

  • 关键字段mm_struct *mm(内存描述符)、pgd_t *pgd(页表)。
    同一进程下的所有线程共享相同的地址空间(即共享mmpgd)。在上下文切换时:

    • 进程切换:需要切换mmpgd,这会触发页表切换TLB刷新,导致显著的性能开销。
    • 线程切换:无需切换mmpgd,避免了上述操作,减少了内存管理相关的开销。
  • 关键字段files_struct *files(文件描述符表)、fs_struct *fs(文件系统信息)。
    同一进程的线程共享文件描述符表和文件系统上下文(如当前工作目录、根目录等)。线程切换时:

    • 进程切换:需要切换filesfs,涉及文件描述符表的更新和文件系统状态的重新加载。
    • 线程切换:无需切换这些资源,减少了文件系统相关的开销。

1.4.3 调度队列与状态转换

  • 运行队列(Runqueue):存储所有可运行状态的进程。每个CPU核心都有一个独立的运行队列。
  • 阻塞队列:因I/O或资源等待被阻塞的进程/线程,当事件完成时移回就绪队列。
  • 状态迁移
    • 就绪 → 运行:被调度器选中,分配CPU时间片。
    • 运行 → 就绪:时间片用完或被更高优先级任务抢占。
    • 运行 → 阻塞:主动等待资源(如I/O操作)。
    • 阻塞 → 就绪:等待的资源就绪。

2. ThreadPoolExecutor并发

2.1 工作模式

  • 任务队列:线程池维护一个任务队列,线程从队列中获取任务执行。
  • 线程切换:操作系统负责线程的调度和切换

2.2 submit 提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
from concurrent.futures import ThreadPoolExecutor


def cpu_task():
while True:
continue


def io_task(i):
print(f"线程任务 {i} 开始执行")
time.sleep(1) # 模拟 1 秒的 I/O 等待时间
print(f"线程任务 {i} 执行完毕")


thread_executor = ThreadPoolExecutor(max_workers=10)
start = time.time()
io_futures = []
for i in range(10):
future = thread_executor.submit(io_task, i)
io_futures.append(future)

try:
# 等待所有 I/O 密集型任务完成
for future in io_futures:
future.result()
print("用时", time.time() - start)
except Exception as e:
print(f"执行任务时发生异常: {e}")
finally:
# 关闭线程池,不再接受新的任务
thread_executor.shutdown()

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

线程任务 0 开始执行
线程任务 1 开始执行
线程任务 2 开始执行
线程任务 3 开始执行
线程任务 4 开始执行
线程任务 5 开始执行
线程任务 6 开始执行
线程任务 7 开始执行
线程任务 8 开始执行
线程任务 9 开始执行
线程任务 5 执行完毕线程任务 1 执行完毕
线程任务 6 执行完毕
线程任务 4 执行完毕
线程任务 3 执行完毕线程任务 9 执行完毕
线程任务 0 执行完毕线程任务 8 执行完毕线程任务 7 执行完毕


线程任务 2 执行完毕


用时 1.013387680053711

print不是线程安全的,输出结果有些混乱

2.3 add_done_callback回调

回调函数需要有一个参数接受future对象

1
2
3
4
5
6
7
8
9
10
def callback(future):
try:
result = future.result()
print(f"任务完成,结果: {result}")
except Exception as e:
print(f"任务失败,异常: {e}")

for i in range(10):
future = thread_executor.submit(io_task, i)
future.add_done_callback(callback)

2.4 超时控制

1
2
3
4
5
try:
result = future.result(timeout=1) # 设置超时时间为1秒
print(f"任务完成,结果: {result}")
except TimeoutError:
print("任务超时")

3. asyncio并发

3.1 工作模式

事件循环是asyncio的核心,它是一个无限循环,用于管理和调度异步任务。

  • 就绪队列:事件循环维护一个就绪队列,存储可以立即执行的任务。

  • 等待队列:当任务遇到 await 时,会被移到等待队列,并注册一个回调函数。

  • I/O 多路复用:事件循环使用操作系统提供的 I/O 多路复用机制(如 selectepollkqueue)监听 I/O 事件。

  • 回调机制:当 I/O 操作完成时,事件循环会调用注册的回调函数,将任务重新加入就绪队列。

3.2 基本使用

  • 协程:在 Python 中,使用asyncawait关键字来定义协程。async关键字用于定义一个协程函数,而await关键字用于暂停协程的执行,等待一个异步操作完成。
  • 任务:是对协程的进一步封装,它用于在事件循环中调度和执行协程。可以使用asyncio.create_task()函数将一个协程包装成一个任务,并将其添加到事件循环中。任务可以用来并发地执行多个协程,并且可以方便地获取协程的执行结果、处理异常等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def my_coroutine():
print("开始执行协程")
await asyncio.sleep(1)
print("协程执行结束")
return "结果"

async def main():
task = asyncio.create_task(my_coroutine())
result = await task
print(result)

asyncio.run(main())


开始执行协程
协程执行结束
结果

使用asyncio.gather同时运行多个协程

是否使用gather的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time


async def task1():
await asyncio.sleep(1)
return "Result from task1"

async def task2():
await asyncio.sleep(2)
return "Result from task2"

async def main():
start = time.time()
await task1()
await task2()
print("耗时", time.time() - start)

asyncio.run(main())


耗时 3.0184578895568848
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import time


async def task1():
await asyncio.sleep(1)
return "Result from task1"

async def task2():
await asyncio.sleep(2)
return "Result from task2"

async def main():
start = time.time()
results = await asyncio.gather(task1(), task2())
print("耗时", time.time() - start)
for result in results:
print(f"Handling result: {result}")

asyncio.run(main())


耗时 2.0149099826812744
Handling result: Result from task1
Handling result: Result from task2

3.3 超时控制

Python3.8之后协程对象要先封装成tasks,再传给asyncio.wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import asyncio
import time

from tensorflow.python.data.experimental.ops.testing import sleep


# 定义一个异步的 I/O 密集型任务函数
async def io_task(i):
print(f"协程任务 {i} 开始执行")
await asyncio.sleep(2) # 模拟 1 秒的 I/O 等待时间
print(f"协程任务 {i} 执行完毕")
return i # 返回任务编号

async def main():
start = time.time()
tasks = [asyncio.create_task(io_task(i)) for i in range(10)] # 创建 10 个任务

# 使用 asyncio.wait 设置超时时间为 1.5 秒
done, pending = await asyncio.wait(tasks, timeout=1.5)

# 输出已完成和未完成的任务
print(f"已完成的任务数量: {len(done)}")
print(f"未完成的任务数量: {len(pending)}")

# 取消未完成的任务
for task in pending:
task.cancel()

print("用时", time.time() - start)
# sleep(1)

if __name__ == "__main__":
asyncio.run(main())

3.4 10000线程vs10000协程简单性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
from concurrent.futures import ThreadPoolExecutor


def cpu_task():
while True:
continue


def io_task(i):
time.sleep(1) # 模拟 1 秒的 I/O 等待时间


thread_executor = ThreadPoolExecutor(max_workers=10000)
start = time.time()
io_futures = []
for i in range(10000):
future = thread_executor.submit(io_task, i)
io_futures.append(future)

try:
# 等待所有 I/O 密集型任务完成
for future in io_futures:
future.result()
print("用时", time.time() - start)
except Exception as e:
print(f"执行任务时发生异常: {e}")
finally:
# 关闭线程池,不再接受新的任务
thread_executor.shutdown()

用时 3.834956645965576
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time


async def io_task(i):
await asyncio.sleep(1) # 模拟1秒的I/O等待时间


async def main():
start = time.time()
tasks = [io_task(i) for i in range(10000)] # 创建10000个协程任务
await asyncio.gather(*tasks) # 等待所有任务完成
print("用时", time.time() - start)


if __name__ == "__main__":
asyncio.run(main())

用时 1.2120013236999512

4. Linux相关命令

4.1 CPU - 物理核心数 - 逻辑核心数量

1
lscpu

image-20250225203331897

2个28核CPU,超线程技术,每个物理核心2个逻辑核心


4.2 ps命令

查看手册

1
man ps 

image-20250225204011847

1
ps aux
  • ps:表示“Process Status”,用于显示系统中运行的进程信息。
  • a:显示所有终端上的进程(包括其他用户的进程)。
  • u:显示进程的用户(所有者)信息。
  • x:显示没有控制终端的进程(例如后台服务)。

image-20250225205135745


4.3 top命令

实时显示系统中各个进程资源占用情况

  • RES:进程占用的物理内存大小(Resident Set Size),单位为KiB。表示进程实际占用的物理内存
  • Shift + MRES降序排列

image-20250225204600312

参考

o(1)调度器 Linux如何调度进程?大学老师不讲的,看完动画秒懂!_哔哩哔哩_bilibili

Linux ps命令04 进程优先级及nice_哔哩哔哩_bilibili

CPU相关概念:物理cpu数、核数、逻辑cpu数,12核20线程实例分析-CSDN博客