RabbitMQ 与 PHP 应用
简介
MQ 全称为 Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来连接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
翻译:RabbitMQ 是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定邮递员最终会将邮件递送给您的收件人。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个邮递员。
RabbitMQ 场景
1. 解藕
订单系统:用户下单后需要通知库存系统。
传统做法:订单系统调用库存系统的 API(一般直接操作数据库)
缺点:
- 订单系统和库存系统耦合
- 高并发情况下会给库存系统造成巨大的请求压力(性能不佳甚至宕机)
- 如果库存系统无法访问(宕机等)导致库存系统库存减失败,从而导致订单系统失败
引入消息队列 (RabbitMQ):
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉 / 推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
- 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。
基于消息的模型,关心的是“通知”,而非“处理”
短信、邮件通知、缓存刷新等操作使用消息队列进行通知
消息队列和 RPC 的区别与比较:
RPC: 异步调用,及时获得调用结果,具有强一致性结果,关心业务调用处理结果。
消息队列:两次异步 RPC 调用,将调用内容在队列中进行转储,并选择合适的时机进行投递(错峰流控)
2. 异步提升效率
场景:短信、邮件发送
用户注册成功之后,需要发送短信和邮件通知。传统的做法有两种 1. 串行的方式;2. 并行方式
串行的方式:处理周期长
并行方式: 相比较串行的方式,可以提升处理时间
引入消息队列 将发送邮件、短信不必要的业务逻辑异步处理,减少处理周期,减轻数据库压力,提升性能
3. 流量削峰
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:系统其他时间 A 系统每秒请求量就 100 个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至 1 万条,但是系统最大的处理能力只能每秒处理 1000 个请求,于是系统崩溃,服务器宕机。
之前架构:大量用户(100 万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟 5000 个请求,大量的请求打到 MySQL 上,每秒钟预计执行 3000 条 SQL。
但是一般的 MySQL 每秒钟扛住 2000 个请求就不错了,如果达到 3000 个请求的话可能 MySQL 直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就 1 万用户访问系统,每秒的请求数量也就 50 个左右,整个系统几乎没有任何压力。
引入 MQ:100 万用户在高峰期的时候,每秒请求有 5000 个请求左右,将这 5000 请求写入 MQ 里面,系统 A 每秒最多只能处理 2000 请求,因为 MySQL 每秒只能处理 2000 个请求。
系统 A 从 MQ 中慢慢拉取请求,每秒就拉取 2000 个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒 5000 个请求进来,结果只有 2000 个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在 MQ 中。
这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有 50 个请求进入 MQ 了,但是系统还是按照每秒 2000 个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。
引入消息队列的优缺点
优点:
在特殊场景下有其对应的好处,解耦、异步、削峰
缺点:
-
系统的可用性降低
系统引入的外部依赖越多,系统越容易挂掉,本来只是 A 系统调用 BCD 三个系统接口就好,ABCD 四个系统不报错整个系统会正常运行。引入了 MQ 之后,虽然 ABCD 系统没出错,但 MQ 挂了以后,整个系统也会崩溃。 -
系统的复杂性提高
引入了 MQ 之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?
-
一致性问题
A 系统发送完消息直接返回成功,但是 BCD 系统之中若有系统写库失败,则会产生数据不一致的PHP – RabbitMQ 消息队列的应用
-
安装 php-amqplib
➜ mkdir rabbitmq-demo ➜ composer init ➜ composer require php-amqplib/php-amqplib
-
创建 receiver.php 和 send.php
-
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$queue = 'order-sys';
$connection = new AMQPStreamConnection(
'localhost',
5672,
"admin",
"admin",
'my-rabbitmq-vhost'
);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
for ($i = 0; $i <= 100; $i++) {
$arr = [
'id' => 'message_' . $i,
'order_id' => str_replace('.', '', microtime(true) . mt_rand(10, 99)) . $i,
'content' => 'content-' . time(),];
$data = json_encode($arr);
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]);
$channel->basic_publish($msg, '', $queue);
echo 'Send message: ' . $data .PHP_EOL;
}
$channel->close();
$connection->close();
receive.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$queue = 'order-sys';
$connection = new AMQPStreamConnection(
'localhost',
5672,
'admin',
'admin',
'my-rabbitmq-vhost'
);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C ' . PHP_EOL;
$callback = function ($msg) {
echo " Received message:", $msg->body, PHP_EOL;
sleep(1);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null); // 处理和确认完消息后再消费新的消息
$channel->basic_consume($queue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {$channel->wait();
}
$channel->close();
$connection->close();