这段时间一直业务比较忙,因公司用的 databases 队列,用起来 感觉不是很爽,故简单封装了一个rabbitmq类(业务代码随便写的)
首先是账号密码配置
config.php
<?php return $arr = [ ‘RabbitMq‘ => [ // Rabbitmq 服务地址 ‘host‘ => ‘127.0.0.1‘, // Rabbitmq 服务端口 ‘port‘ => ‘5672‘, // Rabbitmq 帐号 ‘login‘ => ‘guest‘, // Rabbitmq 密码 ‘password‘ => ‘guest‘, ‘vhost‘=>‘/‘ ] ];
基类 base.php
<?php include dirname(__FILE__).‘/object.php‘; include dirname(__FILE__).‘/config.php‘; class RabbitMq implements object { //保存类实例的静态成员变量 static private $_instance; static private $_conn; static private $amp ; static private $route = ‘key_1‘; static private $q ; static private $ex ; static private $queue; public static function getInstance(){ global $arr; if (!(self::$_instance instanceof self)) { self::$_instance = new self($arr[‘RabbitMq‘]); return self::$_instance; } return self::$_instance; } private function __construct($conn) { //创建连接和channel $conn = new AMQPConnection($conn); if(!$conn->connect()) { die("Cannot connect to the broker!\n"); } self::$_conn = new AMQPChannel($conn); self::$amp = $conn; } /* * * * parm 交换机名 * parm 队列名 * * */ public function listen($exchangeName,$queuename){ self::$queue = $queuename; return $this->setExchange($exchangeName,$queuename); } //连接交换机 public function setExchange($exchangeName,$queueName){ //创建交换机 $ex = new AMQPExchange(self::$_conn); self::$ex = $ex; $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 $ex->declare(); return self::setQueue($queueName,$exchangeName); } //创建队列 private static function setQueue($queueName,$exchangeName){ // 创建队列 $q = new AMQPQueue(self::$_conn); $q->setName($queueName); $q->setFlags(AMQP_DURABLE); $q->declareQueue(); // 用于绑定队列和交换机 $routingKey = self::$route; $q->bind($exchangeName, $routingKey); self::$q = $q; return(self::$_instance); } /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag());//手动应答 } */ public function run($func, $autoack = True){ if (!$func || !self::$q) return False; while(True){ if ($autoack) { if(!self::$q->consume($func, AMQP_AUTOACK)){ // self::$q->ack($envelope->getDeliveryTag()); echo 123; } } self::$q->consume($func); } } private static function closeConn(){ self::$amp->disconnect(); } public function pushlish($msg){ while (1) { sleep(1); if (self::$ex->publish(date(‘H:i:s‘) . "用户" . "注册", self::$route)) { //写入文件等操作 echo $msg; } } } //__clone方法防止对象被复制克隆 public function __clone() { trigger_error(‘Clone is not allow!‘, E_USER_ERROR); } }
consume 监听类(一个操作对应一个class)
<?php include dirname(__FILE__).‘/base.php‘; class Add { public static function run(){ $dbms=‘mysql‘; //数据库类型 $host=‘127.0.01‘; //数据库主机名 $dbName=‘test‘; //使用的数据库 $user=‘root‘; //数据库连接用户名 $pass=‘admin‘; //对应的密码 $dsn="$dbms:host=$host;dbname=$dbName"; sleep(1); try { $dbh = new PDO($dsn, $user, $pass); //初始化一个PDO对象 /*你还可以进行一次搜索操作 foreach ($dbh->query(‘SELECT * from FOO‘) as $row) { print_r($row); //你可以用 echo($GLOBAL); 来看到这些值 } */ $dbh = null; } catch (PDOException $e) { die ("Error!: " . $e->getMessage() . "<br/>"); } //默认这个不是长连接,如果需要数据库长连接,需要最后加一个参数:array(PDO::ATTR_PERSISTENT => true) 变成这样: $db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true)); $sql = ‘INSERT INTO `test`.`t_reg`(`names`) VALUES (9)‘; $row = $db->query($sql); if(!$row){ return false; } echo ‘OK‘; } } $consume = new Add(); //tudo //$s = RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->run(array($consume,‘run‘)); 将run函数带入到consume里面作为回调 在consume里面增加$funname ,增加代码粘性 $s = RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->run(array($consume,‘run‘));
push 类(发送者)
<?php include "base.php"; RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->pushlish(‘请求已发送‘);
接口interface
<?php interface object { public static function getInstance(); }
监听 add.php
执行 send.php 即可完成简单的rabit操作
原文:https://www.cnblogs.com/huangguowen/p/10254444.html