梳理python+unittest接口自动化测试框架的思路:
1.确定目录:
cases:存放测试用例的py文件;config:存放一些数据库,环境地址等固定不变的信息;
core:核心的文件,
cases:测试用例test_cj.py,代码如下:
import unittest import os import jsonpath from core.my_requests import MyRequest from conf.setting import default_host from core.tools import mysql from core.tools import r as redis class Cj(unittest.TestCase): username=‘autotest_lyh78910‘#自动化测试注册的用户 password=‘aA123456‘#类变量 def test_reg(self):#post请求,请求参数为username,pwd,cpwd. ‘‘‘注册接口‘‘‘ url=‘/api/user/user_reg‘ new_url=os.path.join(default_host,url) data={‘username‘:self.username,‘pwd‘:self.password,‘cpwd‘:self.password} r=MyRequest() result=r.post(new_url,data)#result是个字典 self.assertEqual(1,result.get(‘status‘),msg=‘注册接口不通 %s‘%result.get(‘sql_file‘)) #判断请求接口是否通,1表示成功,0,2都是失败,接口不通,这条语句下面的都不会执行 error_code=result.get(‘sql_file‘).get(‘error_code‘) self.assertEqual(0,error_code,msg="注册失败!%s"result.get(‘sql_file‘)) sql=‘select * from app_myuser where username="%s;‘%self.username sql_res=mysql.execute_one(sql)#数据库查询结果 self.assertIsNotNone(sql_res)#若数据库表里有新注册的用户名,说明注册成功。判断这个结果是否为空 def login(self):#post请求 ‘‘‘登录‘‘‘ url=‘/api/user/login‘ new_url=os.path.join(default_host,url) data={‘username‘:self.username,‘passwd‘:self.password} r=MyRequest() result=r.post(new_url,data) self.assertEqual(1,result.get(‘status‘),msg=‘登录接口不通!%s‘%result.get(‘sql_file‘)) sign=get_real_value(‘sign‘,result)#从返回值里面取到sign值 self.assertIsNotNone(sign,msg=‘登录失败!%s‘result)#校验sign是否为空 userid=get_real_value(‘userId‘,result)从返回值里面取到userId值 return userid,sign def choujiang(self):#get请求 ‘‘‘抽奖‘‘‘ url=default_host+‘/api/product/choice‘ userid, sign = self.login()#调用函数,直接拿到函数的两个返回值 data={‘userid‘:userid,‘sign‘:sign} r=MyRequest(url,data) result=r.get() self.assertEqual(1,result.get(‘status‘),msg=‘抽奖接口不通!%s‘%result.get(‘sql_file‘)) redis_key=‘choujiang:%s‘%self.username count=redis.get(redis_key)#抽奖次数,redis里面key值是字符串 self.assertEqual(‘1‘,count,msg=‘抽奖次数错误%s‘result) sql=‘select count(*) as cishu from app_record where user_id=%s;‘%userid cishu=mysql.execute_one(sql).get(‘cishu‘) #本来fetchall()返回的是list,但指定了dict # #{‘cishu‘:1} self.assertEqual(1,cishu,msg=‘抽奖记录没有落到数据库里面!‘) def test_choujiang(self):#抽奖流程 ‘‘‘抽奖流程测试‘‘‘ self.reg() self.choujiang() def tearDownClass(cls):#类里面的所有测试用例执行完之后,都会执行它 ‘‘‘注册数据清除‘‘‘ sql=‘delete * from app_myuser where username="%s";‘%cls.username mysql.execute_one(sql) key=‘choujiang:%s‘%cls.username redis.delete(key) print("测试数据清理完成!")
测试用例里写按接口参数发送请求,判断结果,判断结果用断言essertEqual(a,b),essertIsNotNone()判断,还有接口流程
my_request.py 定义一个类MyRequest,里面有post()和get()方法,发送请求后,返回响应数据res={"status":0,"data":res.json()},代码如下:
import requests import nnlog import os from conf.setting import LOG_PATH class MyRequest: log_file_name = os.path.join(LOG_PATH,‘MyRequest.log‘)#日子文件名 time_out = 10 #请求超时时间 def __init__(self,url,data=None,headers=None,file=None): self.url = url self.data = data self.headers = headers self.file = file def post(self): try: req = requests.post(self.url,data=self.data,headers=self.headers, files=self.file,timeout=self.time_out) except Exception as e: res = {"status":0,"sql_file":e.args} #0代表请求失败 else: try: res = {"status":1,"sql_file":req.json()} #1代表返回的json except Exception as e: res = {"status":2,"sql_file":req.text} #2代表返回不是json log_str = ‘url: %s 请求方式:post sql_file:%s ,返回数据:%s‘%(self.url,self.data,res) self.write_log(log_str) return res def get(self): try: req = requests.get(self.url,params=self.data,headers=self.headers,timeout=self.time_out) except Exception as e: res = {"status":0,"sql_file":req.args} #0代表请求失败 else: try: res = {"status":1,"sql_file":req.json()} #1代表返回的json except Exception as e: res = {"staus":2,"sql_file":req.text} #2代表返回不是json log_str = ‘url: %s get请求 sql_file:%s ,返回数据:%s‘%(self.url,self.data,res) self.write_log(log_str) return res @classmethod def write_log(cls,content): log = nnlog.Logger(cls.log_file_name) log.debug(content)
发送请求后,执行用例前,先备份数据库mysqldump -uroot -p12345 -Pxxxx -hxxx.xxx.xxx.xxx >xxx/a.sql,自动化测试执行后,把原数据库名称改掉:
rename database main to main_20190220,再创建原数据库main,把备份的数据恢复进去。mysql -uroot -p123456 -Pxxx -hxxxxx <xxxx/a.sql
op_data.py文件主要写数据库的备份和恢复。
bin:start.py文件,主要写运行测试的过程
import unittest
import datatime
from BeautifulReport import BeautifulReport
def run_case():
sql_file=bak_db()#备份数据库
suite=unittest.Testsuite()#创建测试集合
cases=unittest.defualltTestloader.discover(Case_path,‘test*.py‘)#找到所有测试用例
for case in cases:
suite.addTest(case)#把每个测试用例加到测试集合中
report=BeautifulReport.BeautifulReport(suite)#运行测试集合,产生报告
report_path=make_today_dir()#创建报告的今天的目录
file_name=‘report_%s.html‘%datatime.datatime.now().strftime(‘%H%M%S)#报告文件
report.report(filename=file_name,description="接口测试“,log_path=path)#产生报告
abs_path=os.path.join(report_path,file_name)#报告的路径
send_mail(content,abs_path)#发送邮件
恢复数据库
原文:https://www.cnblogs.com/balllyh/p/10409656.html