-------------------------------------------------多线程
进程是多个资源的集合
线程就是进程里面具体干活的
线程与线程之间是相互独立的
代码最先开始执行的的线程是主线程,通过主线程启动的线程是子线程
电脑CPU是几核的就能同时执行几个任务;没有真正意义上的并发
python的多线程实际是只使用一个CPU在执行,pythonGIL全局解释器锁限制了python使用线程
多线程执行函数拿不到函数的返回值,创建list、dict,将结果加入list或dict中,可得到结果
def down_load():
time.sleep(5)
for i in range(5): 5个独立的线程运行
t = threading.Thread(target=down_load) #实例化线程
t.start() #启动线程
print(‘当前线程数‘,threading.activeCount()) #查看当前线程数
print(‘当前线程‘,threading.current_thread()) #查看当前线程
-----------------------------------实例--------------------------
def down_load():
time.sleep(5)
print(‘运行完了‘)
print(‘单线程‘,threading.current_thread()) # 查看当前线程-子线程
start_time = time.time() #线程开始时间
for i in range(5):
t = threading.Thread(target=down_load)
t.start()
print(‘多线程‘,threading.current_thread()) # 查看当前线程-主线程
while threading.activeCount()!=1: #判断子线程是否运行完,从而用来统计线程运行时间;当=1时表示所有子循环都运行完了只剩主循环
pass
print(‘当前线程数‘,threading.activeCount()) #查看当前线程数
print(‘当前线程‘,threading.current_thread()) #查看当前线程
end_time = time.time()
print(end_time - start_time)
------------------------------------------下载图片----------
def down_load_pic(url):
req = requests.get(url) #下载图片
m = md5(url.encode()) #加密图片地址
with open( m.hexdigest()+‘.png‘,‘wb‘) as fw:
fw.write(req.content)
url_list = [‘http://www.nnzhp.cn/wp-content/uploads/2019/10/f410afea8b23fa401505a1449a41a133.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/481b5135e75c764b32b224c5650a8df5.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/b23755cdea210cfec903333c5cce6895.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/542824dde1dbd29ec61ad5ea867ef245.png‘]
# 单线程
start_time = time.time()
for url in url_list:
down_load_pic(url)
end_time = time.time()
print(end_time - start_time)
# 多线程
start_time = time.time() #线程开始时间
for url in url_list: #循环出url
t = threading.Thread(target=down_load_pic,args=(url,)) #实例化线程,args参数有只有一个时后需加,
t.start() #启动线程
while threading.activeCount()!=1: #判断线程
pass
end_time = time.time() #线程结束时间
print(end_time-start_time) #统计整个线程的执行时间
----------------------------------线程池----------------------
def down_load_pic(url):
req = requests.get(url) #下载图片
m = md5(url.encode()) #加密图片地址
with open( m.hexdigest()+‘.png‘,‘wb‘) as fw:
fw.write(req.content)
url_list = [‘http://www.nnzhp.cn/wp-content/uploads/2019/10/f410afea8b23fa401505a1449a41a133.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/481b5135e75c764b32b224c5650a8df5.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/b23755cdea210cfec903333c5cce6895.png‘,
‘http://www.nnzhp.cn/wp-content/uploads/2019/11/542824dde1dbd29ec61ad5ea867ef245.png‘]
pool = threadpool.ThreadPool(20) #实例化线程池(执行线程池大小)
reqs = threadpool.makeRequests(down_load_pic,url_list) #分配资源
# for req in reqs:
# pool.putRequest(req)
[pool.putRequest(req) for req in reqs] #将分配好的资源循环放到线程池里面
print(threading.activeCount)
pool.wait() #等待所有子线程运行完之后一起干活
print(‘end‘)
---------------------------守护线程----------------------------
# 守护线程:主线程结束守护线程立即死掉
import threading,time
def down_load():
time.sleep(5)
print(‘运行完了‘)
print(‘单线程‘,threading.current_thread()) # 查看当前线程
for i in range(5):
t = threading.Thread(target=down_load)
t.setDaemon(True) #设置子线程为守护线程,主线程结束了之后,子线程立即结束
t.start()
print(‘over‘)
---------------------------锁-------------------------------------------
锁:多个线程操作同一个数据的时候加锁
num = 0
look = threading.Lock() #申请锁
def add():
global num
look.acquire() #加锁
num += 1
look.release() #解锁 不解锁的话会死锁
with look: #用with自动进行加锁、解锁
num += 1
for i in range(20):
t = threading.Thread(target=add,)
t.start()
while threading.activeCount()!=1:
pass
print(num)
------------------------------------------多进程-----------------------------------
import multiprocessing
多进程可以利用多核CPU
多线程用于io密集型任务;多进程使用与CPU密集型任务
io:input/output 网络io/磁盘io
网络io:譬如网络上上传、下载东西
磁盘io:譬如从数据库取东西
CPU密集型:譬如排序、计算、数据分析等
import multiprocessing,time
def down_load():
time.sleep(5)
print(‘运行完了‘)
if __name__ == ‘__main__‘:
for i in range(5):
p = multiprocessing.Process(target=down_load) #实例化一个多进程
p.start() #启用进程
while len(multiprocessing.active_children())!=0: #等待子进程结束
pass
print(multiprocessing.active_children()) #将所有运行的子进程放置在list中
print(multiprocessing.cpu_count()) #将所有运行的子进程放置在list中
print(multiprocessing.current_process())
---------------------------单元测试---------------------------------
自己写代码来测试自己写的代码功能有没有实现
测试运行顺序是按照函数名称的开头字母运行的
python unittest
def add(a,b):
return a+b
class AddTest(unittest.TestCase):
@classmethod
def setUpClass(cls): #所有用例执行之前执行setUpClass
print(‘setUpClass‘)
def setUp(self): #每条用例执行之前都会执行setup
print(‘setup‘)
def tearDown(self): #每条用例执行之后都会执行tearDown
print(‘tearDown‘)
def test_normal(self): #函数必须以test开头,才会被认为是测试用例
result = add(1,1)
self.assertEqual(2,result) #assertEqual()校验两个值是否相等
def test_error(self):
result = add(1,1)
self.assertEqual(1, result,‘结果计算错误‘) #第三个参数提示运行失败提示信息
self.assertIn(1.[1,2,3]) #assertIn()判断是否在XX里面
self.assertNotEqual(1,1) #assertNotEqual() 判断两个值是否不相等
self.assertNotIn(1,[2,5,4]) #assertNotIn()判断是不不存在XX里面
@classmethod
def tearDownClass(cls): #所有用例执行之后执行tearDownClass
print(‘tearDownClass‘)
# if __name__ == ‘__main__‘:
unittest.main() #会识别当前文件测得测试用例并且执行
------------------------------------------测试报告---------------------------------
----------------------------HTMLTestRunner
def add(a,b):
return a+b
class AddTest(unittest.TestCase):
@classmethod
def setUpClass(cls): #所有用例执行之前执行setUpClass
print(‘setUpClass‘)
def setUp(self): #每条用例执行之前都会执行setup
print(‘setup‘)
def tearDown(self): #每条用例执行之后都会执行tearDown
print(‘tearDown‘)
def test_normal(self):
result = add(1,1)
self.assertEqual(2,result)
print("normal")
@classmethod
def tearDownClass(cls): #所有用例执行之后执行tearDownClass
print(‘tearDownClass‘)
# unittest.main()
file = open(‘test.html‘,‘wb‘)
runner = HTMLTestRunner.HTMLTestRunner(file,title=‘测试报告‘)
test_suite = unittest.makeSuite(AddTest)
runner.run(test_suite)
file.close
---------------------------------BeautifulReport
def add(a,b):
return a+b
class AddTest(unittest.TestCase):
@classmethod
def setUpClass(cls): #所有用例执行之前执行setUpClass
print(‘setUpClass‘)
def setUp(self): #每条用例执行之前都会执行setup
print(‘setup‘)
def tearDown(self): #每条用例执行之后都会执行tearDown
print(‘tearDown‘)
def test_normal(self):
result = add(1,1)
self.assertEqual(2,result)
print("normal")
@classmethod
def tearDownClass(cls): #所有用例执行之后执行tearDownClass
print(‘tearDownClass‘)
# unittest.main()
test_suite = unittest.makeSuite(AddTest)
report = bfr.BeautifulReport(test_suite)
report.report(filename=‘bf_report.html‘,description=‘bf测试用例报告‘)
print(report.failure_count) #失败次数
print(report.success_count) #成功次数
--------------------------------------------参数化--------------------------
#数据驱动
#代码驱动
#关键字驱动
data = [
[‘admin‘,‘123456‘,True,‘正常登录‘],
[‘admin‘,‘1122‘,False,‘异常登录‘],
[‘sdfsdf‘,‘1111‘,False,‘黑名单账户‘],
[‘http://127.0.0.1:8889/api/login‘,‘port‘,{‘username‘:‘xiaohei‘,‘password‘:‘123456‘},‘登录成功‘]
]
def login(user,password):
if user==‘admin‘ and password==‘123456‘:
return True
return False
class LoginTest(unittest.TestCase):
@parameterized.parameterized.expand(data) #解析
def test_login(self,user,password,expect,desc):
self._testMethodDoc = desc #用例描述
result = login(user,password)
self.assertEqual(expect,result)
@parameterized.parameterized.expand(data)
def test_login2(self,user,password,expect):
‘‘‘登录‘‘‘ #用例描述,统一的用例描述+参数化
result = login(user,password)
self.assertEqual(expect,result)
bf = BeautifulReport.BeautifulReport(unittest.makeSuite(LoginTest))
bf.report(filename=‘11-17测试报告‘,description=‘接口测试报告‘)
------------------------安装虚拟环境----------------------------------
导入虚拟环境第三方模块:pip3 install virtualenv
1、新建文件夹envtest
2、dos环境打开文件夹()
cd Desktop 打开Desktop
cd envtest 打开envtest文件夹
virtualenv py3
cd py3 打开py3
dir 查看py3内容
cd Scripts 打开Scripts
dir 查看打开Scripts内容
activate
3、dos环境安装完成,文件夹envtest存在python文件
4、pycharm中配置虚拟环境
File --settings--project:项目名称--Add Local--选择虚拟文件python所在目录--Apply -- ok
5、虚拟环境配置完成
6、虚拟环境安装第三方模块
pycharm中--Terminal 输入安装第三方模块的命令:pip3 install requests
7、若安装地三方模块在正常环境
pycharm中--Terminal 输入 deactivate 退出虚拟环境
8、若进入虚拟环境安装第三方模块
进入安装虚拟环境(同2)
virtualenv name 创建干净的虚拟环境
virtualenv name --no-site-packages 不将原环境的第三方模块包带到虚拟环境
virtualenv name --system-site-packages 将原环境的第三方模块包带到虚拟环境
deactivate 退出虚拟环境
原文:https://www.cnblogs.com/wannn/p/11915775.html