(使用Java客户端)
一、概述
在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC。
在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数。
1.1、客户端接口(Client Interface)
为了说明一个RPC服务可以使用,我们将创建一个简单的客户端类,这将通过方法名的调用发送一个RPC请求和接收块得到答复:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);注意:
尽管RPC是计算机领域中非常普遍的模式,它经常受到批评,当程序不知道是否这是一个缓慢的RPC调用函数,像在不可预知接口的系统进行调试,增加了不必要的复杂性,而不是简化软件,滥用会导致不可修复的代码,如果要使用它记住考虑以下建议:
1、能明确区分被调用的函数是局部的还是远程的。
2、您的文件系统、组件之间的依赖关系是很清晰的。
3、处理问题?客户应该知道当RPC服务器挂掉的时候该如何做。
1.2、回调队列(Callback Queue)
总的说来使用RabbitMQ来实现RPC是比较简单的,当客户端发送请求消息和服务器响应消息的答复,为了接收到响应我们需要发送一个callback队列地址在请求中,我们可以使用默认的队列,让我们试试:
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...说明:
AMQP协议对一个消息预设了一个14个属性的集合,大部分属性很少被使用,有以下例外:
1、deliveryMode:标志一个消息的持久化(值为2)或者状态(其他任何值)。
2、contentType:用于描述编码的MIME类型,比如,要经常使用JSON编码大的一个好的设置方法为application/json
3、replyTo:常用的回调队列名称。
4、correlationId:被用于一个RPC相应请求相关。
同时我们需要一个新的类:
import com.rabbitmq.client.AMQP.BasicProperties;
1.3、相关ID (correlation Id)
在上述方法我们为每个RPC请求创建一个回调队列,那是很低效的但是幸运的是有一个更好的方式,让我们创建一个单一回调队列供每个客户端调用。
这出现了一个新的问题,在队列中接收到一个不清楚这个请求属于哪个响应时的响应,我们要将它设置为每个请求的一个特有的值,然后从一个回调队列中接收一个消息时就要查看这个属性值,在此基础上,我们将能匹配一个请求的响应,如果我们看到一个未知的correlationId值,我们可以安全地将这些消息丢弃因为它不属于我们的要求。
你也许会问,我们为什么要丢弃回调队列中未知的消息呢?而不是一个错误引起的失败呢?这是由于一个可能在服务器的竞争引起的,虽然不太可能,但是它还是有可能发生的,RPC服务器在给我们大答复之后将挂掉,但是发送确认消息的请求,如果这种情况发生,将再次重启RPC服务器处理请求,这就是为什么在客户端必须处理重复的响应。
二、实现
2.1、结构如下图所示:
从上图可知,我们RPC工作流程如下:
1、当客户端启动时,它创建一个匿名的独立的回调队列。
2、一个RPC请求中,客户端发送一个消息具有两个特性:replyTo它包含将要到达的回调队列和correlation_id,这是每个请求的一个固有的值。
3、请求发送到一个rpc_queue队列。
4、RPC服务器正在等待队列的请求,当一个请求到达时,它的工作久是发送一个消息结果返回给客户端,使用了replyTo队列。
5、客户端等待回调replyTo队里的数据,当消息出现时,它检查correlationId值是否和请求返回给应用程序响应的值匹配。
2.2、代码实现
Fibonacci 函数:
private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
我们声明的Fibonacci函数,它假定只有有效的整整输入(别指望一个大的数字,它可能是最慢的递归实现),我们RPC服务器RPCServer.java代码如下:
private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }说明:
1、像之前所有实例一样,开始建立连接、通道和队列
2、可能要运行多个服务器进程,为了传播同样的负载在多个服务器上,我们需要通过channel.basicQos来设置prefetchcount值。
3、通过basicConsume访问队列,然后进入循环,等待请求消息、处理消息和发送响应。
RPCClient.java代码如下:
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }说明:
1、建立一个连接通道和声明一个专属的回调队列的回复。
2、订阅回调队列,这样可以接受RPC响应
3、调用方法发起实际的RPC请求。
4、生成一个唯一的correlationId并且保存它,while循环将使用这个值来匹配相对应的响应。
5、发送请求消息,它有两个属性值relpTo和correlationId。
6、等待相匹配的响应。
7、while循环做简单的工作,为每个响应检查是否correlationId就是我们需要的,如果是,保存该响应。
8、返回响应给客户端。
客户端请求代码:
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
2.3、完整的代码清单
RPCClient.java
package com.xuz.rpc; import java.util.UUID; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); connection = factory.newConnection(); channel = connection.createChannel(); //响应队列名,服务端会把返回的信息发送到这个队列中。 replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; //每个请求生成一个唯一的correlationId String corrId = UUID.randomUUID().toString(); //设置请求响应基本参数:correlationId(UUID)和rpc_queue BasicProperties props = new BasicProperties.Builder().correlationId( corrId).replyTo(replyQueueName).build(); System.out.println("客户端响应队列的属性:["+props.getCorrelationId()+"," +props.getReplyTo()+"]"); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //如果获得响应队列中的getCorrelationId和当前corrId相等,则保存响应并返回 if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } /** * 关闭连接 * @throws Exception */ public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient rpcClient = null; String response = null; try { rpcClient = new RPCClient(); //调用Call方法传入请求消息:测试RPC response = rpcClient.call("测试RPC"); System.out.println(" 响应消息:[" + response + "]"); } catch (Exception e) { e.printStackTrace(); } finally { if (rpcClient != null) { try { rpcClient.close(); } catch (Exception ignore) { } } } } }
package com.xuz.rpc; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; /** * 定义函数 * @param n 输入的正整数 * @return */ private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { //获取连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设定主机 factory.setHost("127.0.0.1"); //创建连接 connection = factory.newConnection(); //创建通道 channel = connection.createChannel(); //声明RPC队列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //设置公平调度 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println("[等待RPC远程请求!]"); while (true) { String response = null; System.out.println("[服务端等待接收消息!]"); QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("[服务端成功接收消息!]"); BasicProperties props = delivery.getProperties(); //从响应队列获取reply参数 BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); System.out.println("服务端响应队列的属性:["+replyProps.getCorrelationId()+"]"); try { String message = new String(delivery.getBody(), "UTF-8"); response = "服务端已经处理了消息:[" + message+"]"; } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { //将结果返回给客户端 channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); //设置确认消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }2.4、RPC测试
1、运行RPCClient发送响应请求:
2、运行PRCServer接收处理响应请求:
源码下载:
RabbitMQ (消息队列)专题学习07 RPC,布布扣,bubuko.com
原文:http://blog.csdn.net/xuzheng_java/article/details/34176389