This article is not really a dedicated walkthrough of the DFSClient source; that one class alone is close to 4,000 lines, not counting the other classes it pulls in. Instead, the look at DFSClient is organized around the following error.
Recently, Flume 1.4.0 has been reporting:
04 Apr 2014 07:11:53,111 WARN [ResponseProcessor for block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run:3015) - DFSOutputStream ResponseProcessor exception for block blk_326610323152553165_1164644
java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.95.198.123:27691 remote=/10.95.198.22:60010]
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at org.apache.hadoop.hdfs.protocol.DataTransferProtocol$PipelineAck.readFields(DataTransferProtocol.java:124)
at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:2967)
04 Apr 2014 07:11:53,112 WARN [DataStreamer for file /user/hive/warehouse_ziyan/flume/woStoreSoftWDownload/20140404/.woStoreSoftWDownloadTotal.1396540800474.tmp block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError:3051)
- Error Recovery for block blk_326610323152553165_1164644 bad datanode[0] 10.95.198.22:60010
04 Apr 2014 07:11:53,112 WARN [DataStreamer for file /user/hive/warehouse_ziyan/flume/woStoreSoftWDownload/20140404/.woStoreSoftWDownloadTotal.1396540800474.tmp block blk_326610323152553165_1164644] (org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError:3102)
- Error Recovery for block blk_326610323152553165_1164644 in pipeline 10.95.198.22:60010, 10.95.198.21:60010, 10.95.198.11:60010: bad datanode 10.95.198.22:60010
so I had to dig into DFSClient. Strictly speaking, DFSClient has no direct relationship with Flume, but Flume writes through the FileSystem API, and DistributedFileSystem in turn delegates to DFSClient, so the relationship is indirect.
To work around the error above, you can modify org.apache.flume.sink.hdfs.BucketWriter:
private void open() throws IOException, InterruptedException {
  if ((filePath == null) || (writer == null)) {
    throw new IOException("Invalid file settings");
  }
  final Configuration config = new Configuration();
  // disable FileSystem JVM shutdown hook
  config.setBoolean("fs.automatic.close", false);
  // add the following two lines
  config.set("dfs.socket.timeout", "3600000");
  config.set("dfs.datanode.socket.write.timeout", "3600000");
  // Hadoop is not thread safe when doing certain RPC operations,
  // including getFileSystem(), when running under Kerberos.
  // open() must be called by one thread at a time in the JVM.
  // NOTE: tried synchronizing on the underlying Kerberos principal previously
  // which caused deadlocks. See FLUME-1231.
  synchronized (staticLock) {
    checkAndThrowInterruptedException();
    try {
      long counter = fileExtensionCounter.incrementAndGet();
      String fullFileName = fileName + "." + counter;
      if (fileSuffix != null && fileSuffix.length() > 0) {
        fullFileName += fileSuffix;
      } else if (codeC != null) {
        fullFileName += codeC.getDefaultExtension();
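Both properties are in milliseconds, so 3600000 corresponds to one hour; the point of the change is simply to give the write pipeline far more time to acknowledge packets before the client-side timeout fires.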
socketTimeout and datanodeWriteTimeout default to 1 minute and 8 minutes respectively, and socketTimeout is the one that matters here: the 69000 ms in the log is the 60000 ms default plus a 3000 ms allowance per datanode in the three-node pipeline. The error above is DFSOutputStream timing out while waiting to read the block ack, and when ResponseProcessor gets nothing back within that window it throws.
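For reference, those defaults come from the DFSClient constructor. The snippet below is a paraphrased sketch of the Hadoop 1.x code rather than a verbatim quote, so treat the exact constant names as approximate:

this.socketTimeout = conf.getInt("dfs.socket.timeout",
    HdfsConstants.READ_TIMEOUT);                      // 60 * 1000 ms = 1 minute
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
    HdfsConstants.WRITE_TIMEOUT);                     // 8 * 60 * 1000 ms = 8 minutes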
Regarding this problem, I used to assume that the dfs.socket.timeout entry in hdfs-site.xml would affect DFSClient; in fact it does not, it only configures the datanode side.
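To see what the client side actually picks up, a minimal standalone check along the following lines can help; the class name and the NameNode URI are placeholders of my own, not anything from Flume or Hadoop:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class TimeoutCheck {
  public static void main(String[] args) throws Exception {
    // Build the Configuration in the client JVM, exactly as Flume does,
    // and override the two timeouts programmatically.
    Configuration conf = new Configuration();
    conf.set("dfs.socket.timeout", "3600000");                 // 1 hour instead of the 1 minute default
    conf.set("dfs.datanode.socket.write.timeout", "3600000");  // 1 hour instead of the 8 minute default
    // "hdfs://namenode:9000" is a placeholder; point it at your own NameNode.
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);
    // For an hdfs:// URI this is DistributedFileSystem, which wraps a DFSClient.
    System.out.println(fs.getClass().getName());
    System.out.println(fs.getConf().get("dfs.socket.timeout"));  // prints 3600000
  }
}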
On the Flume side:
HDFSDataStream
@Override
public void open(String filePath) throws IOException {
  Configuration conf = new Configuration();
  Path dstPath = new Path(filePath);
  FileSystem hdfs = dstPath.getFileSystem(conf);
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem) hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system "
          + "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
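dstPath.getFileSystem(conf) resolves the hdfs:// scheme to DistributedFileSystem and hands it this same client-side conf through initialize(), which is where the DFSClient is created, as shown next.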
On the HDFS side, DistributedFileSystem:
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf);
  setConf(conf);

  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + uri);
  }

  InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
  this.dfs = new DFSClient(namenode, conf, statistics);
  this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  this.workingDir = getHomeDirectory();
}
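The conf passed to new DFSClient(namenode, conf, statistics) here is the Configuration built on the client (Flume) side, so whatever dfs.socket.timeout and dfs.datanode.socket.write.timeout values it carries are the ones the DFSClient write pipeline will use.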
Hive and HBase can hit the same problem as Flume; the HBase case is described here:
http://blog.csdn.net/wangqiaoshi/article/details/22900641
Original article: http://blog.csdn.net/wangqiaoshi/article/details/22981011