如下是一个使用Hadoop?IPC实现客户端调用服务器端方法的示例,功能是返回服务器端的一个文件信息。
????代码如下所示:
package?org.seandeng.hadoop.ipc;
?
import?java.io.DataInput;
import?java.io.DataOutput;
import?java.io.IOException;
import?java.util.Date;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.io.Writable;
?
public?class?IPCFileStatus?implements?Writable?{
????private?String?filename;
????private?long?time;
????public?IPCFileStatus()?{
????}
????public?IPCFileStatus(String?filename)?{
????????this.filename=filename;
????????this.time=(new?Date()).getTime();
????}
????public?String?getFilename()?{
????????return?filename;
????}
????public?void?setFilename(String?filename)?{
????????this.filename?=?filename;
????}
????public?long?getTime()?{
????????return?time;
????}
????public?void?setTime(long?time)?{
????????this.time?=?time;
????}
????public?String?toString()?{
????????return?"File:?"+filename+"?Create?at?"+(new?Date(time));
????}
????public?void?readFields(DataInput?in)?throws?IOException?{
????????this.filename?=?Text.readString(in);
????????this.time?=?in.readLong();
????}
????public?void?write(DataOutput?out)?throws?IOException?{
????????Text.writeString(out,?filename);
????????out.writeLong(time);
????}
}
由于IPCFileStatus类的对象需要从服务器端传到客户端,所以就需要进行序列化,Writable接口就是Hadoop定义的一个序列化接口。?
????由于客户端要调用服务器的方法,所以客户端需要知道服务器有哪些方法可以调用,在IPC中使用的是定义接口的方法,如定义一个IPC接口,客户端和服务器端都知道这个接口,客户端通过IPC获取到一个服务器端这个实现了接口的引用,待要调用服务器的方法时,直接使用这个引用来调用方法,这样就可以调用服务器的方法了。
????定义一个服务器端和客户端接口IPCQueryStatus如下所示:
package?org.seandeng.hadoop.ipc;
?
import?org.apache.hadoop.ipc.VersionedProtocol;
?
public?interface?IPCQueryStatus?extends?VersionedProtocol?{
????IPCFileStatus?getFileStatus(String?filename);
}
????在接口IPCQueryStatus中,定义了一个getFileStatus(String?filename)方法,根据文件名得到一个IPCFileStatus对象,注意到IPCQueryStatus接口继承自接口?org.apache.hadoop.ipc.VersionedProtocol接口,VersionedProtocol接口是Hadoop?IPC接口必须继承的一个接口,它定义了一个方法getProtocolVersion(),用于返回服务器端的接口实现的版本号,有两个参数,分别是协议接口对应的接口名称protocol和客户端期望服务器的版本号clientVersion,主要作用是检查通信双方的接口是否一致,VersionedProtocol的代码如下:
package?org.apache.hadoop.ipc;
?
import?java.io.IOException;
/**
?*?Superclass?of?all?protocols?that?use?Hadoop?RPC.
?*?Subclasses?of?this?interface?are?also?supposed?to?have
?*?a?static?final?long?versionID?field.
?*/
public?interface?VersionedProtocol?{
??/**
???*?Return?protocol?version?corresponding?to?protocol?interface.
???*?@param?protocol?The?classname?of?the?protocol?interface
???*?@param?clientVersion?The?version?of?the?protocol?that?the?client?speaks
???*?@return?the?version?that?the?server?will?speak
???*/
??public?long?getProtocolVersion(String?protocol,?
?????????????????????????????????long?clientVersion)?throws?IOException;
}
????定义好了接口,那么在服务器端就需要有一个接口的实现类,用于实现具体的业务逻辑,下面的IPCQueryStatusImpl类实现了IPCQueryStatus接口,仅仅简单实现了IPCQueryStatus规定两个方法。
package?org.seandeng.hadoop.ipc;
?
import?java.io.IOException;
?
public?class?IPCQueryStatusImpl?implements?IPCQueryStatus?{
????public?IPCQueryStatusImpl()?{}
?
????public?IPCFileStatus?getFileStatus(String?filename)?{
????????IPCFileStatus?status=new?IPCFileStatus(filename);
????????System.out.println("Method?getFileStatus?Called,?return:?"+status);
????????return?status;
????}
????/**
?????*?用于服务器与客户端,进行IPC接口版本检查,再服务器返回给客户端时调用,如果服务器端的IPC版本与客户端不一致
?????*?那么就会抛出版本不一致的异常
?????*/
????public?long?getProtocolVersion(String?protocol,?long?clientVersion)?throws?IOException?{
????????System.out.println("protocol:?"+protocol);
????????System.out.println("clientVersion:?"+clientVersion);
????????return?IPCQueryServer.IPC_VER;
????}
}
getFileStatus()方法根据参数filename创建了一个IPCFileStatus对象,getProtocolVersion()方法返回服务器端使用的接口版本。接口和实现类都完成之后就可以用客户端和服务器进行通信了。
????服务器端进行一些成员变量的初始化,然后使用Socket绑定IP,然后在某个端口上监听客户端的请求。IPCQueryServer类相关代码如下所示:
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.ipc.RPC;
import?org.apache.hadoop.ipc.Server;
?
public?class?IPCQueryServer?{
????public?static?final?int?IPC_PORT?=?32121;
????public?static?final?long?IPC_VER?=?5473L;
?
????public?static?void?main(String[]?args)?{
????????try?{
????????????Configuration?conf?=?new?Configuration();
????????????IPCQueryStatusImpl?queryService=new?IPCQueryStatusImpl();
????????????System.out.println(conf);
????????????Server?server?=?RPC.getServer(queryService,?"127.0.0.1",?IPC_PORT,?1,?false,?conf);
????????????server.start();
?
????????????System.out.println("Server?ready,?press?any?key?to?stop");
????????????System.in.read();
?
????????????server.stop();
????????????System.out.println("Server?stopped");
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}
????在服务器端先创建一个IPCQueryStatusImpl的对象,传递到RPC.getServer()方法中。服务器端使用RPC.getServer()方法穿给创建服务器端对象server,代码中RPC.getServer()方法的几个参数说明如下:
·?第一个参数queryService标识该服务器对象对外提供的服务对象实例,即客户端所要调用的具体对象,下面客户端的代码调用的接口如此对应;
·?第二个参数"127.0.0.1"表示监绑定所有的IP地址;
·?第三个参数IPC_PORT表示监听的端口;
·?第四个参数1表示Server端的Handler实例(线程)的个数为1
·?第五个参数false表示不打开调用方法日志;
·?第六个参数是Configuration对象,用于定制Server端的配置。
创建Server对象之后,调用Server.start()方法开始监听客户端的请求,并根据客户端的请求提供服务。
客户端需要先获取到一个代理对象,然后才能进行方法调用,在IPC中,使用RPC.getProxy()方法获取代理对象。客户端的代码如下:?
package?org.seandeng.hadoop.ipc;
?
import?java.net.InetSocketAddress;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.ipc.RPC;
?
public?class?IPCQueryClient?{
????public?static?void?main(String[]?args)?{
????????try?{
????????????System.out.println("Interface?name:?"+IPCQueryStatus.class.getName());
????????????System.out.println("Interface?name:?"+IPCQueryStatus.class.getMethod("getFileStatus",?String.class).getName());
????????????InetSocketAddress?addr=new?InetSocketAddress("localhost",?IPCQueryServer.IPC_PORT);
????????????IPCQueryStatus?query=(IPCQueryStatus)?RPC.getProxy(IPCQueryStatus.class,?IPCQueryServer.IPC_VER,?addr,new?Configuration());
????????????IPCFileStatus?status=query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");
????????????System.out.println(status);
????????????RPC.stopProxy(query);
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}
????客户端的代码很简单,首先构造一个要请求服务器的网络地址(IP和端口),然后通过RPC.getProxy()方法获取到一个IPCQueryStatus对象,然后进行相应的方法调用。其中客户端代码中RPC.getProxy()方法的参数说明如下:
·?第一个参数是IPC接口对象,可以通过IPC接口的静态成员class直接获得。接口的静态成员class保存了该接口的java.lang.Class实例,它表示正在运行的Java应用程序中的类和接口,提供一系列与Java反射相关的重要功能;
·?第二个参数是接口版本,由于接口会根据需求不断地进行升级,形成多个版本的IPC接口,如果客户端和服务器端使用的IPC接口版本不一致,结果将是灾难性的,所以在建立IPC时,需要对IPC的双方进行版本检查;
·?第三个参数是服务器的Socket地址,用于建立IPC的底层TCP连接;
·?第四个参数是Configuration对象,用于定制IPC客户端参数。
客户端的代码编写完成之后就可以运行程序了,先启动服务器端,再运行一个客户端,就完成了一次客户端调用服务器的过程,客户端调用了服务器端?IPCQueryStatusImpl对象的getFileStatus()方法,服务器端返回了方法调用结果即IPCFileStatus对象。服务器端和客户端执行日志如下所示:
服务器端:
2014-11-26?13:00:49,147?WARN??conf.Configuration?(Configuration.java:<clinit>(191))?-?DEPRECATED:?hadoop-site.xml?found?in?the?classpath.?Usage?of?hadoop-site.xml?is?deprecated.?Instead?use?core-site.xml,?mapred-site.xml?and?hdfs-site.xml?to?override?properties?of?core-default.xml,?mapred-default.xml?and?hdfs-default.xml?respectively Configuration:?core-default.xml,?core-site.xml 2014-11-26?13:00:50,124?INFO??ipc.Server?(Server.java:run(328))?-?Starting?SocketReader 2014-11-26?13:00:50,222?INFO??ipc.Server?(Server.java:run(598))?-?IPC?Server?Responder:?starting 2014-11-26?13:00:50,223?INFO??ipc.Server?(Server.java:run(434))?-?IPC?Server?listener?on?32121:?starting Server?ready,?press?any?key?to?stop 2014-11-26?13:00:50,224?INFO??ipc.Server?(Server.java:run(1358))?-?IPC?Server?handler?0?on?32121:?starting protocol:?org.seandeng.hadoop.ipc.IPCQueryStatus clientVersion:?5473 Method?getFileStatus?Called,?return:?File:?Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf?Create?at?Wed?Nov?26?13:01:02?CST?2014 |
客户端:
Interface?name:?org.seandeng.hadoop.ipc.IPCQueryStatus Interface?name:?getFileStatus 2014-11-26?13:00:59,790?WARN??conf.Configuration?(Configuration.java:<clinit>(191))?-?DEPRECATED:?hadoop-site.xml?found?in?the?classpath.?Usage?of?hadoop-site.xml?is?deprecated.?Instead?use?core-site.xml,?mapred-site.xml?and?hdfs-site.xml?to?override?properties?of?core-default.xml,?mapred-default.xml?and?hdfs-default.xml?respectively File:?Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf?Create?at?Wed?Nov?26?13:01:02?CST?2014 |
?
原文:http://seandeng888.iteye.com/blog/2160714