Task
EasySwoole 3.3.0+
異步任務(wù)放棄了 Swoole
的原生 task
,采用獨(dú)立組件實(shí)現(xiàn)。
相對于原生 Swoole Task
,easyswoole/task 組件實(shí)現(xiàn)了以下功能:
- 可以投遞閉包任務(wù)
- 可以在
TaskWorker
等其他自定義進(jìn)程繼續(xù)投遞任務(wù) - 實(shí)現(xiàn)任務(wù)限流與狀態(tài)監(jiān)控
安裝
composer require easyswoole/task
框架中使用
同步調(diào)用:
\EasySwoole\EasySwoole\Task\TaskManager::getInstance()->sync(function (){
echo 'sync';
});
異步調(diào)用:
\EasySwoole\EasySwoole\Task\TaskManager::getInstance()->async(function () {
echo 'async';
}, function ($reply, $taskId, $workerIndex) {
// $reply 返回的執(zhí)行結(jié)果
// $taskId 任務(wù)id
echo 'async success';
});
由于 php
本身就不能序列化閉包,該閉包投遞是通過反射該閉包函數(shù),獲取 php
代碼直接序列化 php
代碼,然后直接 eval
代碼實(shí)現(xiàn)的。
所以投遞閉包無法使用外部的對象引用,以及資源句柄,復(fù)雜任務(wù)請使用任務(wù)模板方法。
任務(wù)模版
自定義一個(gè)任務(wù)模版
<?php
namespace App\Task;
use EasySwoole\Task\AbstractInterface\TaskInterface;
class CustomTask implements TaskInterface
{
protected $data;
public function __construct($data)
{
// 保存投遞過來的數(shù)據(jù)
$this->data = $data;
}
public function run(int $taskId, int $workerIndex)
{
// 執(zhí)行邏輯
}
public function onException(\Throwable $throwable, int $taskId, int $workerIndex)
{
// 異常處理
}
}
如何使用
$task = \EasySwoole\EasySwoole\Task\TaskManager::getInstance();
// 投遞異步任務(wù)
$task->async(new CustomTask(['user' => 'custom']));
// 投遞同步任務(wù)
$data = $task->sync(new CustomTask(['user' => 'custom']));
投遞返回值
easyswoole/task
組件在 1.0.8
及以前版本支持,如下 4
個(gè)投遞返回值:
-
> 0
投遞成功(異步任務(wù)專屬,返回taskId
,同步任務(wù)直接返回run()
方法運(yùn)行之后返回的值) -
-1
task
進(jìn)程繁忙,投遞失敗 (已經(jīng)到達(dá)最大運(yùn)行數(shù)量maxRunningNum
) -
-2
投遞數(shù)據(jù)解包失敗,當(dāng)投遞數(shù)據(jù)傳輸時(shí)數(shù)據(jù)異常時(shí)會報(bào)錯(cuò),此錯(cuò)誤為組件底層錯(cuò)誤,一般不會出現(xiàn) -
-3
任務(wù)出錯(cuò) (該任務(wù)執(zhí)行時(shí)出現(xiàn)異常錯(cuò)誤,被組件攔截并輸出錯(cuò)誤)
在 1.0.9
~ 1.1.1
版本,除了支持上述 4
個(gè)投遞返回值,還新增支持了以下 2
個(gè)投遞返回值:
-
-4
投遞的任務(wù)數(shù)據(jù)不合法,一般是投遞了不能序列化的數(shù)據(jù)才會出現(xiàn)。 -
-5
投遞的任務(wù)在運(yùn)行時(shí)出錯(cuò)
在最新的版本及以后版本中,又新增支持了以下 2
個(gè)投遞返回值:
-
-6
投遞的任務(wù)數(shù)據(jù)包已過期,一般是Task
進(jìn)程比較繁忙時(shí)才會出現(xiàn)。 -
-7
投遞任務(wù)時(shí),任務(wù)運(yùn)行完成后沒有任何數(shù)據(jù)返回。一般是因?yàn)閳?zhí)行任務(wù)時(shí)間過長導(dǎo)致UnixSocket
超時(shí),才會出現(xiàn)。
獨(dú)立使用
該組件可獨(dú)立使用,代碼如下:
<?php
use EasySwoole\Task\Config;
use EasySwoole\Task\Task;
require_once __DIR__ . '/vendor/autoload.php';
/**
* 配置項(xiàng)中可以修改工作進(jìn)程數(shù)、臨時(shí)目錄,進(jìn)程名,最大并發(fā)執(zhí)行任務(wù)數(shù),異?;卣{(diào)等
*/
$config = new Config();
$task = new Task($config);
// 添加 swoole 服務(wù)
$http = new \Swoole\Http\Server("0.0.0.0", 9501);
// 注入 swoole 服務(wù),進(jìn)行創(chuàng)建 task 進(jìn)程
$task->attachToServer($http);
// 在 onrequest 事件中調(diào)用 task (其他地方也可以,這只是示例)
$http->on("request", function (\Swoole\Http\Request $request, \Swoole\Http\Response $response) use ($task) {
if (isset($request->get['sync'])) {
// 同步調(diào)用 task
$ret = $task->sync(function ($taskId, $workerIndex) {
return "{$taskId}.{$workerIndex}";
});
$response->end("sync result " . $ret);
} else if (isset($request->get['status'])) {
var_dump($task->status());
} else {
// 異步調(diào)用 task
$id = $task->async(function ($taskId, $workerIndex) {
\co::sleep(1);
var_dump("async id {$taskId} task run");
});
$response->end("async id {$id} ");
}
});
// 啟動(dòng)服務(wù)
$http->start();
版本強(qiáng)調(diào)
框架低版本升級為 EasySwoole 3.3.0+
,需要手動(dòng)進(jìn)行配置修改。
需要?jiǎng)h除 MAIN_SERVER.SETTING.task_worker_num
,MAIN_SERVER.SETTING.task_enable_coroutine
配置項(xiàng)。
請?jiān)陧?xiàng)目根目錄的 dev.php/produce.php
的 MAIN_SERVER
配置項(xiàng)中,增加 TASK
子配置項(xiàng):
<?php
// 這里省略
return [
// 這里省略 ...
'MAIN_SERVER' => [
// 這里省略 ...
'TASK' => [
'workerNum' => 4,
'maxRunningNum' => 128,
'timeout' => 15
]
],
// 這里省略 ...
];
Task管理
查看所有Task進(jìn)程的狀態(tài)
php easyswoole.php task status