# -*- coding: UTF-8 -*-
"""Clean-up script: take task versions offline, delete them, and stop ADMA tasks.

The HTTP helpers used below (``get_task_info``, ``get_task_ver``,
``task_offline``, ``del_task_ver``, ``get_ods_task_info``,
``get_virtual_task``, ``get_task_name_from_adma``, ``stop_task``) and the
``cur_date`` constant are not defined in this script; they are expected to be
provided by the star import of ``clear_requests``.
"""
import json
import logging
import time
from clear_requests import *
# NOTE(review): keep this import AFTER the star import. clear_requests
# presumably exports the ``datetime`` *module*; this line must rebind the name
# to the ``datetime`` *class* used below -- confirm against clear_requests.
from datetime import datetime, timedelta


def get_del_task_names_from_taskversion(searchKeyword="", projectId='', task_num=3,
                                        no_del_task_names=None, del_task_names=None,
                                        valid_days=7, taskType=""):
    """Collect the tasks whose versions should be taken offline / deleted.

    Rows returned by ``get_task_info`` are walked in reverse order. A row is
    selected when its ``taskName`` is listed in ``del_task_names`` (regardless
    of age or type), or when it is at least ``valid_days`` old and its type is
    not in the protected type list (currently only ``"tag"``).

    Args:
        searchKeyword: Keyword forwarded to ``get_task_info``.
        projectId: Project id forwarded to ``get_task_info``.
        task_num: Maximum number of rows requested.
        no_del_task_names: Accepted for backward compatibility; currently
            unused because the name-based exclusion branch is disabled.
        del_task_names: Task names that are always selected.
        valid_days: Tasks created less than this many days ago are skipped.
        taskType: Task type filter forwarded to ``get_task_info``.

    Returns:
        A list of ``(taskId, taskType, taskName)`` tuples. If fetching or
        filtering fails, the error is logged and whatever was collected so
        far is returned (best effort).

    Raises:
        ValueError: If both ``projectId`` and ``searchKeyword`` are empty.
    """
    if not projectId and not searchKeyword:
        # The original raised a plain string, which itself fails with TypeError.
        raise ValueError("参数projectId和searchKeyword不能同时为空!")

    if del_task_names is None:
        del_task_names = []

    # Task types that must never be handled here.
    # Previously: ["dm", "ods", "acc", "dim", "fact", "dws", "mid", "tag"]
    no_del_task_types = ["tag"]
    delta = timedelta(days=valid_days)
    tasks = []

    try:
        # Fetch task names and types, then filter them into one list.
        task_info = json.loads(get_task_info(searchKeyword=searchKeyword,
                                             task_num=task_num,
                                             projectId=projectId,
                                             taskType=taskType))
        if task_info["code"]["code"] != "0000":
            logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
            raise Exception("获取任务信息失败。程序终止...")

        now = datetime.now()
        for row in reversed(task_info["bo"]["rows"]):
            entry = (row["taskId"], row["taskType"], row["taskName"])
            createtime = datetime.strptime(row["createdDate"], "%Y-%m-%d %H:%M:%S")
            if row["taskName"] in del_task_names:
                # Explicitly requested tasks bypass the age and type checks.
                tasks.append(entry)
            elif (now - createtime) < delta:
                logging.info('Task "%s" created time is %s,less than %s days, can\'t be Dealed!',
                             row["taskName"], row["createdDate"], valid_days)
            elif row["taskType"].lower() in no_del_task_types:
                logging.info('Task "%s\'s" type is %s , this type can\'t be Dealed!',
                             row["taskName"], row["taskType"])
            else:
                tasks.append(entry)
    except Exception as ex:
        # Best effort: log with traceback and return what was collected so far.
        logging.error("获取任务信息失败。The Error1 is:%s", ex, exc_info=True)
    return tasks


def get_ods_task_names(projectid='282221398641901568', queryInfo=''):
    """Return ``(taskId, taskType, taskName)`` tuples for the ODS tasks of a project.

    Args:
        projectid: Project id forwarded to ``get_ods_task_info``.
        queryInfo: Query string forwarded to ``get_ods_task_info``.

    Raises:
        Exception: If the service response code is not ``"0000"``.
    """
    task_info = json.loads(get_ods_task_info(projectid=projectid, queryInfo=queryInfo))
    if task_info["code"]["code"] != "0000":
        logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
        raise Exception("获取ods任务信息失败。程序终止...")
    return [(row["taskId"], row["taskType"], row["taskName"])
            for row in task_info["bo"]["rows"]]


def del_and_offline_tasks(tasks):
    """Take every version of the given tasks offline (if needed) and delete it.

    Args:
        tasks: Iterable of ``(taskId, taskType, taskName)`` sequences; only the
            first two items are read.

    A task whose version lookup fails is logged and skipped. Any other error
    is logged and stops processing of the remaining tasks; nothing is raised.
    """
    try:
        logging.info("These tasks's version will be dealed are:\n%s\n", tasks)
        for task in tasks:
            task_ver_info = json.loads(get_task_ver(task[0], task[1]))
            if task_ver_info["code"]["code"] != "0000":
                # Log a 500-character excerpt (the original indexed a single
                # character with [500], which could raise IndexError).
                logging.error('Taskid "%s" get version failed, the error info is:\n%s',
                              task[0], json.dumps(task_ver_info)[:500])
                continue
            for ver in task_ver_info["bo"]["rows"]:
                if ver["taskVerStatus"]:
                    # A truthy status means the version must go offline first.
                    logging.info('Task "%s" will be offlined, the Version is: %s, the creator is: %s, create date is: %s',
                                 ver["taskName"], ver["taskVerCode"], ver["creator"], ver["createdDate"])
                    task_offline(ver["taskVerId"], ver["taskType"])
                logging.info('Task "%s" will be deleted, the Version is: %s, the creator is: %s, create date is: %s',
                             ver["taskName"], ver["taskVerCode"], ver["creator"], ver["createdDate"])
                del_task_ver(ver["taskVerId"], ver["taskType"])
    except Exception as ex:
        logging.error("The Error2 is:%s", ex, exc_info=True)


def stop_task_from_adma(otherCondition='edw_tag', status="", limit=2, time=cur_date, taskType=""):
    """Stop every ADMA task matched by the given search conditions.

    The ``featureName`` of every virtual task is joined into one
    comma-prefixed string, JSON-encoded, and used as the project filter of the
    task search; each task name found is then passed to ``stop_task``.

    Args:
        otherCondition: Free-form search condition forwarded to the search.
        status: Status filter forwarded to the search.
        limit: Maximum number of tasks requested from the search.
        time: Date string forwarded to the search (shadows the ``time``
            module inside this function; the name is part of the interface).
        taskType: Task type filter forwarded to the search.
    """
    # NOTE(review): the virtual-task lookup uses its own default date rather
    # than ``time`` -- confirm whether it should receive ``time`` as well.
    virtual_task_info = get_virtual_task()
    # Every name is prefixed with "," (so the result starts with a comma),
    # exactly as the original concatenation produced it.
    virtual_task_names = "".join("," + item["featureName"] for item in virtual_task_info)
    virtual_task_names = json.dumps(virtual_task_names)

    task_info = get_task_name_from_adma(otherCondition=otherCondition,
                                        project_name=virtual_task_names,
                                        status=status, limit=limit,
                                        time=time, taskType=taskType)
    task_names = [found["taskName"] for found in task_info["searchInfo"]]
    logging.info("要删除的任务总数是:%d", len(task_names))
    for name in task_names:
        logging.info("将要停止adma任务:%s", name)
        stop_task(name)


def clear_all_tasks_mian():
    """Select the tasks to clean up, then take their versions offline and delete them."""
    logging.info("\n----------This is the start seperate line----------\n")
    del_task_names = ["acc_ods_alm_liuyue_test_0905_0323", "acc_ods_alm_liuyue_test_0905_0323_hbase"]
    no_del_task_names = []
    # Known project ids (from the original notes): test environment / "liuyue"
    # project = '282221398641901568'; "0602" project = '200292042823204864'.

    # Build the list of tasks to delete. Alternatives used previously:
    # tasks = get_ods_task_names(queryInfo='autotest', projectid='')
    # tasks = get_del_task_names_from_taskversion(projectId='', searchKeyword="employee")
    tasks = get_del_task_names_from_taskversion(searchKeyword='edw_tag', projectId="",
                                                task_num=300,
                                                del_task_names=del_task_names,
                                                no_del_task_names=no_del_task_names,
                                                valid_days=0, taskType="")
    # Operations centre: take versions offline and delete them.
    del_and_offline_tasks(tasks)

    # Deleting ODS task drafts is currently disabled:
    # if(len(tasks)>0):
    #     for task in tasks:
    #         logging.info("ods task %s's draft will be deleted!" %task[2])
    #         del_ods_task(task[0])
    # else:
    #     logging.info("There is no task to delete!")
    logging.info("\n----------This is the end seperate line----------\n")


if __name__ == "__main__":
    # Guarded so that importing this module does not hit the live service.
    # clear_all_tasks_mian()
    stop_task_from_adma(otherCondition='edw_dws', status="\"initial\"", limit=1,
                        time=cur_date, taskType="")
# -*- coding: UTF-8 -*-
# HTTP helper functions for the data-operation, data-modeling and ADMA
# services, plus the shared logging setup, request header and base URLs.
# NOTE(review): this looks like a separate module (presumably
# clear_requests.py) that was pasted after the script -- confirm and keep it
# in its own file.
import requests
import base64
import json
import time
import random
import logging
import datetime

logging.basicConfig(filename="deal_log.txt", filemode="a",
                    format="%(asctime)s-%(funcName)s-%(lineno)d-%(levelname)s:%(message)s",
                    level=logging.INFO)
# Also echo every log record to the console.
console = logging.StreamHandler()
logging.getLogger().addHandler(console)

cur_date = datetime.datetime.today().strftime("%Y-%m-%d")

# base_url = 'http://d.zte.com.cn'
base_url = 'http://10.136.142.108'
adma_ip = '10.5.21.133:26180'


# Fetch an auth token.
def get_token():
    """Request a token from the UAC auth service and return it.

    Returns:
        The value found at ``["other"]["token"]`` in the JSON response.
    """
    url = 'http://uac.zte.com.cn/uaccommauth/auth/comm/createRefreshToken.serv'
    header = {"Content-Type": "application/json"}
    # SECURITY(review): base64-encoded JSON login payload hard-coded in source;
    # it appears to embed account credentials -- move it to configuration or
    # the environment. The embedded spaces are kept byte-for-byte; b64decode's
    # default non-validating mode discards characters outside the alphabet.
    datastr = (
        "eyJsb2dpbkNsaWVudElwIjogIjEyNy4wLjAuMSIsICJhY2NvdW50IjogIjEwMjM3MjIxIiwgInBhc3NXb3JkIjogIjI5OTgxMDA2YS0iLCAibG9naW5TeXN0ZW "
        "1Db2RlIjogIlVBQyIsICJ2ZXJpZnlDb2RlIjogIjJkNz "
        "EzODQ2N2I3OWRlYTZlNTI2NWQ1ZTIzMDA3YTg4In0="
    )
    payload = json.dumps(json.loads(base64.b64decode(datastr)))
    r = requests.post(url=url, headers=header, data=payload)
    r_dict = json.loads(r.content)
    return r_dict["other"]["token"]


# Shared request header. It must be built AFTER get_token is defined: the
# original built it first, which raises NameError at import time.
# The token is fetched once, when the module is imported.
header = {"Content-Type": "application/json", "X-Emp-No": "10237221",
          "X-Lang-Id": "zh_CN", "X-Auth-Value": get_token()}


# Development-version query page: query task information.
def get_task_info(searchKeyword="", projectId="", task_num=3, taskType=""):
    """Query one page of tasks and return the raw response body.

    Args:
        searchKeyword: Keyword filter.
        projectId: Project id; sent as a single-element ``projectIds`` list.
        task_num: Number of rows requested.
        taskType: Task type filter.
    """
    logging.info("The params is:searchKeyword = %s,projectId= %s ,task_num= %s ,taskType=%s",
                 searchKeyword, projectId, task_num, taskType)
    url = base_url + '/zte-itp-dcp-dataoperation/task/getpage'
    data = {"page": 1, "rows": task_num,
            "bo": {"projectIds": [projectId], "schedulingType": "",
                   "searchKeyword": searchKeyword, "taskType": taskType}}
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content


# View the version information of a task.
def get_task_ver(taskid, tasktype):
    """Return the raw response body listing up to 10 versions of a task."""
    url = base_url + '/zte-itp-dcp-dataoperation/dobaselineinfo/gettaskverinfos'
    data = {"bo": {"baselineExtractStatus": "", "searchKeyword": "",
                   "taskType": tasktype, "taskId": taskid},
            "page": 1, "rows": 10, "sort": "", "order": "", "other": {}}
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content


# Take a task version offline.
def task_offline(taskverid, tasktype):
    """Post an offline request for the given task version; returns nothing."""
    url = base_url + '/zte-itp-dcp-dataoperation/task/offlinetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values;
    # presumably the service only keys off taskVerId -- confirm.
    data = {"taskType": tasktype,
            "taskName": "ods_test_ods_1_global_privs_xl_1112_21522",
            "taskVerId": taskverid, "taskId": "3917491946274488322",
            "taskVerCode": "V1.0.02"}
    requests.post(url=url, headers=header, data=json.dumps(data))


# Delete a task version.
def del_task_ver(taskverid, tasktype):
    """Post a delete request for the given task version; returns nothing.

    Versions of type ``tag`` or ``profile`` (case-insensitive) are not sent
    to the service; a log message is written instead.
    """
    url = base_url + '/zte-itp-dcp-dataoperation/task/deletetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values;
    # presumably the service only keys off taskVerId -- confirm.
    data = [{"taskType": tasktype, "taskName": "edw_test_full_123102",
             "taskVerId": taskverid, "taskId": "409460023074258944",
             "taskVerCode": "V1.0.0"}]
    if tasktype.lower() in ('tag', 'profile'):
        logging.info("标签和画像类型的任务无法在运维中心删除版本,需要在标签工厂界面删除!")
    else:
        requests.post(url=url, headers=header, data=json.dumps(data))
        # Logged unconditionally after the request; the response is not checked.
        logging.info("任务:%s已被删除!", taskverid)


# Data-sync query page: query ODS task information.
def get_ods_task_info(queryInfo="", projectid='282221398641901568', tasktype=""):
    """Return the raw response body listing up to 1000 ODS tasks of a project."""
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/gettaskquery'
    data = {"bo": {"goalDomainSubId": "", "projId": projectid,
                   "queryInfo": queryInfo, "lastUpdatedBy": "",
                   "taskType": tasktype},
            "order": "desc", "other": {}, "page": 1, "rows": 1000,
            "sort": "lastUpdatedDate"}
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content


# Data-sync query page: delete an ODS task draft.
def del_ods_task(taskid):
    """Post a delete request for the given ODS task draft; returns nothing."""
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/deletetask'
    data = {"bo": {"taskId": taskid}}
    requests.post(url=url, headers=header, data=json.dumps(data))


# Add a data source.
def add_datasource(sourceName, dbJdbcUrl, dbAccount, dbPwd, sourceEnvType, dbType):
    """Post a request that registers a new data source; returns nothing.

    Args:
        sourceName: Name of the data source.
        dbJdbcUrl: JDBC URL of the database.
        dbAccount: Database account.
        dbPwd: Plain password (``str`` or ``bytes``); sent base64-encoded.
        sourceEnvType: Environment type of the source.
        dbType: Database type.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/add'
    # b64encode needs bytes and returns bytes; json.dumps needs str.
    raw_pwd = dbPwd.encode("utf-8") if isinstance(dbPwd, str) else dbPwd
    encoded_pwd = base64.b64encode(raw_pwd).decode("ascii")
    data = {"dbAccount": dbAccount, "dbJdbcUrl": dbJdbcUrl, "dbNames": "",
            "portNumber": None,  # serialised as JSON null (original used undefined name `null`)
            "dbPwd": encoded_pwd, "sourceEnvType": sourceEnvType,
            "dbType": dbType, "switchType": "1",
            "domainSubId": "200247186260262912", "dbStatus": 0,
            "domainSubName": "ods/alm", "projId": "282221398641901568",
            "region": "shenzhen", "sourceDesc": "", "sourceName": sourceName,
            "clusterName": "", "maxConnectionQty": 20}
    requests.post(url=url, headers=header, data=json.dumps(data))


# Delete a data source.
def del_datasource(sourceid):
    """Send a DELETE request for the given data source; returns nothing."""
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/delete/'
    # NOTE(review): sourceid is passed as the query string, while the URL ends
    # in "/delete/" -- confirm whether the id belongs in the path instead.
    requests.delete(url=url, headers=header, params=sourceid)


# Query data sources.
def search_datasource(sourceName):
    """Query data sources by name and return the raw response body."""
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/getlist'
    data = {"bo": {"dbType": "", "domainSubName": "", "sourceEnvType": "",
                   "sourceName": sourceName, "projId": "282221398641901568"},
            "page": 1, "rows": 10, "sort": "lastUpdatedDate", "order": "desc"}
    # Send JSON like every other helper: the header declares application/json,
    # and a raw dict would be form-encoded instead.
    r = requests.post(url=url, headers=header, data=json.dumps(data))
    return r.content


# ADMA statistics page: fetch task information.
def get_task_name_from_adma(otherCondition='', project_name='test_0602', status="",
                            limit=100, time=cur_date, taskType=""):
    """Search ADMA tasks and return the decoded JSON response.

    Args:
        otherCondition: Free-form search condition.
        project_name: Sent as ``virtualTaskName``.
        status: Status filter.
        limit: Maximum number of results requested.
        time: Date string sent as ``time`` (shadows the ``time`` module inside
            this function; the name is part of the interface).
        taskType: Task type filter.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/tasksearch'
    header = {"Content-Type": "application/json"}
    data = {
        "taskSearchCondition": {
            "virtualTaskName": project_name,
            "status": status,
            "repeat": "",
            "taskTriggerType": "",
            "taskType": taskType,
            "executeType": "",
            "executeTime": {"executeTimeSecondStart": 0, "executeTimeSecondEnd": 86400},
            "dbType": "",
            "reasonCategory": "",
            "otherCondition": otherCondition,
            "durationAsc": "",
        },
        "draw": 20,
        "limit": limit,
        "page": 1,
        "provincecode": "all",
        "time": time,
        "classify": "all",
        "order": {"column": "taskId", "dir": "asc"},
    }
    # SECURITY(review): TLS certificate verification is disabled.
    r = requests.post(url=url, headers=header, data=json.dumps(data), verify=False)
    return json.loads(r.content)


# ADMA API: stop a task.
def stop_task(task_name=''):
    """Ask ADMA to stop the named task and return the decoded JSON response."""
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/visualrestapi/add'
    header = {"Content-Type": "application/json"}
    data = {
        "taskType": "302",
        "userName": "admin",
        "tasksInfo": {"algorithmNames": task_name},
    }
    # SECURITY(review): TLS certificate verification is disabled.
    r = requests.post(url=url, headers=header, data=json.dumps(data), verify=False)
    return json.loads(r.content)


# ADMA API: query virtual tasks.
def get_virtual_task(time=cur_date):
    """Query ADMA virtual tasks for the given date and return the decoded JSON response."""
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/featuresearch'
    header = {"Content-Type": "application/json"}
    param = dict(provincecode="all", time=time, classify="all", _="1585277235758")
    # SECURITY(review): TLS certificate verification is disabled.
    r = requests.get(url=url, headers=header, params=param, verify=False)
    return json.loads(r.content)


# Manual smoke checks used during development:
# print(get_token())
# get_task_info("200292042823204864")
# get_task_info(searchKeyword="employee")
# get_task_ver('409470761318776832','ACC')
# task_offline('409470761318776832')
# del_task_ver('409470761318776832')
# print(get_task_name_from_adma())
# logging.error("This is a error log.")
# get_ods_task_info("test")
# del_datasource("123")
# stop_task("edw_tag_profile_autotest_tu8r_1580912760_cf064_hb")
# Original article: https://www.cnblogs.com/yahutiaotiao/p/12631936.html