摘要

一般情况下,我们使用PHP开发的中、小型WEB应用基本是同步执行的,即基本遵从:①用户访问特定URL->②PHP执行对应业务函数(校验、查询数据库)-> ③返回响应结果。但对于稍微大型WEB应用,第②步骤可能会包含更多待处理的任务(如:发送短信、发送邮件、保存日志、获取第三方API结果等),这些任务甚至是非常消耗时间和消耗计算资源的,但又是和返回用户执行结果基本无关的。我们将这类计算任务在第②步骤时通过一种特定的通信方式快速高效的传递到后端其他服务器,这样可以尽可能快的响应用户的执行结果,同时减小入口服务器的压力,后端服务器将会监听到传入的任务并开始执行。此文中,我们在Symfony4.2中配置和应用RabbitMQ实现的异步处理计算任务。

MQ

MQ全称为Message Queue(即:消息队列),MQ是一种应用程序之间的通信方式。应用程序通过向消息队列发送/接收消息(通常包含应用程序传递的各种参数)来实现通信。在相互通信的消息队列之间,需要由专门的消息队列服务器提供消息队列服务才能实现消息队列通信。下图是一个通过消息队列来完成PDF文档创建(通常非常耗时的任务)的示意图。
通过消息队列来完成PDF文档创建-引用自Medium:《RabbitMQ: Messaging that just works》

AMQP

AMQP全称为Advanced Message Queuing Protocol, 是一种在应用程序之间提供统一消息服务的应用层标准的高级消息队列协议,是一个开放的标准。

RabbitMQ

RabbitMQ是在Erlang(一种通用的具有并发优势的编程语言)基础上实现的基于Mozilla Public License开源协议的消息队列服务器。使用RabbitMQ可以Windows/Linux/Mac上免费搭建消息队列服务。

安装RabbitMQ

在这里我们假设您和我一样都是在Windows10电脑上进行测试和实践,如果您使用的是Linux或者Mac系统,您应该自觉的在之后的各类软件下载页面找到适合您操作系统的版本。
在安装RabbitMQ服务器之前您需要提前完成Erlang,登录Erlang官网即可找到下载页面(或点击这里)。
安装完成Erlang之后,您现在可以登录RabiitMQ官网下载适合您操作系统的安装包进行安装(可以点击这里
安装完成RabbitMQ之后,您需要找到安装RabbitMQ的安装目录下的sbin文件夹,进入文件夹并通过cmd窗口执行该文件夹下的插件管理命令以激活本地的消息队列服务管理功能。

CMD命令:rabbitmq-plugins.bat enable rabbitmq_management

演示-激活本地的消息队列服务管理功能

成功开启本地管理功能后,您现在可以通过本地电脑的浏览器访问监听于http://localhost:15672/的管理界面,并可使用默认的账户进行登录(Username: guest, Password: guest),登录后您可以查看到所有消息队列的信息。

访问http://localhost:15672/ 登录消息队列服务管理界面

Rabbitmq-bundle

rabbitmq-bundle是专门为Symfony框架整合的RabbitMQ服务扩展包,扩展包信息可以在Packagist上获取到。改扩展包要求您的Symfony框架版本>=2.3,打开Symfony开发控制台,使用Composer安装此扩展。

composer require php-amqplib/rabbitmq-bundle

提示安装成功后,您需要修改位于项目YourProject/config/packages/文件夹下的old_cound_rabbit_mq.yaml,取消部分默认配置的注释,取消后文件内容类似如下:

old_sound_rabbit_mq:
connections:
    default:
        url: '%env(RABBITMQ_URL)%'
producers:
    # use 'old_sound_rabbit_mq.task_producer' service to send data.
    task:
        connection:       default
        exchange_options: { name: 'task', type: direct }

consumers:
    task:
        connection:       default
        exchange_options: { name: 'task', type: direct }
        queue_options:    { name: 'task'}  #队列名称
        callback:         App\Consumer\TaskConsumer #消息回调类

以上配置定义了一个默认的名为task的消息生产者(producer)和一个名为task的消息消费者(consumer)。下面我们需要为Symfony配置链接到本地队列服务器,即修改项目目录下的.env配置文件,配置后部分内容类似如下:

###> php-amqplib/rabbitmq-bundle ###
RABBITMQ_URL=amqp://guest:guest@localhost:5672
###< php-amqplib/rabbitmq-bundle ###

至此,Symfony已经可以正常连接到本地队列服务器了。现在还需要为消费者配置回调类来处理接从队列中收到的消息根据old_cound_rabbit_mq.yaml配置文件内容,我们为默认的队列配置了位于App\Consumer\TaskConsumer的回调类,所以我们这里需要新增一个文件YourProject/src/Consumer/TaskConsumer.php,文件内容如下:

<?php

namespace App\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class TaskConsumer implements ConsumerInterface
{
public function execute(AMQPMessage $msg)
{
    //Process picture upload.
    //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.
    #打印获取到的消息内容到控制台
    echo json_decode($msg->body, 1);
    
    #如果回调任务执行失败
    $callbackFalse = false;
    if ($callbackFalse) {
        #如果您的业务逻辑没有成功处理本次消息,您可以返回 false 来告诉队列服务您拒收本次消息并将消息保持在队列中以便于其他消费者调用或稍后调用。
        #任何不等于 false 的响应都将告诉队列服务您接收了消息,队列服务将会释放对应的消息
        return false;
    }
        // todo:: 在这里您可以书写更多的数据处理逻辑,例如:发送短信、邮件、Call第三方API,更新其他数据库的订单状态、写入日志等
    }
}

接下来,我们便可以新建一个Controller和方法来测试消息发送和接收了,使用Symfony的命令行新增一个MqController

php bin/console make:controller

成功后,我们修改下MqController文件的内容以便于测试,文件内容类似如下:

<?php

namespace App\Controller;

use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
use Symfony\Bundle\FrameworkBundle\Controller\Controller;

class MqController extends Controller
{
/**
 * @Route("/mq/test", name="mq")
 */
public function index(MessageBusInterface $bus)
{
     #组织消息内容 数组即可
     $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
     #发送消息到队列
     $this->get('old_sound_rabbit_mq.task_producer')
         ->setContentType('application/json')
         ->publish(json_encode($msg));

     #返回给web请求者
     return $this->json(['status'=>'success send message']);
    }
}

消息发送者我们定义好了之后,现在我们可以在Symfony的命令行中执行命令来消费队列中的消息并执行对应的回调函数。

php bin/console rabbitmq:consumer task

您也可以指定消费消息的条数如消费50条后退出,则执行:

php bin/console rabbitmq:consumer -m 50 task

至此基本测试和整合就完成了,您可以通过old_cound_rabbit_mq.yaml配置多个队列和消费者回调类来适配您的业务需求。rabbitmq-bundle也提供了灵活的消息路由方式,可以路由消息到特定的节点或者内部函数。您可以参考rabbitmq-bundle的扩展包的使用配置示例来获取更多信息。
文中的多处代码均是我在我的项目实践工程中截取的,可能并不能直接搬移使用,或者可能给您造成困惑。本文主要是介绍RabbitMQ队列服务在Symfony4.2中的基本配置和应用示例,更多的介绍了MQ相关的基础知识。非常欢迎您通过留言版进行提问和发表不同意见。

标签: RabbitMQ, Symfony, 消息队列

仅有一条评论

  1. Symfony相关的消息队列资料确实太少了,对于我一个Symfony新手来说帮助太大了!

添加新评论