drop table ma_schedue_task ; ---test_task(:1,:2) create table ma_schedue_task( created_by varchar2(100) default ‘system‘ not null, created_date date default sysdate not null, updated_by varchar2(100) default ‘system‘ not null, updated_date date default sysdate not null, id_ma_schedue_task varchar2(32) default sys_guid() not null, procedure_name varchar2(100), task_months number default 0 not null, task_days number default 0 not null, task_hours number default 0 not null , task_minutes number default 0 not null, first_time date , prev_time date, next_time date, is_effected varchar2(2) default ‘N‘ not null, has_param varchar2(2) default ‘N‘ not null, thread_num number default 1 not null, task_priority number default 100 not null ); comment on table ma_schedue_task is ‘任务配置表‘; comment on column ma_schedue_task.created_by is ‘创建人‘; comment on column ma_schedue_task.created_date is ‘创建时间‘; comment on column ma_schedue_task.updated_by is ‘更新人‘; comment on column ma_schedue_task.updated_date is ‘更新时间‘; comment on column ma_schedue_task.id_ma_schedue_task is ‘主键‘; comment on column ma_schedue_task.procedure_name is ‘过程名,可带参数‘; comment on column ma_schedue_task.task_months is ‘频率:月‘; comment on column ma_schedue_task.task_days is ‘频率:天‘; comment on column ma_schedue_task.task_hours is ‘频率:时‘; comment on column ma_schedue_task.task_minutes is ‘频率:分钟‘; comment on column ma_schedue_task.first_time is ‘首次执行时间‘; comment on column ma_schedue_task.prev_time is ‘上次执行时间‘; comment on column ma_schedue_task.next_time is ‘下次执行时间‘; comment on column ma_schedue_task.is_effected is ‘是否生效:N-否,Y-是‘; comment on column ma_schedue_task.has_param is ‘是否带有入参:N-否,Y-是‘; comment on column ma_schedue_task.thread_num is ‘线程数‘; comment on column ma_schedue_task.task_priority is ‘优先级‘; create index idx_id_ma_schedue_task on ma_schedue_task(id_ma_schedue_task) initrans 16; alter table ma_schedue_task add constraint pk_id_ma_schedue_task primary key(id_ma_schedue_task) using index idx_id_ma_schedue_task; drop table ma_schedue_param ; create table ma_schedue_param( created_by varchar2(100) default ‘system‘ not null, created_date date default sysdate not null, updated_by varchar2(100) default ‘system‘ not null, updated_date date default sysdate not null, id_ma_schedue_param varchar2(32) default sys_guid() not null, id_ma_schedue_task varchar2(32), param_order number, param_value varchar2(100), param_group number, run_status varchar2(2) ); comment on table ma_schedue_param is ‘任务配置表‘; comment on column ma_schedue_param.created_by is ‘创建人‘; comment on column ma_schedue_param.created_date is ‘创建时间‘; comment on column ma_schedue_param.updated_by is ‘更新人‘; comment on column ma_schedue_param.updated_date is ‘更新时间‘; comment on column ma_schedue_param.id_ma_schedue_param is ‘主键‘; comment on column ma_schedue_param.id_ma_schedue_task is ‘关联任务表ID‘; comment on column ma_schedue_param.param_order is ‘参数顺序‘; comment on column ma_schedue_param.param_value is ‘参数值‘; comment on column ma_schedue_param.param_group is ‘参数组别,指定那些参数是一起执行的‘; comment on column ma_schedue_param.run_status is ‘状态:W-等待执行,R-执行中,E-执行失败,S-执行成功‘; create index idx_id_ma_schedue_param on ma_schedue_param(id_ma_schedue_param) initrans 16; alter table ma_schedue_param add constraint pk_id_ma_schedue_param primary key(id_ma_schedue_param) using index idx_id_ma_schedue_param;
测试
--测试不含参数的类型 create or replace procedure test_task is begin dbms_output.put_line(‘hi,test_task‘); end test_task; DELETE FROM MA_SCHEDUE_TASK WHERE 1=1; insert into ma_schedue_task(procedure_name,task_days,first_time,has_param,thread_num,is_effected) values(‘test_task‘,1,sysdate-2/24,‘N‘,1,‘Y‘); COMMIT; DELETE FROM MA_SCHEDUE_TASK WHERE 1=1; insert into ma_schedue_task(procedure_name,task_months,first_time,has_param,thread_num,is_effected) values(‘test_task‘,1,sysdate-2/24,‘N‘,1,‘Y‘); COMMIT; DELETE FROM MA_SCHEDUE_TASK WHERE 1=1; insert into ma_schedue_task(procedure_name,Task_Hours,first_time,has_param,thread_num,is_effected) values(‘test_task‘,1,sysdate-2/24,‘N‘,1,‘Y‘); COMMIT; DELETE FROM MA_SCHEDUE_TASK WHERE 1=1; insert into ma_schedue_task(procedure_name,Task_Minutes,first_time,has_param,thread_num,is_effected) values(‘test_task‘,30,sysdate-2/24,‘N‘,1,‘Y‘); COMMIT; --测试含了入参的类型 create or replace procedure test_task_param(i_name in varchar2,i_point in varchar2) is begin dbms_output.put_line(i_name||‘积分‘||i_point); end test_task_param; declare v_task_id varchar2(32); begin --select sys_guid() into v_task_id ; delete from ma_schedue_task; delete from ma_schedue_param; insert into ma_schedue_task (procedure_name, task_days, first_time, has_param, thread_num, is_effected) values (‘test_task_param(:1,:2)‘, 1, sysdate - 2 / 24, ‘Y‘, 1, ‘Y‘) returning ID_MA_SCHEDUE_TASK into v_task_id; insert into ma_schedue_param (id_ma_schedue_task, param_order, param_value, param_group, run_status) values (v_task_id, 1, ‘乱世佳人‘, 1, ‘W‘); insert into ma_schedue_param (id_ma_schedue_task, param_order, param_value, param_group, run_status) values (v_task_id, 2, ‘10000‘, 1, ‘W‘); end; / select * from ma_schedue_task; select * from ma_schedue_param; declare begin pkg_schedue_task.dispatch_task; end; / select * from ma_error_log
任务调度
--grant DEBUG CONNECT SESSION to scott; sys;change_on_install as sysdba create or replace package pkg_schedue_task is procedure scan_task(i_current_time in date,o_ref out sys_refcursor); procedure exec_task( i_task_id in varchar2); procedure dispatch_task; procedure add_task (i_record in ma_schedue_task %rowtype); --配置无参数的方法 procedure add_task (i_record in ma_schedue_task %rowtype,i_param in ma_schedue_param %rowtype); --配置带参数的方法 end pkg_schedue_task; / create or replace package body pkg_schedue_task is procedure dispatch_task is v_ref sys_refcursor; v_task ma_utils.ma_task_record; begin scan_task(sysdate, v_ref); loop fetch v_ref into v_task; exit when v_ref%notfound; exec_task(v_task.task_id); end loop; close v_ref; end dispatch_task; --扫描需要执行的任务 procedure scan_task(i_current_time date, o_ref out sys_refcursor) is begin open o_ref for select t.id_ma_schedue_task, t.thread_num from ma_schedue_task t where t.is_effected = ‘Y‘ and ((t.first_time <= i_current_time and t.next_time is null) or (t.next_time is not null and t.next_time <= i_current_time)) order by t.task_priority asc; end scan_task; --执行任务 procedure exec_task(i_task_id in varchar2) is v_task ma_schedue_task%rowtype; v_no_task_exp exception; v_task_no_param_exp exception; ls_pn varchar2(1000) := ‘pkg_schedue_task.exec_task‘; v_current_time date; v_next_time date; v_group ma_schedue_param.param_group%type; v_sql varchar2(4000); v_cur number; v_result number; begin <<get_task>> begin --获取任务信息 select * into v_task from ma_schedue_task r where r.id_ma_schedue_task = i_task_id and r.is_effected = ‘Y‘ and rownum =1; exception when no_data_found then raise v_no_task_exp; end get_task; --获取任务下次执行时间,如果是第一次执行,则去初次执行时间 select decode(v_task.next_time, null, v_task.first_time, v_task.next_time) into v_current_time from dual; --根据设置的频度修改下次执行时间 v_next_time := v_current_time + v_task.TASK_MINUTES / (24 * 60) + v_task.TASK_hours / 24 + v_task.TASK_DAYS; v_next_time := add_months(v_next_time, v_task.task_months); update ma_schedue_task t set t.next_time = v_next_time, t.prev_time = sysdate where t.id_ma_schedue_task = i_task_id; commit; --如果任务不带参数,则直接过程,但此处存在SQL注入的可能 if v_task.has_param = ‘N‘ then ma_utils.exec_plsql_block(v_task.PROCEDURE_NAME); return; end if ; --获取处于等待状态的最小参数分组值 select min(r.param_group) into v_group from ma_schedue_param r where r.id_ma_schedue_task = i_task_id and r.run_status = ‘W‘; if v_group is null then raise v_task_no_param_exp; end if; update ma_schedue_param r set r.run_status = ‘R‘ where r.id_ma_schedue_task = i_task_id and r.param_group = v_group and r.run_status = ‘W‘; commit; v_sql := ‘ begin ‘ || rtrim(v_task.procedure_name, ‘;‘) || ‘;end;‘; v_cur := dbms_sql.open_cursor; dbms_sql.parse(v_cur, v_sql, dbms_sql.native); --绑定参数 for param in (select r.param_order, r.param_value from ma_schedue_param r where r.id_ma_schedue_task = i_task_id and r.param_group = v_group and r.run_status = ‘R‘ order by param_order) loop dbms_sql.bind_variable(v_cur, ‘:‘ || param.param_order, param.param_value); end loop; v_result := dbms_sql.execute(v_cur); dbms_sql.close_cursor(v_cur); update ma_schedue_param r set r.run_status = ‘S‘ where r.id_ma_schedue_task = i_task_id and r.param_group = v_group and r.run_status = ‘R‘; commit; exception when v_no_task_exp then ma_utils.add_error_log(ls_pn, i_task_id || ‘任务不存在‘, ‘INFO‘); when v_task_no_param_exp then ma_utils.add_error_log(ls_pn, i_task_id || ‘任务没有配置实参值‘, ‘ERROR‘); when others then update ma_schedue_param r set r.run_status = ‘E‘ where r.id_ma_schedue_task = i_task_id and r.param_group = v_group and r.run_status = ‘R‘; commit; ma_utils.add_error_log(ls_pn, i_task_id || substr(sqlerrm, 1, 300), ‘ERROR‘); end exec_task; end pkg_schedue_task; /
错误日志
drop table ma_error_log ; ---错误日志表 create table ma_error_log( created_by varchar2(100) default ‘system‘ not null, created_date date default sysdate not null, updated_by varchar2(100) default ‘system‘ not null, updated_date date default sysdate not null, id_ma_error_log varchar2(32) default sys_guid() not null, procedure_name varchar2(100), error_msg varchar2(2500), error_level varchar2(30) ); comment on table ma_error_log is ‘任务配置表‘; comment on column ma_error_log.created_by is ‘创建人‘; comment on column ma_error_log.created_date is ‘创建时间‘; comment on column ma_error_log.updated_by is ‘更新人‘; comment on column ma_error_log.updated_date is ‘更新时间‘; comment on column ma_error_log.id_ma_error_log is ‘主键‘; comment on column ma_error_log.procedure_name is ‘发生异常时调用的方法‘; comment on column ma_error_log.error_msg is ‘错误信息‘; comment on column ma_error_log.error_level is ‘错误级别:info,important,error‘; create index idx_id_ma_error_log on ma_error_log(id_ma_error_log) initrans 16; create index idx_error_created_date on ma_error_log(created_date) initrans 16; alter table ma_error_log add constraint pk_id_ma_error_log primary key(id_ma_error_log) using index idx_id_ma_error_log; --创建工具包 create or replace package ma_utils is type ma_task_record is record(task_id varchar2(32),thread_num number); procedure add_error_log(i_procedure_name in varchar2, i_error_msg in varchar2, i_error_level in varchar2); procedure exec_plsql_block(i_plsql in varchar2); end ma_utils; / create or replace package body ma_utils is procedure add_error_log(i_procedure_name in varchar2, i_error_msg in varchar2, i_error_level in varchar2) is pragma autonomous_transaction; begin insert into ma_error_log (procedure_name, error_msg, error_level) values (i_procedure_name, i_error_msg, i_error_level); commit; exception when others then rollback; end add_error_log; procedure exec_plsql_block(i_plsql in varchar2) is begin execute immediate ‘begin ‘ || rtrim(i_plsql, ‘;‘) || ‘ ; end ;‘; end exec_plsql_block; end ma_utils; /
原文:https://www.cnblogs.com/yhq1314/p/10613401.html