<?php
/*
* Message queue
* @access Queue.php
* @author liuhaipeng <liuhaipeng@dhgate.com>
* @create 2013-10-17 10:42:11 AM
* @version 1.0
* @copyright Copyright © 2013, DHgate.com
*/
/**
 * Thin wrapper around the php-amqp extension that publishes JSON-encoded
 * messages through a direct exchange and drains JSON messages from a queue.
 *
 * NOTE: the connection, channel and names are kept in static properties, so
 * every instance of this class shares them. Constructing a second instance
 * overwrites the names and handles used by the first, and destroying any
 * instance disconnects the shared connection.
 */
class Scrm_Queue {
    /** @var AMQPConnection|null Broker connection shared by all instances. */
    protected static $connect = NULL;

    /** @var AMQPChannel|null Channel opened on the shared connection. */
    protected static $channel = NULL;

    /** @var string|null Name of the exchange messages are published to. */
    protected static $exchangeName = NULL;

    /** @var mixed Not assigned anywhere in this class. */
    protected static $queue = NULL;

    /** @var string|null Name of the queue that is bound and consumed. */
    protected static $queueName = NULL;

    /**
     * Connects to the broker and opens a channel.
     *
     * Terminates the script via die() when connect() reports failure.
     *
     * @param array       $config       Credentials/options passed straight to AMQPConnection.
     * @param string|null $queueName    Queue used by setQueue() and getQueue().
     * @param string|null $exchangeName Exchange used by setQueue().
     */
    public function __construct($config, $queueName = NULL, $exchangeName = NULL) {
        self::$queueName = $queueName;
        self::$exchangeName = $exchangeName;

        // Create the AMQPConnection instance.
        $connect = new AMQPConnection($config);
        if (! $connect->connect()) {
            die("Cannot connect to the broker!\n");
        }
        $channel = new AMQPChannel($connect);

        // Objects are already passed by handle; no reference binding is needed.
        self::$connect = $connect;
        self::$channel = $channel;
    }

    /**
     * Closes the shared broker connection, if one was opened and is still open.
     */
    public function __destruct() {
        // The connection can still be NULL here (e.g. the constructor bailed
        // out before storing it), so guard before calling into it.
        if (self::$connect !== NULL && self::$connect->isConnected()) {
            self::$connect->disconnect();
        }
    }

    /**
     * Publishes a message to the configured exchange.
     *
     * The exchange is declared as a direct exchange, the configured queue is
     * bound to it under $routing_key, and $message is sent JSON-encoded.
     * The queue itself is NOT declared here, so it must already exist.
     *
     * @param mixed  $message     Any JSON-encodable value.
     * @param string $routing_key Routing key used for both the binding and the publish.
     * @return mixed FALSE when $message cannot be JSON-encoded, otherwise
     *               whatever AMQPExchange::publish() returns.
     */
    public function setQueue($message = array(), $routing_key = 'key_1') {
        // Create the exchange.
        $exchange = new AMQPExchange(self::$channel);
        $exchange->setName(self::$exchangeName);
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
        // Durable: the exchange is re-created automatically after RabbitMQ
        // restarts. Note that AMQP_AUTODELETE is set as well.
        $exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        self::declareExchangeCompat($exchange);

        // Describe the queue (declaration intentionally left out, see above).
        $queue = new AMQPQueue(self::$channel);
        $queue->setName(self::$queueName);
        $queue->setFlags(AMQP_DURABLE);

        // Bind the queue to the exchange under the given routing key.
        $queue->bind($exchange->getName(), $routing_key);

        $payload = json_encode($message);
        if ($payload === false) {
            // Unencodable input (e.g. invalid UTF-8): report failure instead
            // of silently publishing an empty message.
            return false;
        }

        return $exchange->publish($payload, $routing_key);
    }

    /**
     * Fetches and acknowledges pending messages from the configured queue.
     *
     * Each message body is JSON-decoded into an associative array (or NULL
     * when the body is not valid JSON); the message is acknowledged either way.
     *
     * @param int $limit Maximum number of messages to take in one call.
     * @return array List of decoded message bodies, in delivery order.
     */
    public function getQueue($limit = 500) {
        $queue = new AMQPQueue(self::$channel);
        $queue->setName(self::$queueName);
        $queue->setFlags(AMQP_DURABLE); // durable

        $result = array();
        // The limit is checked before get() so that no message is fetched
        // without being acknowledged and returned.
        // Flags: AMQP_NOPARAM (manual ack) / AMQP_AUTOACK.
        while (count($result) < $limit && ($message = $queue->get(AMQP_NOPARAM))) {
            $result[] = json_decode($message->getBody(), true);
            $queue->ack($message->getDeliveryTag());
        }

        return $result;
    }

    /**
     * Declares the exchange using whichever method name the installed
     * php-amqp version provides.
     *
     * php-amqp 1.2.0 introduced declareExchange() as the replacement for
     * declare(); the legacy name is used only when the new one is missing.
     *
     * @param AMQPExchange $exchange Fully configured exchange object.
     * @return mixed Result of the underlying declare call.
     */
    private static function declareExchangeCompat($exchange) {
        if (method_exists($exchange, 'declareExchange')) {
            return $exchange->declareExchange();
        }

        return $exchange->declare();
    }
}
// AMQP
// Source: http://blog.csdn.net/peng905/article/details/28425699