FastCacheQueue
EasySwoole FastCache組件在>= 1.2.1
的時候新增類似· beanstalkd消息隊列 ·特性。
- 可以創建多個queue
- 支持延遲投遞
- 任務超時恢復執行
- 任務重發執行
- 任務最大重發次數
- 支持putJob、delayJob、releaseJob、reserveJob、buryJob、kickJob等命令
基本使用
FastCacheQueue依托于FastCache,具體安裝請查看FastCache
服務注冊
更新后,EasySwoole\FastCache\CacheProcessConfig類多出以下方法
/** 設置進程最大內存 默認512M */
public function setMaxMem(string $maxMem): void
/** 設置消息隊列保留時間 默認60s (取出任務后沒有及時確認會重新放回隊列) */
public function setQueueReserveTime(int $queueReserveTime): void
/** 設置消息隊列最大重發次數 默認10 達到次數后重發將會被丟棄 */
public function setQueueMaxReleaseTimes(int $queueMaxReleaseTimes): void
開始使用
下文示例代碼的Job和Cache都使用以下命名空間
use EasySwoole\FastCache\Cache;
use EasySwoole\FastCache\Job;
投遞任務
投遞成功之后 將會返回該任務的jobId。
沒有失敗情況,除非fastCache注冊注冊失敗。
$job = new Job();
$job->setData("siam"); // 任意類型數據
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);
取出任務
可以開啟自定義進程當消費者,循環監聽隊列,執行任務處理。
注意:任務執行完成一定要有一個結果。要么刪除該任務,要么重發。否則當任務取出一定時間后(默認60s)會自動放回隊列中。
$job = Cache::getInstance()->getJob('siam_queue');// Job對象或者null
if ($job === null){
echo "沒有任務\n";
}else{
// 執行業務邏輯
var_dump($job);
// 執行完了要刪除或者重發,否則超時會自動重發
Cache::getInstance()->deleteJob($job);
}
清空ready任務隊列
var_dump(Cache::getInstance()->flushReadyJobQueue('siam_queue'));
var_dump(Cache::getInstance()->jobQueueSize('siam_queue'));
延遲執行任務
$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue_delay");
$job->setDelay(5);// 延時5s
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);
// 馬上取會失敗 隔5s取才成功
$job = Cache::getInstance()->getJob('siam_queue_delay');
var_dump($job);
刪除任務
可以是由getJob取出的對象,也可以自己聲明Job對象,傳入JobId來刪除。
$job = new Job();
$job->setJobId(1);
$job->setQueue('siam_queue_delay');
Cache::getInstance()->deleteJob($job);
任務重發
任務執行失敗,或者某些場景需要重新執行,則可以重發。
重發時,可以指定是否延遲執行。
// get出來的任務執行失敗可以重發
$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);
$job = Cache::getInstance()->getJob('siam_queue');
if ($job === null){
echo "沒有任務\n";
}else{
// 執行業務邏輯
$doRes = false;
if (!$doRes){
// 業務邏輯失敗,需要重發
// 如果延遲隊列需要馬上重發,在這里需要清空delay屬性
// $job->setDelay(0);
// 如果普通隊列需要延遲重發,則設置delay屬性
// $job->setDelay(5);
$res = Cache::getInstance()->releaseJob($job);
var_dump($res);
}else{
// 執行完了要刪除或者重發,否則超時會自動重發
Cache::getInstance()->deleteJob($job);
}
}
返回現在有什么隊列
$queues = Cache::getInstance()->jobQueues();
var_dump($queues);
返回某個隊列的長度
$queueSize = Cache::getInstance()->jobQueueSize("siam_queue");
$queueSize2 = Cache::getInstance()->jobQueueSize("siam_queue_delay");
var_dump($queueSize);
var_dump($queueSize2);
清空隊列 可指定名稱
// 清空全部
$res = Cache::getInstance()->flushJobQueue();
var_dump($res);
// 清空siam_queue隊列
$res = Cache::getInstance()->flushJobQueue('siam_queue');
var_dump($res);
將任務改為延遲狀態
//添加任務
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_delay");
$jobId = Cache::getInstance()->putJob($job);
//方法一 直接傳入jobId
$job->setJobId($jobId);
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));
//方法二 取出任務
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_delay');
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));
//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_delay");
var_dump($queueSize);
從延遲執行隊列中拿取
//傳入隊列名
var_dump(Cache::getInstance()->getDelayJob('LuffyQAQ_queue_delay'));
清空delay任務隊列
var_dump(Cache::getInstance()->flushDelayJobQueue('LuffyQAQ_queue_delay'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_delay'));
將任務改為保留狀態
//添加任務
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_reserve");
$jobId = Cache::getInstance()->putJob($job);
//方法一 直接傳入jobId
$job->setJobId($jobId);
var_dump(Cache::getInstance()->reserveJob($job));
//方法二 取出任務
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_reserve');
var_dump(Cache::getInstance()->reserveJob($job));
//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_reserve");
var_dump($queueSize);
從保留隊列中拿取
//傳入隊列名
var_dump(Cache::getInstance()->getReserveJob('LuffyQAQ_queue_reserve'));
清空reserve任務隊列
var_dump(Cache::getInstance()->flushReserveJobQueue('LuffyQAQ_queue_reserve'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_reserve'));
將任務改為埋藏狀態
$job = new Job();
$job->setQueue('LuffyQAQ_queue_bury');
$job->setData('LuffyQAQ');
$jobId = Cache::getInstance()->putJob($job);
$job->setJobId($jobId);
var_dump(Cache::getInstance()->buryJob($job));
//使用jobQueueSize查看隊列長度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_bury");
var_dump($queueSize);
從埋藏隊列中拿取
//傳入隊列名
var_dump(Cache::getInstance()->getBuryJob('LuffyQAQ_queue_bury'));
將埋藏隊列任務恢復到ready中
var_dump(Cache::getInstance()->kickJob($job));
清空bury任務隊列
var_dump(Cache::getInstance()->flushBuryJobQueue('LuffyQAQ_queue_bury'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_bury'));