/**
 * Example coroutine that runs a query and hands its result back to the driver.
 *
 * The first yield exposes whatever Db::query() returned; the value the driver
 * sends back in is then yielded as the second and last value.
 *
 * @return \Generator
 */
function foo()
{
    $connection = new Db();
    $pendingQuery = $connection->query();

    // Parentheses keep the yield expression valid on PHP 5.5/5.6 as well.
    $rows = (yield $pendingQuery);

    yield $rows;
}
/**
 * Wraps a swoole HTTP server and routes its events to methods of this class.
 *
 * Only onRequest() is shown in this excerpt; the other handlers registered in
 * start() (onStart, onShutdown, onWorkerStart, onWorkerStop, onWorkerError,
 * onTask, onFinish) are not part of the listing.
 */
class HttpServer implements Server
{
    /** @var \swoole_http_server Underlying swoole HTTP server instance. */
    private $swooleHttpServer;

    /**
     * @param \swoole_http_server $swooleHttpServer Server instance to wrap.
     */
    public function __construct(\swoole_http_server $swooleHttpServer)
    {
        $this->swooleHttpServer = $swooleHttpServer;
    }

    /**
     * Registers one callback per swoole event, then starts the server.
     */
    public function start()
    {
        $this->swooleHttpServer->on('start', [$this, 'onStart']);
        $this->swooleHttpServer->on('shutdown', [$this, 'onShutdown']);

        $this->swooleHttpServer->on('workerStart', [$this, 'onWorkerStart']);
        $this->swooleHttpServer->on('workerStop', [$this, 'onWorkerStop']);
        $this->swooleHttpServer->on('workerError', [$this, 'onWorkerError']);

        $this->swooleHttpServer->on('task', [$this, 'onTask']);
        $this->swooleHttpServer->on('finish', [$this, 'onFinish']);

        $this->swooleHttpServer->on('request', [$this, 'onRequest']);

        $this->swooleHttpServer->start();
    }

    // The onRequest method:

    /**
     * Handles one HTTP request by delegating to a fresh RequestHandler.
     *
     * @param \swoole_http_request  $request
     * @param \swoole_http_response $response
     */
    public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
    {
        $requestHandler = new RequestHandler($request, $response);
        $requestHandler->handle();
    }
}

// In RequestHandler, handle() parses the route of the request, creates the
// controller and calls the matching method.
// (Excerpt: the enclosing RequestHandler class declaration is not shown.)

/**
 * Builds the per-request Context and Router, then runs the request as a Task.
 *
 * When Router::parse() returns false an empty response is written and nothing
 * else happens. Any \Exception raised while parsing or running is passed to
 * PcsExceptionHandler::handle() together with the response object.
 */
public function handle()
{
    $this->context = new Context($this->request, $this->response, $this->getFd());
    $this->router = new Router($this->request);

    try {
        if (false === $this->router->parse()) {
            $this->response->output('');
            return;
        }

        // doRun() is a generator; Task drives it step by step.
        $coroutine = $this->doRun();
        $task = new Task($coroutine, $this->context);
        $task->run();
    } catch (\Exception $e) {
        PcsExceptionHandler::handle($e, $this->response);
    }
}

/**
 * Coroutine body of a request: yields the result of dispatch(), then yields
 * the result of sending the value that was passed back in.
 *
 * @return \Generator
 */
private function doRun()
{
    $ret = (yield $this->dispatch());
    yield $this->response->send($ret);
}
namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

/**
 * Couples one generator-based coroutine with its request context and drives
 * it through a Scheduler created for this task alone.
 */
class Task
{
    /** @var \Generator Coroutine currently being driven; swapped while nested generators run. */
    private $coroutine;

    /** @var Context Context passed in by the creator of the task. */
    private $context;

    /** @var mixed Status returned by the most recent Scheduler::schedule() call. */
    private $status;

    /** @var Scheduler Scheduler dedicated to this task. */
    private $scheduler;

    /** @var mixed Value most recently passed to send(). */
    private $sendValue;

    /**
     * @param \Generator $coroutine Coroutine to drive.
     * @param Context    $context   Context associated with the coroutine.
     */
    public function __construct(\Generator $coroutine, Context $context)
    {
        $this->coroutine = $coroutine;
        $this->context = $context;
        $this->scheduler = new Scheduler($this);
    }

    /**
     * Repeatedly asks the scheduler for the next step until it reports
     * TASK_WAIT or TASK_DONE.
     *
     * TASK_CONTINUE (and any status not listed in the switch) makes the loop
     * run another scheduling step. An \Exception raised during a step is
     * forwarded to Scheduler::throwException() and the loop goes on.
     *
     * @return null
     */
    public function run()
    {
        while (true) {
            try {
                $this->status = $this->scheduler->schedule();

                switch ($this->status) {
                    case TaskStatus::TASK_WAIT:
                        echo "task status: TASK_WAIT\n";
                        return null;

                    case TaskStatus::TASK_DONE:
                        echo "task status: TASK_DONE\n";
                        return null;

                    case TaskStatus::TASK_CONTINUE:
                        echo "task status: TASK_CONTINUE\n";
                        break;
                }
            } catch (\Exception $e) {
                $this->scheduler->throwException($e);
            }
        }
    }

    /**
     * Replaces the coroutine this task is driving.
     *
     * @param \Generator $coroutine
     */
    public function setCoroutine($coroutine)
    {
        $this->coroutine = $coroutine;
    }

    /**
     * @return \Generator Coroutine this task is currently driving.
     */
    public function getCoroutine()
    {
        return $this->coroutine;
    }

    /**
     * @return bool Whether the current coroutine is still valid (Generator::valid()).
     */
    public function valid()
    {
        return $this->coroutine->valid();
    }

    /**
     * Remembers $value and sends it into the current coroutine.
     *
     * @param mixed $value
     *
     * @return mixed Whatever Generator::send() returns.
     */
    public function send($value)
    {
        $this->sendValue = $value;

        return $this->coroutine->send($value);
    }

    /**
     * @return mixed Value most recently passed to send().
     */
    public function getSendVal()
    {
        return $this->sendValue;
    }
}

// Task depends on the Generator object $coroutine. The Task class defines a few
// getters/setters plus some pass-through Generator methods. Task::run() drives
// the scheduling of the coroutine; the scheduling itself is done by Scheduler,
// and every scheduling step returns the status of that step. Usually several
// coroutines share one scheduler, whereas here every Task creates its own
// scheduler (in its constructor): each coroutine is one client request, so a
// dedicated scheduler reduces interference between them, and the order in which
// different coroutines run is handled by swoole, so this scheduler does not
// need to care about it. The scheduler code follows:

namespace Pcs\Coroutine;

/**
 * Performs single scheduling steps for one Task and keeps the stack of parent
 * coroutines while nested generators are running.
 */
class Scheduler
{
    /** @var Task Task this scheduler belongs to. */
    private $task;

    /** @var \SplStack Parent coroutines suspended while a nested generator runs. */
    private $stack;

    /** Internal marker: the handler did not apply, try the next one. */
    const SCHEDULE_CONTINUE = 10;

    /**
     * @param Task $task Task to schedule.
     */
    public function __construct(Task $task)
    {
        $this->task = $task;
        $this->stack = new \SplStack();
    }

    /**
     * Runs one scheduling step on the value currently yielded by the task's
     * coroutine.
     *
     * The handlers are tried in a fixed order; the first one that returns
     * something other than SCHEDULE_CONTINUE decides the result. When none
     * applies the task is reported as done.
     *
     * @return mixed Status of the handler that applied, or TaskStatus::TASK_DONE.
     */
    public function schedule()
    {
        $coroutine = $this->task->getCoroutine();
        $value = $coroutine->current();

        $status = $this->handleSystemCall($value);
        if ($status !== self::SCHEDULE_CONTINUE) {
            return $status;
        }

        $status = $this->handleStackPush($value);
        if ($status !== self::SCHEDULE_CONTINUE) {
            return $status;
        }

        $status = $this->handleAsyncJob($value);
        if ($status !== self::SCHEDULE_CONTINUE) {
            return $status;
        }

        $status = $this->handelYieldValue($value);
        if ($status !== self::SCHEDULE_CONTINUE) {
            return $status;
        }

        $status = $this->handelStackPop();
        if ($status !== self::SCHEDULE_CONTINUE) {
            return $status;
        }

        return TaskStatus::TASK_DONE;
    }

    /**
     * @return bool Whether no parent coroutine is waiting on the stack.
     */
    public function isStackEmpty()
    {
        return $this->stack->isEmpty();
    }

    /**
     * Handler for yielded SystemCall values.
     *
     * @param mixed $value Value yielded by the coroutine.
     *
     * @return int|null SCHEDULE_CONTINUE when $value is not a SystemCall.
     */
    private function handleSystemCall($value)
    {
        if (!$value instanceof SystemCall) {
            return self::SCHEDULE_CONTINUE;
        }

        // NOTE(review): no handling exists for SystemCall values. schedule()
        // passes this null on to Task::run(), whose switch has no case for it.
        // Confirm what a SystemCall is supposed to do here.
        return null;
    }

    /**
     * When the coroutine yielded another generator, suspends the current
     * coroutine on the stack and makes the yielded generator the active one.
     *
     * @param mixed $value Value yielded by the coroutine.
     *
     * @return mixed SCHEDULE_CONTINUE if not applicable, else TaskStatus::TASK_CONTINUE.
     */
    private function handleStackPush($value)
    {
        if (!$value instanceof \Generator) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->task->getCoroutine();
        $this->stack->push($coroutine);
        $this->task->setCoroutine($value);

        return TaskStatus::TASK_CONTINUE;
    }

    /**
     * When the coroutine yielded an Async job, starts it with asyncCallback()
     * as completion callback and reports that the task has to wait.
     *
     * instanceof is used so that only objects qualify; a yielded class-name
     * string must not reach the execute() call.
     *
     * @param mixed $value Value yielded by the coroutine.
     *
     * @return mixed SCHEDULE_CONTINUE if not applicable, else TaskStatus::TASK_WAIT.
     */
    private function handleAsyncJob($value)
    {
        if (!$value instanceof Async) {
            return self::SCHEDULE_CONTINUE;
        }

        $value->execute([$this, 'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    /**
     * Completion callback handed to Async::execute().
     *
     * With an exception, the exception is thrown into the coroutine; otherwise
     * the response is sent into the coroutine and the task is run again.
     *
     * @param mixed           $response  Result of the async job.
     * @param \Exception|null $exception Failure reported by the async job, if any.
     */
    public function asyncCallback($response, $exception = null)
    {
        if ($exception instanceof \Exception) {
            $this->throwException($exception, true);
            return;
        }

        $this->task->send($response);
        $this->task->run();
    }

    /**
     * While the active coroutine is still valid, sends the yielded value back
     * into it, so the yield expression evaluates to the value it yielded.
     *
     * @param mixed $value Value yielded by the coroutine.
     *
     * @return mixed SCHEDULE_CONTINUE if the coroutine has finished, else TaskStatus::TASK_CONTINUE.
     */
    private function handelYieldValue($value)
    {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    /**
     * Reached once the active coroutine has finished: resumes the parent
     * coroutine from the stack and sends it the value last sent through the task.
     *
     * @return mixed SCHEDULE_CONTINUE if no parent is waiting, else TaskStatus::TASK_CONTINUE.
     */
    private function handelStackPop()
    {
        if ($this->isStackEmpty()) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->stack->pop();
        $this->task->setCoroutine($coroutine);

        $value = $this->task->getSendVal();
        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    /**
     * Throws $e into a coroutine of the task.
     *
     * With an empty stack the exception goes into the active coroutine and the
     * method returns without re-running the task. Otherwise it goes into the
     * active coroutine (first call) or into the parent popped from the stack,
     * the task is run again, and an \Exception escaping from that is passed to
     * the next coroutine on the stack by recursion.
     *
     * @param \Exception $e           Exception to deliver.
     * @param bool       $isFirstCall True to target the active coroutine instead of its parent.
     */
    public function throwException($e, $isFirstCall = false)
    {
        if ($this->isStackEmpty()) {
            $this->task->getCoroutine()->throw($e);
            return;
        }

        try {
            if ($isFirstCall) {
                $coroutine = $this->task->getCoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }

            $this->task->setCoroutine($coroutine);
            $coroutine->throw($e);

            $this->task->run();
        } catch (\Exception $e) {
            $this->throwException($e);
        }
    }
}
/**
 * Ordinary function nesting: the caller gets funcB()'s return value directly.
 *
 * @return mixed
 */
function funcA()
{
    $result = funcB();

    return $result;
}

/**
 * Generator nesting: yields the value returned by genB() as one single item.
 * It is a plain yield, not a delegation, so whoever drives genA() receives
 * that value itself and has to deal with it.
 *
 * @return \Generator
 */
function genA()
{
    $inner = genB();

    yield $inner;
}
// Excerpt from Scheduler: handling of asynchronous jobs.

/**
 * When the coroutine yielded an Async job, starts it with asyncCallback() as
 * completion callback and reports that the task has to wait.
 *
 * instanceof is used so that only objects qualify; a yielded class-name
 * string must not reach the execute() call.
 *
 * @param mixed $value Value yielded by the coroutine.
 *
 * @return mixed SCHEDULE_CONTINUE if not applicable, else TaskStatus::TASK_WAIT.
 */
private function handleAsyncJob($value)
{
    if (!$value instanceof Async) {
        return self::SCHEDULE_CONTINUE;
    }

    $value->execute([$this, 'asyncCallback']);

    return TaskStatus::TASK_WAIT;
}

/**
 * Completion callback handed to Async::execute().
 *
 * With an exception, the exception is thrown into the coroutine; otherwise
 * the response is sent into the coroutine and the task is run again.
 *
 * @param mixed           $response  Result of the async job.
 * @param \Exception|null $exception Failure reported by the async job, if any.
 */
public function asyncCallback($response, $exception = null)
{
    if ($exception instanceof \Exception) {
        $this->throwException($exception, true);
        return;
    }

    $this->task->send($response);
    $this->task->run();
}

// When the value yielded by the coroutine is an instance of a class that
// extends Async (or implements the Async interface), its execute() method is
// called. The mysqli database query class serves as the example here.
// (Excerpt: the enclosing query class declaration is not shown.)

/**
 * Stores the completion callback and hands the SQL to a swoole task worker;
 * queryReady() is registered as the finish callback of that task.
 *
 * @param callable $callback Invoked by queryReady() with (result, exception|null).
 */
public function execute(callable $callback)
{
    $this->callback = $callback;

    $serv = ServerHolder::getServer();
    // NOTE(review): -1 presumably lets swoole choose the task worker - confirm
    // against the swoole Server::task documentation.
    $serv->task($this->sql, -1, [$this, 'queryReady']);
}

/**
 * Finish callback of the query task: decodes the result and forwards it,
 * together with an exception when the query failed, to the stored callback.
 *
 * @param \swoole_http_server $serv    Server that ran the task.
 * @param int                 $task_id Id of the finished task.
 * @param string              $data    Serialized query result.
 */
public function queryReady(\swoole_http_server $serv, $task_id, $data)
{
    // NOTE(review): unserialize() can instantiate arbitrary classes; this
    // assumes $data only ever comes from the project's own task worker - confirm.
    $queryResult = unserialize($data);

    $exception = null;
    if (!is_object($queryResult)) {
        // unserialize() returns false on malformed input; report that instead
        // of reading properties from a non-object.
        $exception = new \Exception('Malformed query result received from task worker');
    } elseif ($queryResult->errno != 0) {
        $exception = new \Exception($queryResult->error);
    }

    call_user_func($this->callback, $queryResult, $exception);
}
/**
 * Feeds the yielded value back into the task's coroutine while that coroutine
 * is still valid.
 *
 * @param mixed $value Value yielded by the coroutine.
 *
 * @return mixed TaskStatus::TASK_CONTINUE after sending the value, or
 *               self::SCHEDULE_CONTINUE when the coroutine is no longer valid.
 */
private function handelYieldValue($value)
{
    if ($this->task->valid()) {
        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    return self::SCHEDULE_CONTINUE;
}
原文:https://www.cnblogs.com/wadhf/p/11823333.html