1、SDK安装:composer.json
{ "require": { "aliyunmq/mq-http-sdk": ">=1.0.1" } }
2、config配置文件增加配置
‘rocketmq‘ => [ "endPoint" => ‘http://test.cn-shanghai.aliyuncs.com‘, "accessId" => ‘test123456‘, "accessKey" => ‘test12456789‘, "consumer" => [ ‘test‘ => ‘GID_TEST_TEST_1‘, ], "instanceId" => ‘MQ_INST_123456_Bdffdsf‘, "topic" => [ ‘test‘ => ‘test‘, ] ]
3、服务类
<?php namespace common\test; use MQ\Model\Message; use MQ\Model\TopicMessage; use MQ\MQClient; use MQ\MQConsumer; use MQ\MQProducer; use Yii; class RocketMqService { /** * @var MQClient */ private $client; /** * @var MQConsumer */ private $consumer = null; /** * @var MQProducer */ private $producer = null; private $topic; private $groupId; private $instanceId; public function __construct($service) { $this->client = new MQClient( Yii::$app->params[‘rocketmq‘][‘endPoint‘], Yii::$app->params[‘rocketmq‘][‘accessId‘], Yii::$app->params[‘rocketmq‘][‘accessKey‘] ); $this->instanceId = Yii::$app->params[‘rocketmq‘][‘instanceId‘]; $this->groupId = isset(Yii::$app->params[‘rocketmq‘][‘consumer‘][$service]); $this->topic = Yii::$app->params[‘rocketmq‘][‘topic‘][$service]; } /** * 发送mq消息 * @param array $content 消息体数组 * @param string $tag 消息tag * @return mixed * @author K.k */ public function sendMessage(array $content, string $tag) { if (!$this->producer){ $this->producer = $this->client->getProducer($this->instanceId, $this->topic); } $message = new TopicMessage(json_encode($content)); $message->setMessageTag($tag); return $this->producer->publishMessage($message); } /** * 订阅消息 * @param string $tag * @throws */ public function subscribeMq($tag = null) { $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag); } /** * 消息确认 * @param Message $message */ public function ack(Message $message) { $receiptHandles[] = $message->getReceiptHandle(); $this->consumer->ackMessage($receiptHandles); } /** * 订阅消息 * @param $tag * @param int $numOfMessages * @param int $waitSeconds * @return bool|Message */ public function consumerMessage($tag, $numOfMessages = 5, $waitSeconds = 0) { $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag); try { $messages = $this->consumer->consumeMessage($numOfMessages, $waitSeconds); } catch (\Exception $e) { return false; } return $messages; } }
4、发送消息
$body = [ ‘userId‘ => 123456, ‘userName‘ => ‘test‘, ]; $mq = new RocketMqService(‘test‘); $result = $mq->sendMessage($body, ‘test_tag‘);
5、消费消息
try{ $rockMqService = new RocketMqService(); while (true) { $messages = $rockMqService->consumerMessage(‘test‘); if (!$messages) { break; } $receiptHandles = array(); foreach ($messages as $message) { $receiptHandles[] = $message->getReceiptHandle(); $body = $message->getMessageBody(); $data = json_decode($body, true); //code...... } $rockMqService->ackConsumerMessage($receiptHandles); sleep(3); } }catch (Exception $e) { }
原文:https://www.cnblogs.com/diguaer/p/14549523.html