首页 > 其他 > 详细

大数据框架hadoop的IPC机制实例

时间:2014-11-27 02:14:13      阅读:322      评论:0      收藏:0      [点我收藏+]
??? Hadoop?IPC(Inter-Process?Communication,进程间通信)这是一种简洁,低消耗的通信机制,可以精确控制进程间通信中如连接、超时、缓存等细节。Hadoop?IPC机制的实现使用了Java动态代理,Java?NIO等技术。

如下是一个使用Hadoop?IPC实现客户端调用服务器端方法的示例功能是返回服务器端的一个文件信息

1?文件信息类IPCFileStatus

????代码如下所示:

package?org.seandeng.hadoop.ipc;

?

import?java.io.DataInput;

import?java.io.DataOutput;

import?java.io.IOException;

import?java.util.Date;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.io.Writable;

?

public?class?IPCFileStatus?implements?Writable?{

????private?String?filename;

????private?long?time;

????public?IPCFileStatus()?{

????}

????public?IPCFileStatus(String?filename)?{

????????this.filename=filename;

????????this.time=(new?Date()).getTime();

????}

????public?String?getFilename()?{

????????return?filename;

????}

????public?void?setFilename(String?filename)?{

????????this.filename?=?filename;

????}

????public?long?getTime()?{

????????return?time;

????}

????public?void?setTime(long?time)?{

????????this.time?=?time;

????}

????public?String?toString()?{

????????return?"File:?"+filename+"?Create?at?"+(new?Date(time));

????}

????public?void?readFields(DataInput?in)?throws?IOException?{

????????this.filename?=?Text.readString(in);

????????this.time?=?in.readLong();

????}

????public?void?write(DataOutput?out)?throws?IOException?{

????????Text.writeString(out,?filename);

????????out.writeLong(time);

????}

}

由于IPCFileStatus类的对象需要从服务器端传到客户端,所以就需要进行序列化,Writable接口就是Hadoop定义的一个序列化接口。?
????由于客户端要调用服务器的方法,所以客户端需要知道服务器有哪些方法可以调用,在IPC中使用的是定义接口的方法,如定义一个IPC接口,客户端和服务器端都知道这个接口,客户端通过IPC获取到一个服务器端这个实现了接口的引用,待要调用服务器的方法时,直接使用这个引用来调用方法,这样就可以调用服务器的方法了。

2?接口IPCQueryStatus

????定义一个服务器端和客户端接口IPCQueryStatus如下所示:

package?org.seandeng.hadoop.ipc;

?

import?org.apache.hadoop.ipc.VersionedProtocol;

?

public?interface?IPCQueryStatus?extends?VersionedProtocol?{

????IPCFileStatus?getFileStatus(String?filename);

}

????在接口IPCQueryStatus中,定义了一个getFileStatus(String?filename)方法根据文件名得到一个IPCFileStatus对象,注意到IPCQueryStatus接口继承自接口?org.apache.hadoop.ipc.VersionedProtocol接口,VersionedProtocol接口是Hadoop?IPC接口必须继承的一个接口,它定义了一个方法getProtocolVersion(),用于返回服务器端的接口实现的版本号,有两个参数,分别是协议接口对应的接口名称protocol和客户端期望服务器的版本号clientVersion,主要作用是检查通信双方的接口是否一致,VersionedProtocol的代码如下:

package?org.apache.hadoop.ipc;

?

import?java.io.IOException;

/**

?*?Superclass?of?all?protocols?that?use?Hadoop?RPC.

?*?Subclasses?of?this?interface?are?also?supposed?to?have

?*?a?static?final?long?versionID?field.

?*/

public?interface?VersionedProtocol?{

??/**

???*?Return?protocol?version?corresponding?to?protocol?interface.

???*?@param?protocol?The?classname?of?the?protocol?interface

???*?@param?clientVersion?The?version?of?the?protocol?that?the?client?speaks

???*?@return?the?version?that?the?server?will?speak

???*/

??public?long?getProtocolVersion(String?protocol,?

?????????????????????????????????long?clientVersion)?throws?IOException;

}

3?实现类IPCQueryStatusImpl

????定义好了接口,那么在服务器端就需要有一个接口的实现类,用于实现具体的业务逻辑,下面的IPCQueryStatusImpl类实现了IPCQueryStatus接口,仅仅简单实现了IPCQueryStatus规定两个方法

package?org.seandeng.hadoop.ipc;

?

import?java.io.IOException;

?

public?class?IPCQueryStatusImpl?implements?IPCQueryStatus?{

????public?IPCQueryStatusImpl()?{}

?

????public?IPCFileStatus?getFileStatus(String?filename)?{

????????IPCFileStatus?status=new?IPCFileStatus(filename);

????????System.out.println("Method?getFileStatus?Called,?return:?"+status);

????????return?status;

????}

????/**

?????*?用于服务器与客户端,进行IPC接口版本检查,再服务器返回给客户端时调用,如果服务器端的IPC版本与客户端不一致

?????*?那么就会抛出版本不一致的异常

?????*/

????public?long?getProtocolVersion(String?protocol,?long?clientVersion)?throws?IOException?{

????????System.out.println("protocol:?"+protocol);

????????System.out.println("clientVersion:?"+clientVersion);

????????return?IPCQueryServer.IPC_VER;

????}

}

getFileStatus()方法根据参数filename创建了一个IPCFileStatus对象,getProtocolVersion()方法返回服务器端使用的接口版本。接口和实现类都完成之后就可以用客户端和服务器进行通信了。

4?IPCQueryServer

????服务器端进行一些成员变量的初始化,然后使用Socket绑定IP,然后在某个端口上监听客户端的请求IPCQueryServer类相关代码如下所示:

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.ipc.RPC;

import?org.apache.hadoop.ipc.Server;

?

public?class?IPCQueryServer?{

????public?static?final?int?IPC_PORT?=?32121;

????public?static?final?long?IPC_VER?=?5473L;

?

????public?static?void?main(String[]?args)?{

????????try?{

????????????Configuration?conf?=?new?Configuration();

????????????IPCQueryStatusImpl?queryService=new?IPCQueryStatusImpl();

????????????System.out.println(conf);

????????????Server?server?=?RPC.getServer(queryService,?"127.0.0.1",?IPC_PORT,?1,?false,?conf);

????????????server.start();

?

????????????System.out.println("Server?ready,?press?any?key?to?stop");

????????????System.in.read();

?

????????????server.stop();

????????????System.out.println("Server?stopped");

????????}?catch?(Exception?e)?{

????????????e.printStackTrace();

????????}

????}

}

????在服务器端先创建一个IPCQueryStatusImpl的对象,传递到RPC.getServer()方法中。服务器端使用RPC.getServer()方法穿给创建服务器端对象server,代码中RPC.getServer()方法的几个参数说明如下:

·?第一个参数queryService标识该服务器对象对外提供的服务对象实例,即客户端所要调用的具体对象,下面客户端的代码调用的接口如此对应;

·?第二个参数"127.0.0.1"表示监绑定所有的IP地址;

·?第三个参数IPC_PORT表示监听的端口;

·?第四个参数1表示Server端的Handler实例(线程)的个数为1

·?第五个参数false表示打开调用方法日志;

·?第六个参数是Configuration对象,用于定制Server端的配置

创建Server对象之后,调用Server.start()方法开始监听客户端的请求,并根据客户端的请求提供服务。

5?请求类IPCQueryClient

客户端需要先获取到一个代理对象,然后才能进行方法调用,在IPC中,使用RPC.getProxy()方法获取代理对象。客户端的代码如下:?

package?org.seandeng.hadoop.ipc;

?

import?java.net.InetSocketAddress;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.ipc.RPC;

?

public?class?IPCQueryClient?{

????public?static?void?main(String[]?args)?{

????????try?{

????????????System.out.println("Interface?name:?"+IPCQueryStatus.class.getName());

????????????System.out.println("Interface?name:?"+IPCQueryStatus.class.getMethod("getFileStatus",?String.class).getName());

????????????InetSocketAddress?addr=new?InetSocketAddress("localhost",?IPCQueryServer.IPC_PORT);

????????????IPCQueryStatus?query=(IPCQueryStatus)?RPC.getProxy(IPCQueryStatus.class,?IPCQueryServer.IPC_VER,?addr,new?Configuration());

????????????IPCFileStatus?status=query.getFileStatus("Z:\\temp\\7c64984cf5c3410fbe28037865d010a3.pdf");

????????????System.out.println(status);

????????????RPC.stopProxy(query);

????????}?catch?(Exception?e)?{

????????????e.printStackTrace();

????????}

????}

}

????客户端的代码很简单,首先构造一个要请求服务器的网络地址(IP和端口),然后通过RPC.getProxy()方法获取到一个IPCQueryStatus对象,然后进行相应的方法调用。其中客户端代码中RPC.getProxy()方法的参数说明如下:

·?第一个参数是IPC接口对象,可以通过IPC接口的静态成员class直接获得。接口的静态成员class保存了该接口的java.lang.Class实例,它表示正在运行的Java应用程序中的类和接口,提供一系列与Java反射相关的重要功能;

·?第二个参数是接口版本,由于接口会根据需求不断地进行升级,形成多个版本的IPC接口,如果客户端和服务器端使用的IPC接口版本不一致,结果将是灾难性的,所以在建立IPC时,需要对IPC的双方进行版本检查;

·?第三个参数是服务器的Socket地址,用于建立IPC的底层TCP连接;

·?第四个参数是Configuration对象,用于定制IPC客户端参数

6?执行结果

客户端的代码编写完成之后就可以运行程序了,先启动服务器端,再运行一个客户端,就完成了一次客户端调用服务器的过程,客户端调用了服务器端?IPCQueryStatusImpl对象的getFileStatus()方法,服务器端返回了方法调用结果即IPCFileStatus对象。服务器端和客户端执行日志如下所示:

服务器端:

2014-11-26?13:00:49,147?WARN??conf.Configuration?(Configuration.java:<clinit>(191))?-?DEPRECATED:?hadoop-site.xml?found?in?the?classpath.?Usage?of?hadoop-site.xml?is?deprecated.?Instead?use?core-site.xml,?mapred-site.xml?and?hdfs-site.xml?to?override?properties?of?core-default.xml,?mapred-default.xml?and?hdfs-default.xml?respectively

Configuration:?core-default.xml,?core-site.xml

2014-11-26?13:00:50,124?INFO??ipc.Server?(Server.java:run(328))?-?Starting?SocketReader

2014-11-26?13:00:50,222?INFO??ipc.Server?(Server.java:run(598))?-?IPC?Server?Responder:?starting

2014-11-26?13:00:50,223?INFO??ipc.Server?(Server.java:run(434))?-?IPC?Server?listener?on?32121:?starting

Server?ready,?press?any?key?to?stop

2014-11-26?13:00:50,224?INFO??ipc.Server?(Server.java:run(1358))?-?IPC?Server?handler?0?on?32121:?starting

protocol:?org.seandeng.hadoop.ipc.IPCQueryStatus

clientVersion:?5473

Method?getFileStatus?Called,?return:?File:?Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf?Create?at?Wed?Nov?26?13:01:02?CST?2014

客户端:

Interface?name:?org.seandeng.hadoop.ipc.IPCQueryStatus

Interface?name:?getFileStatus

2014-11-26?13:00:59,790?WARN??conf.Configuration?(Configuration.java:<clinit>(191))?-?DEPRECATED:?hadoop-site.xml?found?in?the?classpath.?Usage?of?hadoop-site.xml?is?deprecated.?Instead?use?core-site.xml,?mapred-site.xml?and?hdfs-site.xml?to?override?properties?of?core-default.xml,?mapred-default.xml?and?hdfs-default.xml?respectively

File:?Z:\temp\7c64984cf5c3410fbe28037865d010a3.pdf?Create?at?Wed?Nov?26?13:01:02?CST?2014

?

大数据框架hadoop的IPC机制实例

原文:http://seandeng888.iteye.com/blog/2160714

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!