// 异步任务调度二 (asynchronous task scheduling, part 2)
/// <author>cxg 2020-7-14</author>
(*使用:
unit Unit1;
interface
uses tasks, MsgPack,
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls;
type
TForm1 = class(TForm)
Button1: TButton;
procedure Button1Click(Sender: TObject);
private
{ Private declarations }
tasks: TThreadCfg;
public
{ Public declarations }
procedure callback(task: TMsgPack);
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
procedure TForm1.Button1Click(Sender: TObject);
var task: TMsgPack;
queue: TTaskQueue;
begin
task := TMsgPack.Create;
task.Force('f1').AsString := '测试';
queue := TTaskQueue.Create;
queue.enQueue(task);
tasks := TThreadCfg.Create(1, queue);
tasks.onCallback := callback;
tasks.newThreads;
end;
procedure TForm1.callback(task: TMsgPack);
begin
Caption := task.force('f1').AsString;
tasks.Free;
end;
end.
*)
unit tasks;
interface
uses
Windows, MsgPack, Contnrs,
SyncObjs, Classes,
SysUtils;
type
// Method-pointer type for the task handler. A worker thread calls it from
// its Execute loop with each dequeued task and frees the task after the
// handler returns, so the handler must not keep a reference to it.
TCallBack = procedure(task: TMsgPack) of object;
type
// Task queue. Wraps a Contnrs.TQueue of untyped pointers; enQueue and
// deQueue enter fCS around the push / pop.
TTaskQueue = class // task queue (push/pop guarded by a critical section)
private
fQueue: TQueue; // underlying FIFO container holding the raw task pointers
fCS: TCriticalSection; // lock entered by enQueue / deQueue
public
// Creates the inner queue and the critical section.
constructor Create;
// Frees the inner queue and the critical section.
destructor Destroy; override;
// Appends a task pointer to the queue.
procedure enQueue(task: Pointer);
// Removes and returns the next task pointer from the queue.
function deQueue: Pointer;
end;
type
// Owns a set of worker threads together with the queue they consume and
// the callback they invoke for each task.
TThreadCfg = class // manages the worker threads
private
fQueue: TTaskQueue; // queue the workers poll; freed in Destroy
fCallBack: TCallBack; // handler the workers call per task (may be unassigned)
fThreadNum: Integer; // number of worker slots
fWorkers: array of TThread; // worker thread objects, filled by newThreads
public
// threadNum = 0 selects the processor count; queue is stored and later
// freed by this object.
constructor Create(const threadNum: Integer; const queue: TTaskQueue);
// Terminates and frees the workers, then frees the queue.
destructor Destroy; override;
// Creates and starts one worker thread per slot.
procedure newThreads;
// Handler invoked by the workers for each dequeued task.
property onCallback: TCallBack read fCallBack write fCallBack;
end;
type
// Worker thread: polls the queue of its TThreadCfg and hands each task to
// the configured callback.
TWorkThread = class(TThread) // worker thread
private
fConfig: TThreadCfg; // owning configuration (queue + callback); not owned by the thread
public
// Creates the thread suspended and remembers cfg.
constructor Create(cfg: TThreadCfg);
destructor Destroy; override;
// Polling loop; runs until Terminated is set.
procedure Execute; override;
end;
implementation
{ Returns the processor count reported by the Win32 GetSystemInfo call
  (the dwNumberOfProcessors field of SYSTEM_INFO). }
function GetCPUNum: Integer;
var
  sysInfo: SYSTEM_INFO;
begin
  GetSystemInfo(sysInfo);
  Result := sysInfo.dwNumberOfProcessors;
end;
{ TTaskQueue }
{ Builds an empty task queue: allocates the critical section used by
  enQueue / deQueue and the underlying TQueue container. }
constructor TTaskQueue.Create;
begin
  inherited Create;
  fCS := TCriticalSection.Create;
  fQueue := TQueue.Create;
end;
{ Removes and returns the oldest task pointer in the queue.

  Returns nil when the queue is empty. The emptiness check is done while
  holding the lock, so two consumers racing for the last item cannot both
  reach Pop (TQueue.Pop raises on an empty queue). The lock is released in
  a finally block so an exception cannot leave it held. }
function TTaskQueue.deQueue: Pointer;
begin
  Result := nil;
  fCS.Enter;
  try
    if fQueue.Count > 0 then
      Result := fQueue.Pop;
  finally
    fCS.Leave;
  end;
end;
{ Releases the critical section and the underlying container.
  Task pointers still stored in the queue are not freed here. }
destructor TTaskQueue.Destroy;
begin
  FreeAndNil(fCS);
  FreeAndNil(fQueue);
  inherited Destroy;
end;
{ Appends a task pointer to the end of the queue.

  task: pointer to store; the queue keeps the raw pointer only.

  The push happens under the critical section, and the lock is released in
  a finally block so that an exception raised by Push (e.g. out of memory)
  cannot leave the lock held and block every other producer/consumer. }
procedure TTaskQueue.enQueue(task: Pointer);
begin
  fCS.Enter;
  try
    fQueue.Push(task);
  finally
    fCS.Leave;
  end;
end;
{ TThreadCfg }
{ Prepares a worker-thread pool configuration.

  threadNum: number of worker threads to run. Zero or a negative value
             selects the processor count returned by GetCPUNum (a negative
             value would otherwise reach SetLength as a negative length).
  queue:     task queue the workers consume. This object stores the
             reference and frees the queue in its destructor.

  No threads are started here; call newThreads for that. }
constructor TThreadCfg.Create(const threadNum: Integer;
  const queue: TTaskQueue);
begin
  inherited Create;
  fQueue := queue;
  fThreadNum := threadNum;
  if fThreadNum <= 0 then
    fThreadNum := GetCPUNum;
  SetLength(fWorkers, fThreadNum);
end;
{ Stops and releases every worker thread, then frees the task queue.

  - Slots that were never filled (newThreads not called) are skipped
    instead of dereferencing nil.
  - For a worker other than the calling thread, FreeOnTerminate is switched
    off before Terminate so that the WaitFor / Free below operate on an
    object that has not destroyed itself.
  - If Destroy is called from inside a worker (for example from the task
    callback), that worker cannot wait for itself; it is flagged to
    terminate and left to free itself once its Execute loop exits.

  Task pointers still pending in the queue are not freed. }
destructor TThreadCfg.Destroy;
var
  i: Integer;
  worker: TThread;
begin
  for i := 0 to High(fWorkers) do
  begin
    worker := fWorkers[i];
    fWorkers[i] := nil;
    if not Assigned(worker) then
      Continue; // slot was never started
    if worker.ThreadID = GetCurrentThreadId then
    begin
      // Called from this very worker: WaitFor on ourselves would never
      // return, so let the thread object clean itself up after Execute.
      worker.FreeOnTerminate := True;
      worker.Terminate;
    end
    else
    begin
      worker.FreeOnTerminate := False; // we free it explicitly below
      worker.Terminate;
      worker.WaitFor;
      worker.Free;
    end;
  end;
  fQueue.Free; // release the queue handed over in Create
  inherited;
end;
{ Creates one TWorkThread per configured slot, stores it in fWorkers and
  starts it. Each worker is constructed suspended and resumed right after
  it has been recorded. }
procedure TThreadCfg.newThreads;
var
  slot: Integer;
  worker: TThread;
begin
  slot := 0;
  while slot < fThreadNum do
  begin
    worker := TWorkThread.Create(Self);
    fWorkers[slot] := worker;
    fWorkers[slot].Resume;
    Inc(slot);
  end;
end;
{ TWorkThread }
{ Creates the worker in suspended state and binds it to its configuration.

  cfg: the TThreadCfg whose queue and callback this worker uses; the
       reference is stored, not owned.

  FreeOnTerminate is left off: TThreadCfg.Destroy calls Terminate, WaitFor
  and Free on every worker, which is only valid for a thread object that
  does not destroy itself when Execute returns. }
constructor TWorkThread.Create(cfg: TThreadCfg);
begin
  inherited Create(True);
  FreeOnTerminate := False;
  fConfig := cfg;
end;
{ No resources of its own to release; defers to TThread.Destroy. }
destructor TWorkThread.Destroy;
begin
  inherited Destroy;
end;
{ Worker loop: polls the shared queue until the thread is terminated.

  For every dequeued task the configured callback (if any) is invoked on
  this worker thread, and the task object is then freed. The task is freed
  in a finally block, so it is released both when no callback is assigned
  and when the callback raises. A nil result from deQueue is ignored.

  Nothing belonging to fConfig is touched after the callback returns and
  before the next Terminated check. }
procedure TWorkThread.Execute;
var
  pack: TMsgPack;
begin
  while not Terminated do
  begin
    // Cheap unsynchronised peek; the actual removal happens under the
    // queue's lock inside deQueue.
    if fConfig.fQueue.fQueue.Count > 0 then
    begin
      pack := TMsgPack(fConfig.fQueue.deQueue);
      if Assigned(pack) then
      try
        if Assigned(fConfig.fCallBack) then
          fConfig.fCallBack(pack);
      finally
        pack.Free; // the worker owns the task once it is dequeued
      end;
    end;
    // Yield between polls so an idle worker does not spin a core.
    Sleep(1);
    SwitchToThread;
  end;
end;
end.
原文:https://www.cnblogs.com/hnxxcxg/p/13298034.html