消息生成者:
channel();//在已连接基础上建立生产者与mq之间的通道 $channel->exchange_declare($exchangeName,?‘direct‘,?false,?true,?false);?//声明初始化交换机 $channel->queue_declare($queueName,?false,?true,?false,?false);//声明初始化一条队列 $channel->queue_bind($queueName,?$exchangeName,?$routingKey);?//将队列与某个交换机进行绑定,并使用路由关键字 $msgBody?=?json_encode(["name"?=>?"iGoo",?"age"?=>?22]); $msg?=?new?AMQPMessage($msgBody,?[‘content_type‘?=>?‘text/plain‘,?‘delivery_mode‘?=>?2]);?//生成消息 $channel->basic_publish($msg,?$exchangeName,?$routingKey);//推送消息到某个交换机 echo?"?[x]?Sent?‘Hello?World!‘\n"; $channel->close(); $connection->close();
消息消费者:
channel(); $channel->exchange_declare($exchangeName,?‘direct‘,?false,?true,?false);?//声明初始化交换机 $channel->queue_declare($queueName,?false,?true,?false,?false); $channel->queue_bind($queueName,?$exchangeName,?$routingKey);?//将队列与某个交换机进行绑定,并使用路由关键字 echo?"?[*]?Waiting?for?messages.?To?exit?press?CTRL+C\n"; $callback?=?function?($msg)?{ ????echo?‘?[x]?Received?‘,?$msg->body,?"\n"; }; $channel->basic_consume($queueName,?‘‘,?false,?true,?false,?false,?$callback); while?($channel->is_consuming())?{//这个是阻塞模式,有消息就执行回调 ????$channel->wait(); } /*?$channel->close(); $connection->close();?*/
? ??
原文:https://blog.51cto.com/u_13227207/2834124