一条流水线的工作过程,cpu的最小执行单位
线程的创建与进程相同
线程的效率非常高,并且线程开启不需要消耗什么资源
from threading import Thread
import time
def func(n):
print(n)
if __name__ == '__main__':
for i in range(10000):
t = Thread(target=func, args=(i,))
t.start()
print("主进程运行完毕")
'''线程类 继承 Theard '''
class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
# 线程中 必须重写 run 方法
def run(self):
time.sleep(1)
print(self.name)
if __name__ == '__main__':
thread = MyThread("张无忌")
# 开启线程
thread.start()
# 等待 线程运行完毕才 向后运行
thread.join()
time.sleep(1)
print("主进程运行")
import time
from threading import Thread
from multiprocessing import Process
num = 100
def func():
global num
num = 0
if __name__ == '__main__':
t = Thread(target=func,)
t.start()
t.join()
print(num) # 0
保证数据安全,但是牺牲了效率,同步执行锁内的代码
互相抢到了对方的需要的锁,导致双方相互等待,程序没法进行
import time
from threading import Thread, Lock
class MyThread(Thread):
def __init__(self, lockA, lockB):
super().__init__()
self.lockA = lockA
self.lockB = lockB
def run(self):
self.f1()
self.f2()
def f1(self):
self.lockA.acquire()
print('我拿了A锁')
self.lockB.acquire()
print('我是一个很好的客户!')
self.lockB.release()
self.lockA.release()
def f2(self):
self.lockB.acquire()
time.sleep(0.1)
print('我拿到了B锁')
self.lockA.acquire()
print('我是一名合格的技师')
self.lockA.release()
self.lockB.release()
if __name__ == '__main__':
lockA = Lock()
lockB = Lock()
t1 = MyThread(lockA, lockB)
t1.start()
t2 = MyThread(lockA, lockB)
t2.start()
print('我是经理')
可以多次acquire,通过一个计数器来记录被锁了多少次,只有计数器为0的时候,大家才能继续抢锁
from threading import Thread, RLock
import time
class MyThread(Thread):
def __init__(self, lookA, lookB):
super().__init__()
self.lockA = lookA
self.lockB = lookB
def run(self):
self.f1()
self.f2()
def f1(self):
self.lockA.acquire()
print("我拿到了锁A")
self.lockB.acquire()
print("我是f1")
self.lockB.release()
self.lockA.release()
def f2(self):
self.lockB.acquire()
print("我拿到了锁B")
time.sleep(1)
self.lockA.acquire()
print("我是f2")
self.lockA.release()
self.lockB.release()
if __name__ == '__main__':
lockA = lockB = RLock()
t1 = MyThread(lockA, lockB)
t1.start()
t2 = MyThread(lockA, lockB)
t2.start()
print("我是劳保")
主线程等待所有非守护线程的结束才结束,主线程的代码运行结束,还要等待非守护线程的执行完毕.这个过程中守护线程还存在
所有非守护线程结束才结束,主线程的代码结束,只要还有非守护线程,那么守护线程也不会结束
from threading import Thread
import time
def func1():
time.sleep(3)
print(11111111)
def func2(aa):
time.sleep(1)
print(2222222222)
if __name__ == '__main__':
t = Thread(target=func1,)
t.start()
t1 = Thread(target=func2,)
t.daemon = True
t1.start()
print("主进程")
控制同时能够进入锁内去执行代码的线程数量(进程数量),维护了一个计数器,刚开始创建信号量的时候假如设置的是4个房间,进入一次acquire就减1 ,出来一个就+1,如果计数器为0,那么其他的任务等待,这样其他的任务和正在执行的任务是一个同步的状态,而进入acquire里面去执行的那4个任务是异步执行的.
import time
from threading import Thread, Semaphore
def func1(s):
s.acquire()
time.sleep(1)
print('大宝剑!!!')
s.release()
if __name__ == '__main__':
# 设置要 同时 运行线程的 个数
s = Semaphore(4)
for i in range(10):
t = Thread(target=func1,args=(s,))
t.start()
主线程等待子线程的 原因
import time
from threading import Thread
from multiprocessing import Process
def func(n):
time.sleep(5)
print(n)
if __name__ == '__main__':
# 主线程等待的是子线程的任务全部执行完毕
t = Thread(target=func, args=('我是子线程',))
t.start() # 速度非常快
# 主进程等待的是给子进程收尸
# p = Process(target=func,args=('我是子进程',))
# p.start()
# print('主进程结束!!')
print('主线程结束')
from threading import Thread
import threading
def func():
# 当前线程
print("子线程 id >>>>", threading.get_ident())
# 当前线程的对象
print("当前线程 >>>>", threading.current_thread())
# 子线程的名字
print("子线程 name>>>", threading.current_thread().getName()) # Thread-1
# 主线程的对象
print("主线程对象>>>", threading.main_thread())
print("当前线程id 号>>>>>>>>",threading.get_ident())
if __name__ == '__main__':
t = Thread(target=func, )
t.start()
# 当前正在运行的所有线程
print("正在运行的所有线程>>>", threading.enumerate())
# 开启线程 数量
print("以开启线程 数量>>>", threading.active_count())
# 当前线程的对象
print("当前线程 name>>>>", threading.current_thread().getName())
# 主线程的对象
print("主线程的对象>>>", threading.main_thread())
# 当前线程id 号
print("当前线程id 号>>>>>>>>",threading.get_ident())
print(threading.stack_size())
from threading import Thread, Event
e = Event() # e的状态有两种,False True,当事件对象的状态为False的时候,wait的地方会阻塞
e.set() # 将事件对象的状态改为True
e.clear() # 将事件对象的状态改为Flase
print('在这里等待')
e.wait() # 阻塞
print('还没好!!')
import queue
import queue
#先进先出 FIFO
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait() #没有数据就报错,可以通过try来搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #没有数据就报错,可以通过try来搞
'''
first
second
third
'''
import queue
q=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait()
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
'''
third
second
first
'''
import queue
# 优先级 队列
q = queue.PriorityQueue()
# put放入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
# 不能是字典
q1 = queue.LifoQueue()
# q.put((3,"d"))
# q.put((2,"c"))
# q.put((1,"f"))
# 如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序
q.put((10,"f"))
q.put((10,"c"))
# 优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序
q.put((100,"f"))
q.put((110,"c"))
# print(q.get())
print(q.get())
print(q.get())
切换 ProcessPoolExecutor, ThreadPoolExecutor 为 进程池和线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
def func(n):
time.sleep(1)
print(n)
return n**2
if __name__ == '__main__':
pp = ThreadPoolExecutor()
pp_list = []
res = pp.submit(func,1)
# print(res)
for i in range(10):
res = pp.submit(func,i) # 异步提交 10 个任务
pp_list.append(res)
pp.shutdown() # 等待任务全部执行完 = close + join
print(pp_list) # <Future >> 将来的 at 0x15b33b4dc50 state=running> >>正在运行 <Future at 0x15b33bb9e80 state=pending> >> 挂起
# time.sleep(5)
print([i.result() for i in pp_list])
for res in pp_list:
print(res.result()) # 跟 res.get() 一样
print(pp_list)
# <Future at 0x15b33b4dc50 state=finished returned int>
# finished returned int 结束 返回 int
异步执行的,map自带join功能
from threading import current_thread
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func(n):
time.sleep(1)
return n ** 2
if __name__ == '__main__':
# 开启线程池 个数 4 个
t = ThreadPoolExecutor(max_workers=4)
gen = t.map(func, range(10)) # 异步执行的,map自带join功能
print(gen) # <generator object Executor.map.<locals>.result_iterator at 0x0000020F83747308>
for i in range(10):
print(i)
print([ii for ii in gen])
# Executor 线程池
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
# 当前 线程
from threading import current_thread
def func(n):
time.sleep(1)
# 获取当前进程的名字
# print(current_thread().getName())
print("current_thread>>>>", current_thread().name)
# print(n)
return n ** 2
def func2(n):
# 回调函数
print(n.result())
# 获取当前进程的名字
print("current_thread>>>>", current_thread().name) # ThreadPoolExecutor-0_0
if __name__ == '__main__':
# 开启进程池
# pp = ProcessPoolExecutor(max_workers=4)
# 开启 线程池
pp = ThreadPoolExecutor(max_workers=4)
for i in range(10):
# 回调函数的使用
res = pp.submit(func, i).add_done_callback(func2)
# print(res.result())
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(n):
time.sleep(1)
# print(n,current_thread().ident)
return n**2
def func2(n):
print(n)
if __name__ == '__main__':
t_p = ThreadPoolExecutor(max_workers = 4)
# t_p = ProcessPoolExecutor(max_workers = 4)
# t_p.submit(func,n = 1)
t_res_list = []
for i in range(10):
# res_obj = t_p.submit(func,i) #异步提交了这个10个任务,
res_obj = t_p.submit(func,i) #异步提交了这个10个任务,
# res_obj.result() #他和get一样
t_res_list.append(res_obj)
t_p.shutdown() # close + join
print('t_res_list',t_res_list)
for e_res in t_res_list:
print(e_res.result())
原文:https://www.cnblogs.com/zhang-zi-yi/p/10755811.html