1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<?php
/**
 * Consumer for the direct-exchange example.
 *
 * Declares the direct exchange "exchange", declares the queue "logs", binds
 * the queue to the exchange with routing key "logs", and dumps the body of
 * every message it receives.
 */

$connect = new AMQPConnection();
$connect->connect();

try {
    $channel = new AMQPChannel($connect);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('exchange');
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    // declareExchange() replaces the deprecated declare() alias.
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName('logs');
    // declareQueue() replaces the deprecated declare() alias.
    $queue->declareQueue();
    $queue->bind('exchange', 'logs');

    // Re-enter consume() whenever it returns so the consumer keeps running.
    while (true) {
        $queue->consume('callback');
    }
} finally {
    // The original called close() on an undefined $connection after the
    // endless loop, so it could never run; disconnect the real connection
    // when the script leaves the loop through an exception.
    $connect->disconnect();
}

/**
 * Handles one delivery: prints the message body and nacks the delivery.
 *
 * @param AMQPEnvelope $envelope The delivered message.
 * @param AMQPQueue    $queue    The queue the message was consumed from.
 */
function callback($envelope, $queue)
{
    var_dump($envelope->getBody());
    // NOTE(review): nack() negatively acknowledges the delivery. If the
    // intent is to confirm successful processing, ack() is probably what is
    // wanted here - confirm against the article's intent.
    $queue->nack($envelope->getDeliveryTag());
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
<?php
/**
 * Publisher for the direct-exchange example.
 *
 * Declares the direct exchange "exchange" and publishes a single test
 * message with routing key "logs".
 */

$connect = new AMQPConnection();
$connect->connect();

try {
    $channel = new AMQPChannel($connect);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('exchange');
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    // declareExchange() replaces the deprecated declare() alias.
    $exchange->declareExchange();

    $exchange->publish('direct type test', 'logs');
    var_dump("Send Message OK");
} finally {
    // Release the connection even if declaring or publishing throws.
    $connect->disconnect();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<?php
/**
 * Consumer for the direct-exchange example (second variant).
 *
 * Declares the direct exchange "exchange", declares the queue "logs", binds
 * the queue to the exchange with routing key "logs", and dumps the body of
 * every message it receives.
 */

$connect = new AMQPConnection();
$connect->connect();

try {
    $channel = new AMQPChannel($connect);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('exchange');
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    // declareExchange() replaces the deprecated declare() alias.
    $exchange->declareExchange();

    $queue = new AMQPQueue($channel);
    $queue->setName('logs');
    // The original silenced declare() with "@", presumably to hide its
    // deprecation notice; declareQueue() needs no suppression, so real
    // errors stay visible.
    $queue->declareQueue();
    $queue->bind('exchange', 'logs');

    // Re-enter consume() whenever it returns so the consumer keeps running.
    while (true) {
        $queue->consume('callback');
    }
} finally {
    // The original called close() on an undefined $connection after the
    // endless loop, so it could never run; disconnect the real connection
    // when the script leaves the loop through an exception.
    $connect->disconnect();
}

/**
 * Handles one delivery: prints the message body and nacks the delivery.
 *
 * @param AMQPEnvelope $envelope The delivered message.
 * @param AMQPQueue    $queue    The queue the message was consumed from.
 */
function callback($envelope, $queue)
{
    var_dump($envelope->getBody());
    // NOTE(review): nack() negatively acknowledges the delivery. If the
    // intent is to confirm successful processing, ack() is probably what is
    // wanted here - confirm against the article's intent.
    $queue->nack($envelope->getDeliveryTag());
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<?php
/**
 * Publisher for the direct-exchange example (batch variant).
 *
 * Declares the direct exchange "exchange", publishes the numbers 1 through 4
 * with routing key "logs", then deletes the exchange.
 */

$connect = new AMQPConnection();
$connect->connect();

try {
    $channel = new AMQPChannel($connect);

    $exchange = new AMQPExchange($channel);
    $exchange->setName('exchange');
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    // declareExchange() replaces the deprecated declare() alias.
    $exchange->declareExchange();

    for ($index = 1; $index < 5; $index++) {
        // publish() takes the message as a string; cast explicitly rather
        // than rely on implicit int-to-string coercion.
        $exchange->publish((string) $index, 'logs');
        var_dump("Send:$index");
    }

    $exchange->delete();
} finally {
    // Release the connection even if declaring or publishing throws.
    $connect->disconnect();
}
|
1
2
3
4
|
// Publish the numbers 1 through 49 with routing key "logs".
for ($index = 1; $index < 50; $index++) {
    // publish() takes the message as a string; cast explicitly rather than
    // rely on implicit int-to-string coercion.
    $exchange->publish((string) $index, 'logs');
    var_dump("Send:$index");
}
|
1
2
3
4
5
|
/**
 * Handles one delivery: prints the message body, pauses for three seconds,
 * then nacks the delivery on the given queue.
 *
 * Presumably registered as the callback of AMQPQueue::consume() - confirm
 * against the calling script.
 *
 * @param object $envelope Delivery exposing getBody() and getDeliveryTag().
 * @param object $queue    Queue exposing nack().
 */
function callback($envelope, $queue)
{
    $body = $envelope->getBody();
    var_dump($body);

    // Pause for three seconds before responding to the broker.
    sleep(3);

    $deliveryTag = $envelope->getDeliveryTag();
    $queue->nack($deliveryTag);
}
|
1
|
// Open a channel on the existing connection.
$channel = new AMQPChannel($connect);
|
1
2
|
// Open a channel on the existing connection.
$channel = new AMQPChannel($connect);
// Set the channel's prefetch count to 1, i.e. at most one unacknowledged
// message is delivered to a consumer at a time.
$channel->setPrefetchCount(1);
|
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
原文:http://my.oschina.net/u/127806/blog/321116