• 如何实现队列消费/自定义进程
    • 实现代码
      • 定义消费进程逻辑
      • 注册消费进程

    如何实现队列消费/自定义进程

    可能我们会经常遇见需要不断消费队列内内容的场景,我们以EasySwoole中自定义进程的方式,来实现这一功能。

    实现代码

    定义消费进程逻辑

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Tioncico
    5. * Date: 2018/10/18 0018
    6. * Time: 9:43
    7. */
    8. namespace App\Process;
    9. use EasySwoole\Component\Process\AbstractProcess;
    10. use Swoole\Process;
    11. class Consumer extends AbstractProcess
    12. {
    13. private $isRun = false;
    14. public function run($arg)
    15. {
    16. // TODO: Implement run() method.
    17. /*
    18. * 举例,消费redis中的队列数据
    19. * 定时500ms检测有没有任务,有的话就while死循环执行
    20. */
    21. $this->addTick(500,function (){
    22. if(!$this->isRun){
    23. $this->isRun = true;
    24. $redis = new \redis();//此处为伪代码,请自己建立连接或者维护redis连接
    25. while (true){
    26. try{
    27. $task = $redis->lPop('task_list');
    28. if($task){
    29. // do you task
    30. }else{
    31. break;
    32. }
    33. }catch (\Throwable $throwable){
    34. break;
    35. }
    36. }
    37. $this->isRun = false;
    38. }
    39. var_dump($this->getProcessName().' task run check');
    40. });
    41. }
    42. public function onShutDown()
    43. {
    44. // TODO: Implement onShutDown() method.
    45. }
    46. public function onReceive(string $str, ...$args)
    47. {
    48. // TODO: Implement onReceive() method.
    49. }
    50. }

    注册消费进程

    在EasySwoole的全局事件中,注册消费进程。

    1. <?php
    2. use App\Consumer;
    3. use EasySwoole\EasySwoole\ServerManager;
    4. public static function mainServerCreate(EventRegister $register)
    5. {
    6. $allNum = 3;
    7. for ($i = 0 ;$i < $allNum;$i++){
    8. ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess());
    9. }
    10. }

    爬虫例子:https://github.com/HeKunTong/easyswoole3_demo