1. 并发与并行
并发(Concurrency):指多个任务在同一时间段内交替执行,从宏观上看像是“同时”进行,但实际上是通过快速切换任务来实现的,例如有两个任务,task1正在被cpu调度,task2尽管没有被调度,但在进行I/O操作,这两个任务是并发进行的。
并行(Parallelism):指多个任务在同一时刻真正同时执行,需要多核CPU的支持,被不同CPU核心调度。
1.1 进程、线程与协程
进程:操作系统资源分配的基本单位,拥有独立的内存空间。
线程:操作系统调度的基本单位,共享进程的内存空间。
- 新建(New):线程刚被创建。
- 就绪(Runnable):线程等待CPU调度。
- 运行(Running):线程正在执行。
- 阻塞(Blocked):线程等待I/O操作或其他资源。
- 终止(Terminated):线程执行完毕。
协程:用户态的轻量级线程,由程序控制调度,无需操作系统介入。
1.2 I/O密集型 vs CPU密集型
- I/O密集型:任务主要时间花在I/O操作上,如网络请求、文件读写。
- CPU密集型:任务主要时间花在CPU计算上,如数学运算、图像处理。
1.3 Python线程与Java线程的区别
- Python:
- 有GIL全局互斥锁,线程由调度由解释器协作式切换,线程在执行Python字节码前必须获取GIL,执行完成后释放GIL,因GIL存在,多线无法利用多核CPU的并行能力。
- I/O密集型任务可通过多线程+异步I/O绕过GIL限制。
- Java:
1.4 Linux的线程、进程调度
O (1) 调度器(2.5 - 2.6.23 内核版本), 维护队列数组,每个队列数组包含 140 个优先级队列,对应 140 个不同的优先级,活动队列(active queue)和过期队列(expired queue),通过位图快速查找非空优先级队列
CFS(2.6.23 内核版本至今),红黑树
Linux 操作系统普通任务(实时任务通过实时调度器调度)默认通过**完全公平调度器(CFS)**管理进程和线程的调度,确保每个任务都能获得合理的运行机会,避免因任务优先级差异或运行时间过长导致的饿死现象,从而提高系统的整体响应性和公平性。
核心机制:
- 虚拟运行时间:
vruntime 是任务已获得的 CPU 时间经过优先级加权后的值。调度器通过选择 vruntime 最小的任务来执行,确保运行时间较少的任务优先获得 CPU 时间,从而实现公平调度。
- 红黑树:
所有可运行任务按 vruntime 排序存储在红黑树中,保证快速查找 vruntime 最小的任务。红黑树的自平衡特性使得插入、删除和查找操作的效率很高,为调度器的高效运行提供了支持。
- 动态时间片:
根据系统中任务的数量和优先级动态分配时间片。任务数量越多,时间片越小,保证每个任务都能更频繁地获得 CPU 时间;任务优先级越高,时间片越长,但其 vruntime 增长得更慢,进一步确保公平性。
1.4.1 线程->轻量级进程 LWP
在 Linux 中,线程被视为轻量级进程(Lightweight Process, LWP),与进程一视同仁,均由task_struct
描述的,只不过对于LWP而言,同一线程组的线程共享内存管理、文件系统相关的描述符。
task_struct简化定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| struct task_struct { // 进程/线程标识 pid_t pid; // 进程 ID pid_t tgid; // 线程组 ID(进程 ID) pid_t ppid; // 父进程 ID uid_t uid; // 用户 ID gid_t gid; // 组 ID
// 调度信息 int prio; // 动态优先级 int static_prio; // 静态优先级 u64 vruntime; // 虚拟运行时间 struct sched_entity se; // 调度实体
// 内存管理 struct mm_struct *mm; // 内存描述符 pgd_t *pgd; // 页表
// 文件系统 struct files_struct *files; // 文件描述符表 struct fs_struct *fs; // 文件系统信息
// 信号处理 struct signal_struct *signal; // 信号处理函数 sigset_t blocked; // 信号掩码 struct sigpending pending; // 挂起信号
// 进程状态 volatile long state; // 进程状态 int exit_code; // 退出状态码
// 上下文信息 struct pt_regs *regs; // CPU 寄存器 unsigned long sp; // 栈指针
// 资源使用 u64 utime; // 用户态 CPU 时间 u64 stime; // 内核态 CPU 时间 unsigned long rss; // 物理内存使用
// 线程信息 struct list_head thread_group; // 线程组 struct thread_struct thread; // 线程局部存储 };
|
1.4.2 从task_struct
看为什么LWP上下文切换开销小?
1.4.3 调度队列与状态转换
- 运行队列(Runqueue):存储所有可运行状态的进程。每个CPU核心都有一个独立的运行队列。
- 阻塞队列:因I/O或资源等待被阻塞的进程/线程,当事件完成时移回就绪队列。
- 状态迁移:
- 就绪 → 运行:被调度器选中,分配CPU时间片。
- 运行 → 就绪:时间片用完或被更高优先级任务抢占。
- 运行 → 阻塞:主动等待资源(如I/O操作)。
- 阻塞 → 就绪:等待的资源就绪。
2. ThreadPoolExecutor并发
2.1 工作模式
- 任务队列:线程池维护一个任务队列,线程从队列中获取任务执行。
- 线程切换:操作系统负责线程的调度和切换
2.2 submit 提交任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import time from concurrent.futures import ThreadPoolExecutor
def cpu_task(): while True: continue
def io_task(i): print(f"线程任务 {i} 开始执行") time.sleep(1) # 模拟 1 秒的 I/O 等待时间 print(f"线程任务 {i} 执行完毕")
thread_executor = ThreadPoolExecutor(max_workers=10) start = time.time() io_futures = [] for i in range(10): future = thread_executor.submit(io_task, i) io_futures.append(future)
try: # 等待所有 I/O 密集型任务完成 for future in io_futures: future.result() print("用时", time.time() - start) except Exception as e: print(f"执行任务时发生异常: {e}") finally: # 关闭线程池,不再接受新的任务 thread_executor.shutdown()
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| 线程任务 0 开始执行 线程任务 1 开始执行 线程任务 2 开始执行 线程任务 3 开始执行 线程任务 4 开始执行 线程任务 5 开始执行 线程任务 6 开始执行 线程任务 7 开始执行 线程任务 8 开始执行 线程任务 9 开始执行 线程任务 5 执行完毕线程任务 1 执行完毕 线程任务 6 执行完毕 线程任务 4 执行完毕 线程任务 3 执行完毕线程任务 9 执行完毕 线程任务 0 执行完毕线程任务 8 执行完毕线程任务 7 执行完毕
线程任务 2 执行完毕
用时 1.013387680053711
|
print不是线程安全的,输出结果有些混乱
2.3 add_done_callback回调
回调函数需要有一个参数接受future对象
1 2 3 4 5 6 7 8 9 10
| def callback(future): try: result = future.result() print(f"任务完成,结果: {result}") except Exception as e: print(f"任务失败,异常: {e}")
for i in range(10): future = thread_executor.submit(io_task, i) future.add_done_callback(callback)
|
2.4 超时控制
1 2 3 4 5
| try: result = future.result(timeout=1) # 设置超时时间为1秒 print(f"任务完成,结果: {result}") except TimeoutError: print("任务超时")
|
3. asyncio并发
3.1 工作模式
事件循环是asyncio
的核心,它是一个无限循环,用于管理和调度异步任务。
就绪队列:事件循环维护一个就绪队列,存储可以立即执行的任务。
等待队列:当任务遇到 await
时,会被移到等待队列,并注册一个回调函数。
I/O 多路复用:事件循环使用操作系统提供的 I/O 多路复用机制(如 select
、epoll
、kqueue
)监听 I/O 事件。
回调机制:当 I/O 操作完成时,事件循环会调用注册的回调函数,将任务重新加入就绪队列。
3.2 基本使用
- 协程:在 Python 中,使用
async
和await
关键字来定义协程。async
关键字用于定义一个协程函数,而await
关键字用于暂停协程的执行,等待一个异步操作完成。
- 任务:是对协程的进一步封装,它用于在事件循环中调度和执行协程。可以使用
asyncio.create_task()
函数将一个协程包装成一个任务,并将其添加到事件循环中。任务可以用来并发地执行多个协程,并且可以方便地获取协程的执行结果、处理异常等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import asyncio
async def my_coroutine(): print("开始执行协程") await asyncio.sleep(1) print("协程执行结束") return "结果"
async def main(): task = asyncio.create_task(my_coroutine()) result = await task print(result)
asyncio.run(main())
开始执行协程 协程执行结束 结果
|
使用asyncio.gather
同时运行多个协程
是否使用gather的对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import asyncio import time
async def task1(): await asyncio.sleep(1) return "Result from task1"
async def task2(): await asyncio.sleep(2) return "Result from task2"
async def main(): start = time.time() await task1() await task2() print("耗时", time.time() - start)
asyncio.run(main())
耗时 3.0184578895568848
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import asyncio import time
async def task1(): await asyncio.sleep(1) return "Result from task1"
async def task2(): await asyncio.sleep(2) return "Result from task2"
async def main(): start = time.time() results = await asyncio.gather(task1(), task2()) print("耗时", time.time() - start) for result in results: print(f"Handling result: {result}")
asyncio.run(main())
耗时 2.0149099826812744 Handling result: Result from task1 Handling result: Result from task2
|
3.3 超时控制
Python3.8之后协程对象要先封装成tasks,再传给asyncio.wait()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import asyncio import time
from tensorflow.python.data.experimental.ops.testing import sleep
# 定义一个异步的 I/O 密集型任务函数 async def io_task(i): print(f"协程任务 {i} 开始执行") await asyncio.sleep(2) # 模拟 1 秒的 I/O 等待时间 print(f"协程任务 {i} 执行完毕") return i # 返回任务编号
async def main(): start = time.time() tasks = [asyncio.create_task(io_task(i)) for i in range(10)] # 创建 10 个任务
# 使用 asyncio.wait 设置超时时间为 1.5 秒 done, pending = await asyncio.wait(tasks, timeout=1.5)
# 输出已完成和未完成的任务 print(f"已完成的任务数量: {len(done)}") print(f"未完成的任务数量: {len(pending)}")
# 取消未完成的任务 for task in pending: task.cancel()
print("用时", time.time() - start) # sleep(1)
if __name__ == "__main__": asyncio.run(main())
|
3.4 10000线程vs10000协程简单性能对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import time from concurrent.futures import ThreadPoolExecutor
def cpu_task(): while True: continue
def io_task(i): time.sleep(1) # 模拟 1 秒的 I/O 等待时间
thread_executor = ThreadPoolExecutor(max_workers=10000) start = time.time() io_futures = [] for i in range(10000): future = thread_executor.submit(io_task, i) io_futures.append(future)
try: # 等待所有 I/O 密集型任务完成 for future in io_futures: future.result() print("用时", time.time() - start) except Exception as e: print(f"执行任务时发生异常: {e}") finally: # 关闭线程池,不再接受新的任务 thread_executor.shutdown()
用时 3.834956645965576
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import asyncio import time
async def io_task(i): await asyncio.sleep(1) # 模拟1秒的I/O等待时间
async def main(): start = time.time() tasks = [io_task(i) for i in range(10000)] # 创建10000个协程任务 await asyncio.gather(*tasks) # 等待所有任务完成 print("用时", time.time() - start)
if __name__ == "__main__": asyncio.run(main()) 用时 1.2120013236999512
|
4. Linux相关命令
4.1 CPU - 物理核心数 - 逻辑核心数量

2个28核CPU,超线程技术,每个物理核心2个逻辑核心
4.2 ps命令
查看手册

ps
:表示“Process Status”,用于显示系统中运行的进程信息。
a
:显示所有终端上的进程(包括其他用户的进程)。
u
:显示进程的用户(所有者)信息。
x
:显示没有控制终端的进程(例如后台服务)。

4.3 top命令
实时显示系统中各个进程资源占用情况
RES
:进程占用的物理内存大小(Resident Set Size),单位为KiB。表示进程实际占用的物理内存
Shift + M
按RES
降序排列

参考
o(1)调度器 Linux如何调度进程?大学老师不讲的,看完动画秒懂!_哔哩哔哩_bilibili
Linux ps命令04 进程优先级及nice_哔哩哔哩_bilibili
CPU相关概念:物理cpu数、核数、逻辑cpu数,12核20线程实例分析-CSDN博客