博文記錄
PHP 2022-08-29 19:41:41 850 3

在 Hyperf 开发项目过程中经常使用 Job 进行一些耗时任务处理,而 Job 的执行是在异步消费队列中,在异步消费进程启动时会对数据进行初始化存储到容器的服务类理,以供处理后续的 Job 任务。

当用户通过 HTTP 接口更新数据到数据库中时,此时异步消费进程中的内存数据并没有得到及时更新,这就会造成一些困扰。

如何解决这个问题呢?

  1. 在 Job 执行时,每次读取一次最新的配置信息。虽然不会有延迟,但是当 Job 过多时,读取数据库的频次会非常高,影响 Job 处理速度,以及网络资源消耗。
  2. 在 Job 执行时,判断上次更新的时间,如果超过预设时间则进行更新。缺点是依然有延迟。

希望能够通过一个事件的方式进行触发更新,当 HTTP 接口产生了数据变更,此时将内存数据进行更新。

但是 HTTP 接口所在的进程是多个 worker 进程,并不能直接对异步进程进行读写操作。通过系统自带的  $container 进行获取服务类,是获取当前 worker 进程内的,而不是异步消费进程的。

还有个思路,就是当异步消费进程配置为 1 个时,可以在配置更新时产生一个新 Job,在新 Job 里面对内存进行更新,因为 Job 执行所处的进程是异步消费进程。

但如果配置了多个消费进程,如 3 个,就无法通过这样的方式,因为 Job 无法确定会落在哪个进程里执行,只能让一个进程失效,无法实现更新所有进程。

配置文件在 async_queue.php 中配置消费进程  processes 进程个数。

解决方案

这里通过 swoole 自带的进程通信进行触发自定义消息事件,并创建触发器监听自定义消息事件,自定义触发器所在的进程也是异步消费进程,只需要触发器对自身进程进行更新就行了。

在调用触发端一次性获取所有进程,进行投递消息即可实现对所有异步消费进程进行更新。

创建一个自定义 Event

app/Event/BlackListUpdated.php

<?php

namespace App\Event;
class BlackListUpdated
{

    public $message = '';

    public function __construct(string $message)
    {
        $this->message = $message;
    }

}

 

创建自定义事件监听器

app/Listener/BlackListUpdatedListener.php

<?php

namespace App\Listener;


use App\Event\BlackListUpdated;
use App\Services\BlackListService;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Process\Event\PipeMessage as UserProcessPipeMessage;

/**
 * @Listener
 *
 * Class BlackListUpdatedListener
 * @package App\Listener
 */
class BlackListUpdatedListener implements ListenerInterface
{
    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }


    public function listen(): array
    {
        // 返回一個該監聽器要監聽的事件陣列,可以同時監聽多個事件
        return [
            OnPipeMessage::class,
            UserProcessPipeMessage::class,
        ];
    }

    /**
     * @param object $event
     */
    public function process(object $event)
    {
        // PipeMessage
        if ($event instanceof OnPipeMessage || $event instanceof UserProcessPipeMessage) {
            if($event->data instanceof BlackListUpdated){
                $this->container->get(BlackListService::class)->flushKeyword();
            }
        }

    }
}

这里不要监听 BlackListUpdated 事件,因为并不能给所有进程投递 BlackListUpdated 事件。

在 process 方法执行中判断消息数据类型对容器数据进行更新。

消息事件投递端

在 HTTP 接口中对数据更新后进行投递事件。

<?php

//...
    /**
     * 加入关键词
     *
     * @param Request $request
     * @return array
     */
    public function addKeyword(Request $request): array
    {
        $keyword = $request->input('keyword');
        $this->blackListService->addKeyword($keyword);

        if (class_exists(ProcessCollector::class) && !ProcessCollector::isEmpty()) {
            $processes = ProcessCollector::all();
            if ($processes) {
                $string = serialize(new BlackListUpdated('update keyword'. date('Y-m-d H:i:s')));
                foreach ($processes as $process) {
                    $process->exportSocket()->send($string, 10);
                }
            }
        }

        return eeJson('add keyword into blacklist service');
    }

//...
Profile Picture
大雁养老

- From Web - 2022-10-18 11:49

学些了学习了


Profile Picture
buy cialis online using paypal

- From OS X Safari - 2月前

Appreciating the time and energy you put into your blog and in depth information you present. It's great to come across a blog every once in a while that isn't the same unwanted rehashed material. Great read! I've bookmarked your site and I'm including your RSS feeds to my Google account.

100 Likes
Comment

Profile Picture
buy cialis daily online

- From Windows Edge - 1月前

Good site you have here.. It's difficult to find excellent writing like yours these days. I really appreciate people like you! Take care!!

41 Likes
Comment

media iamge
StudioEIM - 冒险者讲习所
0:00