目录
在使用 RabbitMQ 之前,你要安装好 RabbitMQ 服务,具体安装方法可以参考 windows下安装RabbitMQ
1、安装扩展
进入TP5 更目录下,输入命令安装:
composer require php-amqplib/php-amqplib
2、自定义命令
TP5 的自定义命令,这里也简单说下。
第一步:
创建命令类文件,新建 application/api/command/Test.php。
<?php namespace app\\api\\command; use think\\console\\Command; use think\\console\\Input; use think\\console\\Output; /** * 自定义命令测试 */ class Test extends Command { /** * 配置 */ protected function configure() { // 设置命令的名称和描述 $this->setName(\'test\')->setDescription(\'这是一个测试命令\'); } /** * 执行 */ protected function execute(Input $input, Output $output) { $output->writeln(\"测试命令\"); } }
这个文件定义了一个叫test的命令,备注为 这是一个测试命令,执行命令会输出:test command。
第二步:
配置 command.php文件,在 application/command.php文件中添加命令。
<?php return [ \'app\\api\\command\\Test\', ];
第三步:
测试命令,在项目根目录下输入命令:
php think test
回车运行之后输出:
test command
到这里,自定义命令就结束了,test命令就自定义成功了。
3、rabbitmq服务端
下来我们自定义 RabbitMQ 启动命令,守护进程运行,启动 rabbirmq 服务端接收消息。
在 application/api/command 目录下,新建 Ramq.php 文件,在执行命令的方法中,调用 RabbitMQ 启动守护进程方法即可。
<?php namespace app\\api\\command; use PhpAmqpLib\\Connection\\AMQPStreamConnection; use think\\console\\Command; use think\\console\\Input; use think\\console\\Output; /** * RabbitMq 启动命令 */ class Ramq extends Command { protected $consumerTag = \'customer\'; protected $exchange = \'xcuser\'; protected $queue = \'xcmsg\'; protected function configure() { $this->setName(\'ramq\')->setDescription(\'rabbitmq\'); } protected function execute(Input $input, Output $output) { $output->writeln(\"消息队列开始\"); $this->start(); // 指令输出 $output->writeln(\'消费队列结束\'); } /** * 关闭 */ function shutdown($channel, $connection) { $channel->close(); $connection->close(); } /** * 回调处理信息 */ function process_message($message) { if ($message->body !== \'quit\') { echo $message->body; } //手动应答 $message->delivery_info[\'channel\']->basic_ack($message->delivery_info[\'delivery_tag\']); if ($message->body === \'quit\') { $message->delivery_info[\'channel\']->basic_cancel($message->delivery_info[\'consumer_tag\']); } } /** * 启动 守护进程运行 */ public function start() { $host = \'127.0.0.1\'; $port = 5672; $user = \'guest\'; $pwd = \'guest\'; $vhost = \'/\'; $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); $channel = $connection->channel(); $channel->queue_declare($this->queue, false, true, false, false); $channel->exchange_declare($this->exchange, \'direct\', false, true, false); $channel->queue_bind($this->queue, $this->exchange); $channel->basic_consume($this->queue, $this->consumerTag, false, false, false, false, array($this, \'process_message\')); register_shutdown_function(array($this, \'shutdown\'), $channel, $connection); while (count($channel->callbacks)) { $channel->wait(); } } }
在application/command.php文件中,添加rabbitmq自定义命令。
return [ \'app\\api\\command\\Ramq\',// rabbitmq ];
4、发送端
最后,我们再写发送消息的控制器,实现消息队列,具体代码如下:
<?php namespace app\\api\\controller; use PhpAmqpLib\\Connection\\AMQPStreamConnection; use PhpAmqpLib\\Message\\AMQPMessage; use think\\Controller; /** * 发送端 */ class MessageQueue extends Controller { const exchange = \'xcuser\'; const queue = \'xcmsg\'; /** * 发送消息 */ public function pushMessage($data) { $host = \'127.0.0.1\'; $port = 5672; $user = \'guest\'; $pwd = \'guest\'; $vhost = \'/\'; $connection = new AMQPStreamConnection($host, $port, $user, $pwd, $vhost); $channel = $connection->channel(); $channel->exchange_declare(self::exchange, \'direct\', false, true, false); $channel->queue_declare(self::queue, false, true, false, false); $channel->queue_bind(self::queue, self::exchange); $messageBody = $data; $message = new AMQPMessage($messageBody, array(\'content_type\' => \'text/plain\', \'delivery_mode\' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message, self::exchange); $channel->close(); $connection->close(); echo \'ok\'; } /** * 执行 */ public function index() { $data = json_encode([\'msg\' => \'测试数据\', \'id\' => \'15\']); $this->pushMessage($data); } }
5、验证
先执行自定义命令,启动 rabbitmq 守护进程。在项目更目录下打开命令行,输入下面命令:
php think ramq
然后在浏览器访问发送信息的方法,http://你的域名/api/message/index,你发送一次消息,在命令行就会输出一条消息。这样我们就用 RabbitMQ 实现了一个简单的消息队列。
© 版权声明
THE END
暂无评论内容