# -*- coding: UTF-8 -*-
import json
import time
from clear_requests import *
from datetime import datetime, timedelta
def get_del_task_names_from_taskversion(searchKeyword="", projectId='', task_num=3, no_del_task_names=None, del_task_names=None, valid_days=7, taskType=""):
    """Collect the tasks whose versions should be taken offline and deleted.

    The task list is fetched with ``get_task_info`` and walked in reverse
    order of the response. Each row is classified by the first rule that
    matches:

    1. its name is listed in ``del_task_names`` -> selected;
    2. it was created less than ``valid_days`` days ago -> skipped;
    3. its type (case-insensitive) is a protected type (``tag``) -> skipped;
    4. otherwise -> selected.

    Args:
        searchKeyword: Keyword forwarded to the task query.
        projectId: Project id forwarded to the task query.
        task_num: Number of rows requested from the task query.
        no_del_task_names: Currently unused; kept for caller compatibility.
        del_task_names: Task names that are always selected.
        valid_days: Minimum age in days before a task may be selected.
        taskType: Task type filter forwarded to the task query.

    Returns:
        A list of ``(taskId, taskType, taskName)`` tuples.

    Raises:
        ValueError: If both ``projectId`` and ``searchKeyword`` are empty.

    Any failure while querying or parsing is logged, and whatever was
    collected up to that point is returned (best effort).
    """
    if not projectId and not searchKeyword:
        # The original `raise("...")` raised a plain string, which is itself a TypeError.
        raise ValueError("参数projectId和searchKeyword不能同时为空!")

    del_task_names = [] if del_task_names is None else del_task_names
    # Task types that must never be cleaned up here. A wider list
    # ("dm", "ods", "acc", "dim", "fact", "dws", "mid", "tag") was used before.
    no_del_task_types = ["tag"]
    delta = timedelta(days=valid_days)
    tasks = []
    try:
        task_info = json.loads(get_task_info(searchKeyword=searchKeyword, task_num=task_num, projectId=projectId, taskType=taskType))
        if task_info["code"]["code"] != "0000":
            # No exception is active here, so exc_info would only log "NoneType: None".
            logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
            raise Exception("获取任务信息失败。程序终止...")
        now = datetime.now()
        for row in reversed(task_info["bo"]["rows"]):
            created = datetime.strptime(row["createdDate"], "%Y-%m-%d %H:%M:%S")
            entry = (row["taskId"], row["taskType"], row["taskName"])
            if row["taskName"] in del_task_names:
                # Explicitly requested names bypass the age and type protections.
                tasks.append(entry)
            elif (now - created) < delta:
                logging.info('Task "%s" created time is %s,less than %s days, can\'t be Dealed!', row["taskName"], row["createdDate"], valid_days)
            elif row["taskType"].lower() in no_del_task_types:
                logging.info('Task "%s\'s" type is %s , this type can\'t be Dealed!', row["taskName"], row["taskType"])
            else:
                tasks.append(entry)
    except Exception as ex:
        logging.error("获取任务信息失败。The Error1 is:%s", ex, exc_info=True)
    return tasks
def get_ods_task_names(projectid='282221398641901568', queryInfo=''):
    """Return every ODS task reported by ``get_ods_task_info``.

    Args:
        projectid: Project id forwarded to the ODS task query.
        queryInfo: Search text forwarded to the ODS task query.

    Returns:
        A list of ``(taskId, taskType, taskName)`` tuples, one per response row.

    Raises:
        Exception: If the response code is not ``"0000"``.
    """
    task_info = json.loads(get_ods_task_info(projectid=projectid, queryInfo=queryInfo))
    if task_info["code"]["code"] != "0000":
        # No exception is active at this point, so exc_info is not requested.
        logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
        raise Exception("获取ods任务信息失败。程序终止...")
    return [(row["taskId"], row["taskType"], row["taskName"]) for row in task_info["bo"]["rows"]]
def del_and_offline_tasks(tasks):
    """Take every version of the given tasks offline (if needed) and delete it.

    Args:
        tasks: Iterable of sequences whose first two items are the task id
            and the task type (as produced by the task-list helpers).

    For each task the version list is fetched with ``get_task_ver``. A task
    whose version query does not return code ``"0000"`` is logged and
    skipped. A version with a truthy ``taskVerStatus`` is taken offline
    before it is deleted.

    Any exception is logged and ends the run; nothing is re-raised.
    """
    try:
        logging.info("These tasks's version will be dealed are:\n%s\n", tasks)
        for task in tasks:
            task_id, task_type = task[0], task[1]
            task_ver_info = json.loads(get_task_ver(task_id, task_type))
            if task_ver_info["code"]["code"] != "0000":
                # Slice, not index: `[500]` logged a single character and raised
                # IndexError (aborting the whole run) on payloads shorter than that.
                logging.error('Taskid "%s" get version failed, the error info is:\n%s', task_id, json.dumps(task_ver_info)[:500])
                continue
            for ver in task_ver_info["bo"]["rows"]:
                details = (ver["taskName"], ver["taskVerCode"], ver["creator"], ver["createdDate"])
                if ver["taskVerStatus"]:
                    logging.info('Task "%s" will be offlined, the Version is: %s, the creator is: %s, create date is: %s', *details)
                    task_offline(ver["taskVerId"], ver["taskType"])
                logging.info('Task "%s" will be deleted, the Version is: %s, the creator is: %s, create date is: %s', *details)
                del_task_ver(ver["taskVerId"], ver["taskType"])
    except Exception as ex:
        logging.error("The Error2 is:%s", ex, exc_info=True)
def stop_task_from_adma(otherCondition='edw_tag', status="", limit=2, time=cur_date, taskType=""):
    """Look up tasks in ADMA and stop each one that matches.

    Args:
        otherCondition: Free-text condition forwarded to the ADMA task search.
        status: Status filter forwarded to the ADMA task search.
        limit: Maximum number of rows requested from the search.
        time: Date string forwarded to the search (defaults to the module's
            ``cur_date`` as evaluated at import time).
        taskType: Task type filter forwarded to the search.

    The search is restricted to the virtual tasks returned by
    ``get_virtual_task``; their ``featureName`` values are joined into one
    JSON-quoted string in which every name is preceded by a comma.
    """
    # NOTE(review): get_virtual_task() is called with its own default date,
    # not with `time` - confirm whether that is intended.
    virtual_task_info = get_virtual_task()
    # Keeps the original wire format, including the leading comma.
    joined_names = "".join("," + item["featureName"] for item in virtual_task_info)
    virtual_task_names = json.dumps(joined_names)
    # Forward the caller's taskType; it was previously discarded in favour of "".
    task_info = get_task_name_from_adma(otherCondition=otherCondition, project_name=virtual_task_names, status=status, limit=limit, time=time, taskType=taskType)
    task_names = [entry["taskName"] for entry in task_info["searchInfo"]]
    logging.info("要删除的任务总数是:%d", len(task_names))
    for name in task_names:
        logging.info("将要停止adma任务:{}".format(name))
        stop_task(name)
def clear_all_tasks_mian():
    """Run the version clean-up: select tasks, then offline and delete their versions.

    Tasks are selected by ``get_del_task_names_from_taskversion`` with the
    keyword ``edw_tag``, up to 300 rows, no minimum age (``valid_days=0``),
    plus two task names that are always included.
    """
    logging.info("\n----------This is the start seperate line----------\n")
    del_task_names = ["acc_ods_alm_liuyue_test_0905_0323", "acc_ods_alm_liuyue_test_0905_0323_hbase"]
    no_del_task_names = []
    # Known project ids: test environment / "liuyue" project = '282221398641901568';
    # "0602" project = '200292042823204864'. Neither is used below (projectId is "").

    # Build the list of tasks to clean up. Alternatives used previously:
    # tasks = get_ods_task_names(queryInfo='autotest', projectid='')
    # tasks = get_del_task_names_from_taskversion(projectId='', searchKeyword="employee")
    tasks = get_del_task_names_from_taskversion(searchKeyword='edw_tag', projectId="", task_num=300, del_task_names=del_task_names, no_del_task_names=no_del_task_names, valid_days=0, taskType="")
    # Operations center: take versions offline and delete them.
    del_and_offline_tasks(tasks)
    # Deleting ODS task drafts is currently disabled:
    # if(len(tasks)>0):
    #     for task in tasks:
    #         logging.info("ods task %s's draft will be deleted!" %task[2])
    #         del_ods_task(task[0])
    # else:
    #     logging.info("There is no task to delete!")
    logging.info("\n----------This is the end seperate line----------\n")
# Script entry: this call runs at module level, so importing or running the
# file immediately stops the matching ADMA tasks. The version clean-up flow
# is left disabled.
# NOTE(review): consider an `if __name__ == "__main__":` guard if this file
# is ever imported by other code.
# clear_all_tasks_mian()
stop_task_from_adma(otherCondition='edw_dws', status="\"initial\"", limit=1, time=cur_date, taskType="")
# -*- coding: UTF-8 -*-
import requests
import base64
import json
import time
import random
import logging
import datetime
# Log to deal_log.txt (append mode) and mirror every record to the console.
logging.basicConfig(filename="deal_log.txt", filemode="a",
                    format="%(asctime)s-%(funcName)s-%(lineno)d-%(levelname)s:%(message)s", level=logging.INFO)
console = logging.StreamHandler()
logging.getLogger().addHandler(console)
# Today's date as YYYY-MM-DD, computed once at import time.
cur_date = datetime.datetime.today().strftime("%Y-%m-%d")
# base_url = 'http://d.zte.com.cn'
base_url = 'http://10.136.142.108'
adma_ip = '10.5.21.133:26180'


def get_token():
    """Request an auth token from the UAC service and return it.

    Returns:
        The value found at ``["other"]["token"]`` in the JSON response.
    """
    url = 'http://uac.zte.com.cn/uaccommauth/auth/comm/createRefreshToken.serv'
    header = {"Content-Type": "application/json"}
    # SECURITY NOTE(review): this is a base64-encoded JSON login payload with
    # embedded credentials (base64 is an encoding, not encryption). It should
    # come from configuration or the environment rather than from source.
    # The embedded spaces are discarded by b64decode's default lenient mode.
    datastr = "eyJsb2dpbkNsaWVudElwIjogIjEyNy4wLjAuMSIsICJhY2NvdW50IjogIjEwMjM3MjIxIiwgInBhc3NXb3JkIjogIjI5OTgxMDA2YS0iLCAibG9naW5TeXN0ZW 1Db2RlIjogIlVBQyIsICJ2ZXJpZnlDb2RlIjogIjJkNz EzODQ2N2I3OWRlYTZlNTI2NWQ1ZTIzMDA3YTg4In0="
    payload = json.loads(base64.b64decode(datastr))
    r = requests.post(url=url, headers=header, data=json.dumps(payload))
    r_dict = json.loads(r.content)
    return r_dict["other"]["token"]


# Shared request headers. Built only after get_token is defined: the original
# order called get_token() before its definition, a NameError at import time.
header = {"Content-Type": "application/json", "X-Emp-No": "10237221", "X-Lang-Id": "zh_CN", "X-Auth-Value": get_token()}
# Development-version query page: query task information.
def get_task_info(searchKeyword="", projectId="", task_num=3, taskType=""):
    """POST a task-list query and return the raw response body.

    Args:
        searchKeyword: Keyword placed in the query's ``searchKeyword`` field.
        projectId: Sent as the single element of ``projectIds``.
        task_num: Number of rows requested (first page only).
        taskType: Value of the query's ``taskType`` field.

    Returns:
        The response body (``r.content``); callers parse it as JSON.
    """
    logging.info("The params is:searchKeyword = %s,projectId= %s ,task_num= %s ,taskType=%s" % (searchKeyword, projectId, task_num, taskType))
    url = base_url + '/zte-itp-dcp-dataoperation/task/getpage'
    # NOTE(review): an empty projectId is sent as [""] - confirm the service
    # treats that as "no project filter".
    data = {
        "page": 1,
        "rows": task_num,
        "bo": {"projectIds": [projectId], "schedulingType": "", "searchKeyword": searchKeyword, "taskType": taskType},
    }
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content
# "View task version info" action.
def get_task_ver(taskid, tasktype):
    """POST a version query for one task and return the raw response body.

    Args:
        taskid: Value of the query's ``taskId`` field.
        tasktype: Value of the query's ``taskType`` field.

    Returns:
        The response body (``r.content``). Only the first page of 10 rows is
        requested.
    """
    url = base_url + '/zte-itp-dcp-dataoperation/dobaselineinfo/gettaskverinfos'
    data = {
        "bo": {"baselineExtractStatus": "", "searchKeyword": "", "taskType": tasktype, "taskId": taskid},
        "page": 1,
        "rows": 10,
        "sort": "",
        "order": "",
        "other": {},
    }
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content
# "Take offline" action.
def task_offline(taskverid, tasktype):
    """POST an offline request for one task version.

    Args:
        taskverid: Value of the request's ``taskVerId`` field.
        tasktype: Value of the request's ``taskType`` field.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-dataoperation/task/offlinetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values,
    # not those of the version being taken offline - presumably the service
    # keys on taskVerId only; confirm.
    data = {
        "taskType": tasktype,
        "taskName": "ods_test_ods_1_global_privs_xl_1112_21522",
        "taskVerId": taskverid,
        "taskId": "3917491946274488322",
        "taskVerCode": "V1.0.02",
    }
    requests.post(url=url, headers=header, data=json.dumps(data))
# "Delete version" action.
def del_task_ver(taskverid, tasktype):
    """POST a delete request for one task version, unless its type is excluded.

    Args:
        taskverid: Value of the request's ``taskVerId`` field.
        tasktype: Task type; ``tag`` and ``profile`` (case-insensitive) are
            never sent for deletion and only produce a log message.

    The response is not inspected and nothing is returned.
    """
    if tasktype.lower() in ('tag', 'profile'):
        logging.info("标签和画像类型的任务无法在运维中心删除版本,需要在标签工厂界面删除!")
        return
    url = base_url + '/zte-itp-dcp-dataoperation/task/deletetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values -
    # presumably the service keys on taskVerId only; confirm.
    data = [{
        "taskType": tasktype,
        "taskName": "edw_test_full_123102",
        "taskVerId": taskverid,
        "taskId": "409460023074258944",
        "taskVerCode": "V1.0.0",
    }]
    requests.post(url=url, headers=header, data=json.dumps(data))
    # NOTE(review): logged unconditionally; the response is never checked.
    logging.info("任务:%s已被删除!" % (taskverid))
# Data-sync query page: query ODS task information.
def get_ods_task_info(queryInfo="", projectid='282221398641901568', tasktype=""):
    """POST an ODS task query and return the raw response body.

    Args:
        queryInfo: Value of the query's ``queryInfo`` field.
        projectid: Value of the query's ``projId`` field.
        tasktype: Value of the query's ``taskType`` field.

    Returns:
        The response body (``r.content``). Up to 1000 rows are requested,
        sorted by ``lastUpdatedDate`` descending.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/gettaskquery'
    data = {
        "bo": {"goalDomainSubId": "", "projId": projectid, "queryInfo": queryInfo, "lastUpdatedBy": "", "taskType": tasktype},
        "order": "desc",
        "other": {},
        "page": 1,
        "rows": 1000,
        "sort": "lastUpdatedDate",
    }
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content
# Data-sync query page: delete an ODS task draft.
def del_ods_task(taskid):
    """POST a delete request for one ODS task.

    Args:
        taskid: Value of the request's ``taskId`` field.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/deletetask'
    data = {"bo": {"taskId": taskid}}
    requests.post(url=url, headers=header, data=json.dumps(data))
# Add a data source.
def add_datasource(sourceName, dbJdbcUrl, dbAccount, dbPwd, sourceEnvType, dbType):
    """POST a request that creates a data source.

    Args:
        sourceName: Value of the ``sourceName`` field.
        dbJdbcUrl: Value of the ``dbJdbcUrl`` field.
        dbAccount: Value of the ``dbAccount`` field.
        dbPwd: Plain password (``str`` or ``bytes``); sent base64-encoded.
        sourceEnvType: Value of the ``sourceEnvType`` field.
        dbType: Value of the ``dbType`` field.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/add'
    # b64encode needs bytes and returns bytes; json.dumps needs str.
    raw_pwd = dbPwd.encode("utf-8") if isinstance(dbPwd, str) else dbPwd
    encoded_pwd = base64.b64encode(raw_pwd).decode("ascii")
    data = {
        "dbAccount": dbAccount,
        "dbJdbcUrl": dbJdbcUrl,
        "dbNames": "",
        # JSON null; the original used the undefined name `null` (NameError).
        "portNumber": None,
        "dbPwd": encoded_pwd,
        "sourceEnvType": sourceEnvType,
        "dbType": dbType,
        "switchType": "1",
        "domainSubId": "200247186260262912",
        "dbStatus": 0,
        "domainSubName": "ods/alm",
        "projId": "282221398641901568",
        "region": "shenzhen",
        "sourceDesc": "",
        "sourceName": sourceName,
        "clusterName": "",
        "maxConnectionQty": 20,
    }
    requests.post(url=url, headers=header, data=json.dumps(data))
# Delete a data source.
def del_datasource(sourceid):
    """Send a DELETE request for one data source.

    Args:
        sourceid: Passed to ``requests.delete`` as ``params``.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/delete/'
    # NOTE(review): the URL ends in "/delete/" yet the id is passed as query
    # params - presumably it belongs in the path (url + sourceid); verify
    # against the service before changing.
    requests.delete(url=url, headers=header, params=sourceid)
# Query data sources.
def search_datasource(sourceName):
    """POST a data-source search and return the raw response body.

    Args:
        sourceName: Value of the query's ``sourceName`` field.

    Returns:
        The response body (``r.content``). Only the first page of 10 rows is
        requested, sorted by ``lastUpdatedDate`` descending.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/getlist'
    data = {
        "bo": {"dbType": "", "domainSubName": "", "sourceEnvType": "", "sourceName": sourceName, "projId": "282221398641901568"},
        "page": 1,
        "rows": 10,
        "sort": "lastUpdatedDate",
        "order": "desc",
    }
    # Serialize to JSON like every other call in this module: passing the dict
    # itself would form-encode it despite the JSON Content-Type header.
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    # Previously the result was dropped, so the search could not be used.
    return r.content
# ADMA statistics/analysis page: fetch task information.
def get_task_name_from_adma(otherCondition='', project_name='test_0602', status="", limit=100, time=cur_date, taskType=""):
    """POST an ADMA task search and return the parsed JSON response.

    Args:
        otherCondition: Value of the ``otherCondition`` search field.
        project_name: Value of the ``virtualTaskName`` search field.
        status: Value of the ``status`` search field.
        limit: Value of the request's ``limit`` field.
        time: Value of the request's ``time`` field (defaults to ``cur_date``
            as evaluated at import time).
        taskType: Value of the ``taskType`` search field.

    Returns:
        The response body decoded from JSON.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/tasksearch'
    header = {"Content-Type": "application/json"}
    search_condition = {
        "virtualTaskName": project_name,
        "status": status,
        "repeat": "",
        "taskTriggerType": "",
        "taskType": taskType,
        "executeType": "",
        "executeTime": {"executeTimeSecondStart": 0, "executeTimeSecondEnd": 86400},
        "dbType": "",
        "reasonCategory": "",
        "otherCondition": otherCondition,
        "durationAsc": "",
    }
    data = {
        "taskSearchCondition": search_condition,
        "draw": 20,
        "limit": limit,
        "page": 1,
        "provincecode": "all",
        "time": time,
        "classify": "all",
        "order": {"column": "taskId", "dir": "asc"},
    }
    # verify=False turns off TLS certificate verification for this request.
    r = requests.post(url=url, headers=header, data=json.dumps(data), verify=False)
    return json.loads(r.content)
# ADMA API: stop a task.
def stop_task(task_name=''):
    """POST a stop request for one ADMA task and return the parsed JSON response.

    Args:
        task_name: Sent as ``tasksInfo.algorithmNames``.

    Returns:
        The response body decoded from JSON.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/visualrestapi/add'
    header = {"Content-Type": "application/json"}
    data = {
        "taskType": "302",
        "userName": "admin",
        "tasksInfo": {"algorithmNames": task_name},
    }
    # verify=False turns off TLS certificate verification for this request.
    r = requests.post(url=url, headers=header, data=json.dumps(data), verify=False)
    return json.loads(r.content)
# ADMA API: query virtual tasks.
def get_virtual_task(time=cur_date):
    """GET the ADMA feature search and return the parsed JSON response.

    Args:
        time: Value of the ``time`` query parameter (defaults to ``cur_date``
            as evaluated at import time).

    Returns:
        The response body decoded from JSON.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/featuresearch'
    header = {"Content-Type": "application/json"}
    param = {"provincecode": "all", "time": time, "classify": "all", "_": "1585277235758"}
    # verify=False turns off TLS certificate verification for this request.
    r = requests.get(url=url, headers=header, params=param, verify=False)
    return json.loads(r.content)
# print(get_token())
# get_task_info("200292042823204864")
# get_task_info(searchKeyword="employee")
# get_task_ver('409470761318776832','ACC')
# task_offline('409470761318776832')
# del_task_ver('409470761318776832')
# print(get_task_name_from_adma())
# logging.error("This is a error log.")
# get_ods_task_info("test")
# del_datasource("123")
# stop_task("edw_tag_profile_autotest_tu8r_1580912760_cf064_hb")
# Original post: https://www.cnblogs.com/yahutiaotiao/p/12631936.html