一、进程(Process)和线程(Thread)的基本概念
1. 进程(Process)
- 定义:进程是操作系统分配资源的最小单位,每个进程有独立的内存空间、文件句柄和系统资源。
- 特点:
- 隔离性:进程之间相互隔离,一个进程崩溃不会影响其他进程。
- 资源开销大:创建和切换进程需要较高的系统资源。
- 适用于 CPU 密集型任务:多进程可充分利用多核 CPU 的并行计算能力。
类比:一个工厂,一个车间,一名工人。 == 一个cpu,一个进程,一个线程
2. 线程(Thread)
- 定义:线程是操作系统调度的最小单位,属于同一进程的多个线程共享进程的内存和资源。
- 特点:
- 轻量级:创建和切换线程的开销远小于进程。
- 共享内存:线程之间可以直接读写同一进程的数据,但需要同步机制(如锁)避免冲突。
- 适用于 IO 密集型任务:例如网络请求、文件读写等需要等待外部操作的任务。
类比:一个工厂,一个车间,一名工人。 == 一个cpu,一个进程,一个线程
二、全局解释器锁(GIL)
1. GIL 是什么?
- 定义:GIL(Global Interpreter Lock)是 Python 解释器(CPython)中的一个全局互斥锁,用于保护 Python 对象的内存安全。
- 作用:同一时刻只能有一个线程执行 Python 字节码,避免多线程并发操作导致的数据不一致问题。
三,线程
1.start()当前线程准备就绪(等待cpu调度,具体时间由cpu决定)
import threading
loop = 100000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t=threading.Thread(target=_add,args=(loop,))
t.start() #当前线程准备就绪(等待cpu调度,具体时间由cpu决定)
print(number)
2.join()等待子线程结束,主线程才会结束
#等待当前线程执行完成后再向下继续执行
import threading
"""
loop = 100000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t=threading.Thread(target=_add,args=(loop,))
t.start()
t.join() #主线程等待中....
print(number)
"""
"""
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1=threading.Thread(target=_add)
t2=threading.Thread(target=_sub)
t1.start()
t1.join() #t1执行完毕才会继续往下走
t2.start()
t2.join() #t2执行完毕才会继续往下走
print(number)
"""
number = 0
loop = 10000000
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1=threading.Thread(target=_add,args=(loop,))
t2=threading.Thread(target=_sub,args=(loop,))
t1.start()
t2.start()
t1.join() #t1执行完毕才会继续往下走
t2.join() #t2执行完毕才会继续往下走
print(number)
setDaemon守护线程
3.线程安全
import threading
num = 0
lock_object = threading.RLock()
def tesk():
print('开始')
lock_object.acquire()
lock_object.acquire()
global num
for i in range(10000000):
num += 1
lock_object.release()
lock_object.release()
print(num)
for i in range(2):
t = threading.Thread(target=tesk)
t.start()
Lock同步锁:Lock不支持嵌套锁 RLock递归锁:Rlock支持嵌套锁
4.线程池
import os
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
import time
from concurrent.futures import ThreadPoolExecutor,Future
import random
def task(video_url):
print('开始执行任务',video_url)
time.sleep(5)
return random.randint(0,10)
#创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
future_list = []
url_list = ['www.xxx-{}.com'.format(i) for i in range(15)]
for url in url_list:
#在线程中提交一个任务,线程池中如果有空闲线程,则分配一个线程去执行,执行完毕后再将线程交还给线程池,如果没有空闲线程,则等待
#pool.submit(函数名,参数1,参数2,参数3,参数...)
future = pool.submit(task,url)
future_list.append(future)
pool.shutdown(True)
for fu in future_list:
print(fu.result())
四、进程
1.概念
multiprocessiong模块操纵进程
三种模式:
fork(mac):用来拷贝
spawn(windows):如果想创建一个子进程,想让这个子进程去干某个事,它会在内部创建一个python解释器,让python解释器去运行想要实现某个功能
forkserver(unix):父进程启动一个“服务器进程”,所有子进程都由该服务器通过 fork() 创建,避免了直接复制多线程父进程的风险。
.start()
.join()与线程功能相同
2.daemon守护线程
#daemon守护进程
import multiprocessing
import time
from multiprocessing import Process
def task(arg):
time.sleep(2)
print('1234')
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
p = Process(target=task,args=('xxx',))
p.daemon = True
p.start()
print('1010')
daemon=Ture 设置为守护线程,主进程执行完毕后,子进程也自动关闭
daemon=False 设置为非守护进程,主进程等待子进程
3.进程锁
import time
import multiprocessing
def task(lock):
print('开始')
with lock:
# 假设文件中保存的内容就是一个值:10
with open(r'D:\新建项目\学习\线程与进程\进程\f1.txt',mode='r',encoding='utf-8') as f:
current_num = int(f.read())
print('排队抢票了')
time.sleep(0.5)
current_num -=1
with open(r'D:\新建项目\学习\线程与进程\进程\f1.txt',mode='w',encoding='utf-8') as f:
f.write(str(current_num))
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
lock =multiprocessing.RLock()
processes_list = []
for i in range(10):
p = multiprocessing.Process(target=task,args=(lock,))
p.start()
processes_list.append(p)
for iten in processes_list:
iten.join()
4.进程池
import time
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import multiprocessing
def task(num):
print('开始',num)
time.sleep(2)
"""
if __name__ =="__main__":
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task,i)
# #等待进程池中的任务完成后,再继续往下进行
# pool.shutdown(True)
# print(1)
"""
def done(ree):
print(multiprocessing.current_process())
time.sleep(1)
print(ree.result)
time.sleep(1)
if __name__ =="__main__":
pool = ProcessPoolExecutor(4)
for i in range(10):
f = pool.submit(task,i)
f.add_done_callback(done) #done的调用由主进程处理(与线程池不同)
print(multiprocessing.current_process())
pool.shutdown(True)
"""
如果在进程池中的进程锁,则要基于Manger的Lock和Rlock
"""
为什么要使用Manger的Lock和Rlock,而使用multiprocessing和threading的就会报错
1. 进程间共享内存的隔离性
- 每个进程有独立的内存空间,普通锁(如
multiprocessing.Lock
)仅在创建它的进程内有效。 - 当进程池的子进程尝试使用主进程创建的锁时,由于内存隔离,子进程无法直接访问主进程的锁对象,导致同步失败或报错。
2. 序列化与传递问题
- 进程池中的任务参数需要能被Pickle序列化。普通的
multiprocessing.Lock
对象不可被序列化,传递时会引发PicklingError
。 Manager
创建的锁实际是一个代理对象(Proxy),它通过IPC(进程间通信)连接到Manager服务器进程中的真实锁,代理对象可以被序列化,从而安全传递到子进程。
3. Manager
的中间层作用
Manager
启动了一个独立的服务器进程,用于管理共享对象(如锁)。- 所有进程通过代理访问同一把锁,确保锁的状态在所有进程间同步,无论子进程以
fork
还是spawn
方式启动。
4. threading.Lock
的线程局限性
threading.Lock
仅用于同一进程内的线程同步,无法跨进程工作。在进程池中使用它会导致未定义行为或错误。
Comments NOTHING