首页 > 其他 > 详细

Hadoop IPC 调用

时间:2016-02-24 19:00:41      阅读:220      评论:0      收藏:0      [点我收藏+]

IPC 客户端:
数据节点连接名字节点时:
在startDataNode方法中获取namenode代理对象
DatanodeProtocol namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr,conf);

IPC调用:

客户端会创建一个代理对象:
1.创建一个Invoker对象,该对象实现了InvocationHandler接口,在其构造器中,
会建立一个Client对象和ConnectionId对象

ConnectionID类:
调用的协议:Class<?> protocol:比如DataNodeProtocol.class
远程服务器地址:InetSocketAddress addr:从configure文件中获取
用户信息: UserGroupInformation ticket
tcpNoDelay标志位false:表示关闭底层缓冲,确保数据及时发送
int pingInterval:心跳时间间隔 1min
int maxIdleTime:call集合中如果为空,最多等待时间,默认10秒,否则关闭连接
rpcTimeout=0
connTimeout:Long.MAX_VALUE

Client类:
Class<? extends Writable> valueClass : ObjectWritable.class
AtomicBoolean running:默认为true,表明Client在运行中,尚未关闭
Hashtable<ConnectionId, Connection> connections:存放键ConnectionID和对应的Connection值对象
Configuration conf

2.获取代理对象:
VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);

在我们调用代理对象中的方法时,如:
namenode.versionRequest();
执行Invoker对象invoke方法:
调用client对象的call方法,获取一个objectWritable对象,再调用该对象的get()方法

call方法:
两个参数 Invocation对象(方法类和参数值),Invocation类实现Writable接口,另一个参数 是connectionId
a.先创建一个Call对象,Call对象存放的是每一次远程调用中的Invocaiton,返回的结果
Client.Call类:
int id: call id
Writable param: Invocation 对象,封装了远程调用的方法和参数值
Writable value:远程方法调用返回的结果
IOException error:在连接过程或调用过程中抛出的的异常
boolean done:如果为true,表明本次call调用完毕

b.连接远程服务器:
1.同步connections集合对象:根据connectionId,获取Connection对象,如果没有connection对象,就创建connection对象,把当前的connectionId和connection对象放入集合中,以便以后重复获取
Connection类
InetSocketAddress server: IPC服务器地址
ConnectionHeader header:连接消息头
ConnectionId remoteId:IPC连接标识

Socket socket:TCP连接的Socket对象
DataInputStream in: 输入流
DataOutputStream out:输出流

Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>():当前正在处理的远程调用
AtomicLong lastActivity = new AtomicLong():IPC连接最近一次通信时间
AtomicBoolean shouldCloseConnection = new AtomicBoolean():连接关闭标志
IOException closeException:导致IPC连接关闭的异常
int pingInterval:接受数据时的等待超时时间,为1min
ConnectionHeader类:
String protocol: 协议名
UserGroupInformation ugi:用户信息
AuthMethod authMethod:用户验证方式
2.同步Connection对象,判断本次连接是否关闭,如果关闭则返回,否则把call id和call对象
放入calls集合中,并调用notify方法,取消Connection对象的run方法中的wait()等待

3.同步Connection对象,使用基本套接字与服务器建立连接,socket对象不为空(表明已经连接)或者需要关闭连接,则不再需要连接:
否则(表明第一次连接远程服务器,需要放hrpc头部,连接头)
a.设置Connection对象的属性值socket对象:
获取socket对象:
socket=SocketChannel.open().socket()
socket.setTcpNoDelay(tcpNoDelay) ---- 关闭Socket底层缓存
连接服务器:
socket.connect(server, timeout);
---timeout等待建立连接的时间,默认为20秒,连接失败,抛出SocketTimeoutException
socket.setSoTimeout(pingInterval) ---接受数据时的等待超时时间,默认为1min,如果超时,抛出SocketTimeoutException
b.如果连接等待超时或者接受数据超时,关闭socket对象:socket.close();socket = null
尝试重新连接,最多45次,否则抛出异常
c.对于其它异常,关闭socket对象:socket.close();socket = null
尝试重新连接,最多10次,否则抛出异常
d.获取socket输入流,socket输出流对象
e.写入hrpc字节数组,写入当前版本4,写入用户验证方式(80简单验证)
f.获取Connection对象中的in和out对象
this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));----PingInputStream添加心跳检查的能力
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
h.写入消息头信息:
写入消息头长度,写入协议接口名字,如果是简单用户验证,写入布尔值true和有效用户名,如果有真正用户的话,写入布尔值 true和真正用户名,
否则写入布尔值false
---先把所有的信息写入的ByteArrayOutputStream字节数组里,然后把长度和所有信息写到输出流中
i.更新connection对象中最后一次访问时间,并启动connection线程
以上过程中,如果连接失败
markClosed:shouldCloseConnection设为true,表示关闭当前连接,设置Connection对象中的
closeException(何种异常)取消Connection对象的run方法中的wait()等待(notifyall()方法)
close方法:
从connections集合中,remove 对应的connectionId 和connection对象
如果Connection对象中的in或者out打开,则关闭in或者out对象
获取本次连接对象中的所有call调用,设置异常(call对象中的error),设置call中的done为true,取消call对象等待wati()方法,并从calls集合中删除

4.建立连接后,发送本次调用信息(如果连接已关闭不需要发送调用信息):
对Connection对象中的out加锁: 写入信息的长度,写入call id,写入Invocation序列化对象,
包括方法名,参数个数,参数值,参数类型等信息
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],conf);
如果写入信息失败,调用markClosed(e),关闭DataOutputBuffer对象
(在waitForWork()最终根据shouldCloseConnection设为 true来关闭连接,最终调用close()方法)

5.connection线程中的run方法:
a.如果calls集合为空,shouldCloseConnection为false,running为true,wait(maxIdleTime)
-----maxIdleTime 10s,等待10s
如果calls不为空,shouldCloseConnection为false,running为true,开始循环处理
receiveResponse()方法:
1.更新上次连接访问时间,
2.开始读取服务器端发送的数据:首先从In对象读取call id,在calls集合中根据call id获取Call对象
3.再次读取call状态信息,判断状态:
a.成功:读取数据到Call对象中的value对象,调用call对象的notify(),done 设为true,从call集合中删除当前call
b.如果是error:读取类名和错误信息到call对象中,建立RemoteException,调用call对象的notify(),done 设为true,从call集合中删除当前call
c.如果是fatal(表示没有权限):
markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
---关闭当前连接读取类名和错误信息,建立RemoteException到connectino对象中,shouldCloseConnection 设为true,调用notifyall()
d.在读取数据发生除了接受数据异常外,调用markClosed(e);
e.如果在读数据时,接受数据超时(1min),调用handleTimeout方法:
关闭连接标志位true,client运行标志位false,表明连接已断开,抛出超时异常,最后调用markClosed(e)方法和close()方法
否则发送一次心跳sendPing()
long curTime = System.currentTimeMillis();
----当前时间减去上次访问时间大于等于ping时间
否则,如果shouldCloseConnection为true,不需要调用receiveResponse方法,直接调用close()方法
否则,如果calls为空,说明连接长期出去空闲状态,或者已经处理完毕,
则调用markClosed(null)
---shouldCloseConnection设为true,异常设为null,取消Connection对象的run方法中
的wait()等待(notifyall()方法)
不调用receiveResponse方法,直接调用close()方法
否则,说明整个client需要关闭,并且calls中还有调用请求:
调用markClosed((IOException)new IOException().initCause(new InterruptedException()))
----shouldCloseConnection设为true,异常设为IO异常,调用notifyall(),不调用receiveResponse方法,直接调用close()方法
b.调用close()方法

三个地方更新lastActivity:
1. 第一次连接成功
2. 接受服务器端数据之前
3. 如果接受数据超时,更新lastActivity发送一个心跳

取消call对象的等待:
1.第一次连接服务器时,如果连接失败:调用Connection对象的close方法会调用call对象的callComplete方法取消等待
2.第一次连接成功后,接受该call对象的数据时:
如果发生读异常,调用Connection对象的close方法调用call对象的callComplete方法取消等待
读取完毕:成功,失败,还是致命都会调用callComplete方法


Listener线程:Listener是一个标准的NIO应用
Listener对象属性:
InetSocketAddress address=new InetSocketAddress(bindAddress, port);
---封装服务器的地址和端口号的InetSocketAddress对象
ServerSocketChannel acceptChannel: 服务器端socket通道
int backlogLength 指定在该监听端口上排队的请求的最大长度15
long lastCleanupRunTime 初始化为0,上次清理connection对象时间
ExecutorService readPool 线程池
long cleanupInterval = 10000: 清理Connetion对象的最多时间间隔

Listener()方法:
1.创建一个ServerSocketChannel对象:
InetSocketAddress address=new InetSocketAddress(bindAddress, port);---封装服务器的地址和端口号的InetSocketAddress对象
ServerSocketChannel acceptChannel = ServerSocketChannel.open();
2.设ServerSocketChannel工作于非阻塞方式:
acceptChannel.configureBlocking(false);----调用accetp方法时,没有客户连接立即返回null
3.获取ServerSocket对象:
ServerSocket socket = acceptChannel.socket()
4.绑定本地地址和端口:
socket.bind(InetSocketAddress address,int backlog);
---backlog指定在该监听端口上排队的请求的最大长度15
5.建立一个选择器:
Selector selector= Selector.open()
6.用当前的ServerSocketChannel对象注册SelectionKey.OP_ACCEPT 到当前选择器中:
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
8.设置Listener为守护线程
7.建立reader线程,对每个reader线程分配一个读选择器,启动reader线程

Reader线程的run()方法:
a.
readSelector.select(); ----一直阻塞,直到readSelector被唤醒
while (adding) { adding为true一直循环
this.wait(1000);
}
b.如果有读事件注册并收到,adding为false:
i). 获取Selector的selected-keys集合:
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
ii). 迭代每个SelectionKey,取出一个,并把SelectionKey从Selector的selected-key集合中删除:
iii).调用doRead()方法

Listener线程启动:
run()方法:
1.为Listener选择器接受客户请求:
selector.select();
2.获取Selector的selected-keys集合:
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
3.迭代每个SelectionKey,取出一个,并把SelectionKey从Selector的selected-key集合中删除
4.调用doAccept(SelectionKey key)方法:
a.获取该SelectionKey对应的ServerSocketChannel对象:
ServerSocketChannel server = (ServerSocketChannel) key.channel();
b.获取与客户关联的SocketChannel对象:
channel = server.accept()
c.设置SocketChanel对象为异步方式,并且取消socket底层的缓存:
对于非阻塞模式下,读写数据有多少读写多少
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
d.获取reader线程:
设置:adding = true;
唤醒读选择器:readSelector.wakeup();

e.reader线程注册当前通道读事件,获取readKey:
SelectionKey readKey= channel.register(readSelector, SelectionKey.OP_READ)
f.创建Connection对象:
Connection c = new Connection(readKey, channel, System.currentTimeMillis())

Server.Connection对象:
SocketChannel channel:Connection连接对应的客户端socket通道
Socket socket:channel.socket()获取通道对应的socket对象 InetAddress addr:socket.getInetAddress(); 获取soket的远程服务器地址
int remotePort: socket.getPort();获取socket的远程服务器端口号
LinkedList<Call> responseQueue = new LinkedList<Call>();
--- 应答队列
ByteBuffer data:初始为空,读取调用的序列化数据
ByteBuffer dataLengthBuffer:ByteBuffer.allocate(4) 分配4个字节
ByteBuffer unwrappedData:初始为空
ByteBuffer unwrappedDataLengthBuffer:ByteBuffer.allocate(4)
boolean rpcHeaderRead = false; 初始化为false,表明还没读取rpc header
boolean headerRead = false; 初始化为false,表明还没读取connectionHeader
ConnectionHeader header = new ConnectionHeader();
ByteBuffer rpcHeaderBuffer;
Class<?> protocol AuthMethod authMethod
UserGroupInformation user
int rpcCount=0: 当前rpc调用次数,初始化为0
long lastContact:上次访问时间,第一次创建Connection对象的时候,也就是要关联
SelectionKey的时候
g.readKey与Connection 关联:
readKey.attach(c);
h.把Connection对象放入connectionList集合中:
connectionList.add(numConnections, c);
numConnections++;
i.最终设置adding =false,并且调用线程reader的notify()方法


doRead(SelectionKey key)方法:
1.获取readKey关联的Connection对象:
Connection c = (Connection)key.attachment();
2.跟新最近访问时间:
c.setLastContact(System.currentTimeMillis());
3.读取和处理数据:
count = c.readAndProcess();
4.如果出了中断以外的异常或者readAndProcess方法返回-1,则关闭当前连接:
closeConnection(c);
c=null
5.否则更新最近一次访问时间:
c.setLastContact(System.currentTimeMillis());

Connection中的readAndProcess()方法:
1.循环读取client端发送的数据,直到读完4个字节到ByteBuffer对象dataLengthBuffer中:
int count =channel.read(buffer)----非阻塞模式下,数据有多少读多少

2.rpcHeaderRead 为false时,表明还没有读取rpc头部,此时:
a.分配两个字节的ByteBuffer对象:rpcHeaderBuffer = ByteBuffer.allocate(2),读取字节到该对象中:
int count =channel.read(buffer)----非阻塞模式下,数据有多少读多少
b.获取rpcHeaderBuffer 中的第一个字节为版本号:
int version = rpcHeaderBuffer.get(0);
c.获取第二个字节为用户验证方式,如simple 验证:
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
authMethod = AuthMethod.read(new DataInputStream(new ByteArrayInputStream(method)));
d.调用dataLengthBuffer.flip();此时positon=0 limit=4
e.读取dataLengthBuffer中的字节数组,如果不是‘hrpc‘,count设为-1,返回
版本的值不是4,count设为-1,返回
f.调用dataLengthBuffer.clear()方法:此时positon=0 limit=4
最后rpcHeaderRead设为true,表名rpc头部已读过,并设置rpcHeaderBuffer为null

3.接下来读取connectionHeader数据或者正常发送的数据:
a.再次读取四个字节到该对象中,调用dataLengthBuffer.flip()方法:positon=0 limit=4
b.获取发送的数据长度:dataLength = dataLengthBuffer.getInt();
i. 如果数据长度为-1,说明是心跳发送,useWrap如果为false:
调用dataLengthBuffer.clear(),此时positon=0 limit=4,并返回0
ii. 否则,分配dataLength个字节的ByteBuffer对象给data:
ByteBuffer data = ByteBuffer.allocate(dataLength);
c. 读取数据直到data已满:
1). 调用dataLengthBuffer.clear():positon=0 limit=4,是它回到初始状态
2). 翻转data:data.flip(): positon=0 limit=dataLength
3). 处理一次调用数据processOneRpc(data.array());
4). 如果本次读取的是connectionHeader数据,则继续处理

processOneRpc(byte[] buf) 方法:
1.如果读取的是connectionHeader数据:
a. 获取dataInputStream对象:
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
b.调用ConnectionHeader的readFields方法到ConnectionHeader对象中,header.readFields(in):
1). 读取实现的协议接口类名字:protocol = Text.readString(in);
2).获取布尔值true和有效用户名
boolean realUserNamePresent = in.readBoolean();
String username = in.readUTF();
3).获取布尔值true和有真正用户名:
boolean realUserNamePresent = in.readBoolean();
String realUserName = in.readUTF();
4).获取用户和用户组信息:
c.查看ConnectionHeader对象中的protocol,并获取对应的Class对象:
d.设置用户和用户组验证方式:
UserGroupInformation protocolUser = header.getUgi();
user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
e.调用authorizeConnection()方法,返回false,则抛出AccessControlException异常:
1).ProxyUsers.authorize(user, this.getHostAddress(), conf);
authorize(user, header, getHostInetAddress());
2)验证失败,则调用
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
ae.getClass().getName(), ae.getMessage());
并返回-1表示连接失败

2.否则是一次方法调用的相关数据:
a. 获取dataInputStream对象:
DataInputStream dis= new DataInputStream(new ByteArrayInputStream(buf));
b.获取call id: int id = dis.readInt();
c.获取Invocation对象信息(包括方法名,参数个数,参数值,参数类型等信息):
Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis);
d.创建一个Call对象:Call call = new Call(id, param, this);
e.把call对象放入BlockingQueue队列中:
callQueue.put(call)
f.rpcCount加1,表明一次需要处理rpc调用

Handler 的run方法:---BlockingQueue<Call> callQueue 中存放的是待处理的call对象
1.从队列中获取call对象,如果队列为空,则阻塞(BlockingQueue特性)
final Call call = callQueue.take();
2.调用call方法:value=call(call.connection.protocol, call.param, call.timestamp);
3.对call.connection.responseQueue加锁,调用
---当调用call方法时抛出异常,则error不为空:
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);

setupResponse(ByteArrayOutputStream response, Call call, Status status, Writable rv, String errorClass, String error):
a.ByteArrayOutputStream response = new ByteArrayOutputStream(10240)
response.reset();
b.封装成DataOutputStream对象:DataOutputStream out = new DataOutputStream(response);
c.写入call id,写入处理的状态:
out.writeInt(call.id); // write call id
out.writeInt(status.state); // write status
d.如果成功:把调用返回的结果ObjectWritable对象放到字节数组:rv.write(out);
e.否则:写入错误类名和错误信息:
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
f.调用call的setResponse方法,把写入的数据存放call对象的ByteBuffer对象:
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
4. 调用responder.doRespond(call)方法:
a.把call放入Connectiond对象中的responseQueue中:call.connection.responseQueue.addLast(call);
b.对一个每个Connection对象,如果responseQueue中只有一个元素:
则让handler来处理信息的发送(避免线程切换的开销):processResponse(call.connection.responseQueue, true)
1).对responseQueue加锁,获取该队列的元素个数,如果元素个数为0,说明该通道上没有等待的数据,返回true
2).否则,获取第一个call调用:call = responseQueue.removeFirst();
3).获取socketChannel对象:SocketChannel channel = call.connection.channel;
4).异步方式尽可能的往通道写入数据:
int numBytes = channelWrite(channel, call.response);
5).如果所有数据已写入通道:
i). rpcCount-- 表示一次rpc调用正式结束
ii).判断numElements,如果为1,说明是最后一次调用,设置done为true(该通道上没有等待的数据)
否则,done为false
6).如果只写入一部分数据:
i).重新把call调用放入responseQueue队列头部中:
call.connection.responseQueue.addFirst(call);
ii).如果是Handler来发送数据:
a.修改最近一次访问时间:call.timestamp = System.currentTimeMillis();
b.Responder中pending 加1,表明Responder对这个call调用还需处理(第一次是Handler处理的)
c.唤醒可能处于select中等待的Responder选择器:writeSelector.wakeup();
d.将通道注册到该选择器中:
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
e.发现异常:设置done为true
g.最后pending 减1

-----对于某个connection对象:
如果所有call调用完毕,则done=true error=false
对于某个call调用,如果已经读到通道尾:done=true error=false
本次调用结束,且队列只有一个调用:则done=true error=false
本次调用结束,但队列还有其它call调用,则done=false error=false
如果在选择器注册读的过程中,发生通道关闭异常,done=true, error=false

注意:Responder的run方法:在执行writeSelector.select(PURGE_INTERVAL);---有可能阻塞等待
首先: pending+1,唤醒选择器:writeSelector.wakeup();
这时,Responder的run方法会结束本次,进入下次循环,调用方法:
while (pending > 0) {
wait(); }
这时Repsonder的run方法一直在等待中
最后 pending-1后,再回结束Responder中的循环,重新执行writeSelector.select(PURGE_INTERVAL);方法

call()方法:
1.强制转换param为Invocation对象:
Invocation call = (Invocation)param;
2.获取Method对象:
Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
method.setAccessible(true);
3.设置开始处理时间:long startTime = System.currentTimeMillis();
调用实际调用的方法:Object value = method.invoke(instance, call.getParameters());
其中instance 是NameNode对象
4.获取调用方法所用的时间:
int processingTime = (int) (System.currentTimeMillis() - startTime);
在queue的时间:int qTime = (int) (startTime-receivedTime);
5.获取ObjectWritable对象:new ObjectWritable(method.getReturnType(), value);

 

Responder()方法:
1.设置该线程为守护线程
2.打开一个写选择器:Selector writeSelector = Selector.open()
3.pending =0

run()方法:
1.判断Responder对象的pending,如果大于0,则调用wait()方法等待
2.调用写选择器的select方法:writeSelector.select(900000); ---最多阻塞15分钟
3.获取SelectionKey:Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
4.循环迭代:
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e) {
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
}
}

doAsyncWrite方法:
a.获取call调用Call call = (Call)key.attachment();
b.锁住call.connection.responseQueue对象:
调用processResponse(call.connection.responseQueue, false)如果返回为true,即当前连接的所有call调用都结束,调用key.interestOps(0)清除所有兴趣操作集
5.获取当前时间,如果当前时间小于上次清理时间+900000s,继续上面处理
6.否则,获取选择器感兴趣的所有key,对于每个key,获取call对象存放到call集合中
清除每个call对象:doPurge(call, now):
a.获取responseQueue集合:
LinkedList<Call> responseQueue = call.connection.responseQueue;
b.迭代获取每个call对象:每个call对象对应一个connection,获取connection的ResponseQueue: c.对ResponseQueue中的call进行判断:
如果这个call超过时间,关闭这个call连接:
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
if (now > call.timestamp + PURGE_INTERVAL) {
closeConnection(call.connection);
break;
}
}

7. closeConnection(Connection connection):
从connectionlist清除connection对象:
synchronized (connectionList) {
if (connectionList.remove(connection))
numConnections--;
}
8.关闭connection连接:
connection.close():
data =null
dataLengthBuffer = null;
关闭socket,channel:socket.shutdownOutput();channel.close();socket.close()

Hadoop IPC 调用

原文:http://www.cnblogs.com/jinkangkai/p/5213831.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!