
Reading the Spark source: network (1)

2015-11-22 12:36:42
Spark is replacing Akka with Netty in 1.6, building the cluster-wide RPC framework on top of Netty; Netty's memory management and NIO support should noticeably improve the cluster's network transfer performance. To understand this code I found two books, "Netty in Action" and 《Netty权威指南》, and read them alongside the Spark source, so I learned Netty while working through Spark's Netty-related code. This part of the source is interwoven with a lot of Netty internals, so it is still fairly heavy reading.

Buffer module

The network module abstracts out ManagedBuffer, an abstract class that presents a view over binary data (i.e., some region of it). The concrete implementation depends on where the data comes from; three sources are currently supported: a file, an NIO ByteBuffer, and a Netty ByteBuf. Note that a concrete implementation may live outside the JVM garbage collector's control: NettyManagedBuffer, for example, is reference-counted, so when such a buffer is passed to another thread, retain/release must be called to increment or decrement its reference count.
ManagedBuffer exposes the data in three forms: as a ByteBuffer, as an InputStream, and as a Netty object. The ByteBuffer form is expensive (it may require memory mapping or allocation) and its use is discouraged. The class also defines reference-count management and a size query.
    public abstract class ManagedBuffer {

        /** Number of bytes of the data. */
        public abstract long size();

        /**
         * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
         * returned ByteBuffer should not affect the content of this buffer.
         */
        // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
        public abstract ByteBuffer nioByteBuffer() throws IOException;

        /**
         * Exposes this buffer's data as an InputStream. The underlying implementation does not
         * necessarily check for the length of bytes read, so the caller is responsible for making sure
         * it does not go over the limit.
         */
        public abstract InputStream createInputStream() throws IOException;

        /**
         * Increment the reference count by one if applicable.
         */
        public abstract ManagedBuffer retain();

        /**
         * If applicable, decrement the reference count by one and deallocates the buffer if the
         * reference count reaches zero.
         */
        public abstract ManagedBuffer release();

        /**
         * Convert the buffer into a Netty object, used to write the data out.
         */
        public abstract Object convertToNetty() throws IOException;
    }
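Before looking at the concrete implementations, here is a minimal consumer-side sketch (mine, not from the Spark source) of how the retain/release contract is meant to be used when a buffer may be shared across threads:

    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical consumer, not part of Spark: retain before reading a buffer that
    // another thread may release, and release in a finally block when done. For the
    // file- and NIO-backed implementations retain/release are no-ops, so this is
    // safe for all three data sources.
    static void consume(ManagedBuffer buffer) throws IOException {
        buffer.retain();                          // increment the ref count, if applicable
        try (InputStream in = buffer.createInputStream()) {
            byte[] chunk = new byte[8 * 1024];
            while (in.read(chunk) != -1) {
                // process the chunk ...
            }
        } finally {
            buffer.release();                     // may deallocate the underlying memory
        }
    }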
Each data source has one implementing class of ManagedBuffer. Let's look first at the file-backed one.
    public final class FileSegmentManagedBuffer extends ManagedBuffer {
        private final TransportConf conf;
        private final File file;
        private final long offset;
        private final long length;

        public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
            this.conf = conf;
            this.file = file;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public long size() {
            return length;
        }

        @Override
        public ByteBuffer nioByteBuffer() throws IOException {
            FileChannel channel = null;
            try {
                channel = new RandomAccessFile(file, "r").getChannel();
                // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
                if (length < conf.memoryMapBytes()) {
                    ByteBuffer buf = ByteBuffer.allocate((int) length);
                    channel.position(offset);
                    while (buf.remaining() != 0) {
                        if (channel.read(buf) == -1) {
                            throw new IOException(String.format("Reached EOF before filling buffer\n" +
                                "offset=%s\nfile=%s\nbuf.remaining=%s",
                                offset, file.getAbsoluteFile(), buf.remaining()));
                        }
                    }
                    buf.flip();
                    return buf;
                } else {
                    return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
                }
            } catch (IOException e) {
                try {
                    if (channel != null) {
                        long size = channel.size();
                        throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e);
                    }
                } catch (IOException ignored) {
                    // ignore
                }
                throw new IOException("Error in opening " + this, e);
            } finally {
                JavaUtils.closeQuietly(channel);
            }
        }

        @Override
        public InputStream createInputStream() throws IOException {
            FileInputStream is = null;
            try {
                is = new FileInputStream(file);
                ByteStreams.skipFully(is, offset);
                return new LimitedInputStream(is, length);
            } catch (IOException e) {
                try {
                    if (is != null) {
                        long size = file.length();
                        throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e);
                    }
                } catch (IOException ignored) {
                    // ignore
                } finally {
                    JavaUtils.closeQuietly(is);
                }
                throw new IOException("Error in opening " + this, e);
            } catch (RuntimeException e) {
                JavaUtils.closeQuietly(is);
                throw e;
            }
        }

        @Override
        public ManagedBuffer retain() {
            return this;
        }

        @Override
        public ManagedBuffer release() {
            return this;
        }

        @Override
        public Object convertToNetty() throws IOException {
            if (conf.lazyFileDescriptor()) {
                return new LazyFileRegion(file, offset, length);
            } else {
                FileChannel fileChannel = new FileInputStream(file).getChannel();
                return new DefaultFileRegion(fileChannel, offset, length);
            }
        }

        public File getFile() { return file; }

        public long getOffset() { return offset; }

        public long getLength() { return length; }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                .add("file", file)
                .add("offset", offset)
                .add("length", length)
                .toString();
        }
    }
nioByteBuffer: if the segment is smaller than spark.storage.memoryMapThreshold, the bytes are read from the channel into a ByteBuffer; at or above that threshold the file is memory-mapped instead, since mapping carries a high fixed overhead that only pays off for large segments.
createInputStream returns a LimitedInputStream that caps how many bytes may be read; Guava's ByteStreams.skipFully is used first to skip ahead to the segment's offset.
convertToNetty returns a FileRegion: a LazyFileRegion if spark.shuffle.io.lazyFD is true, a DefaultFileRegion if it is false. LazyFileRegion defers creating its FileChannel until the transfer actually starts; a comment in the source notes that LazyFileRegion cannot be used when Netty is using the epoll transport.
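To see what that FileRegion is for, here is a hedged sketch of the sending side (the method and names are mine, not Spark's): on NIO transports Netty can hand a FileRegion to a sendfile-style zero-copy transfer, so the segment's bytes never pass through the JVM heap.

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.DefaultFileRegion;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;

    // Hypothetical sender: Netty releases the FileRegion once the transfer
    // completes, which closes the underlying channel.
    static void sendSegment(ChannelHandlerContext ctx, File file, long offset, long length)
            throws IOException {
        FileChannel channel = new FileInputStream(file).getChannel();
        ctx.writeAndFlush(new DefaultFileRegion(channel, offset, length));
    }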

Next, the implementation whose data source is a Netty ByteBuf; it stores its data in the ByteBuf:
    public final class NettyManagedBuffer extends ManagedBuffer {
        private final ByteBuf buf;

        public NettyManagedBuffer(ByteBuf buf) {
            this.buf = buf;
        }

        @Override
        public long size() {
            return buf.readableBytes();
        }

        @Override
        public ByteBuffer nioByteBuffer() throws IOException {
            return buf.nioBuffer();
        }

        @Override
        public InputStream createInputStream() throws IOException {
            return new ByteBufInputStream(buf);
        }

        @Override
        public ManagedBuffer retain() {
            buf.retain();
            return this;
        }

        @Override
        public ManagedBuffer release() {
            buf.release();
            return this;
        }

        @Override
        public Object convertToNetty() throws IOException {
            return buf.duplicate();
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                .add("buf", buf)
                .toString();
        }
    }
Turning a ByteBuf into an InputStream is handled by ByteBufInputStream. Note also that buf.duplicate() returns a new ByteBuf that is a view over the same data: a modification through either one is visible through the other, and the two share a single reference count, so be careful with retain/release. See http://www.maljob.com/pages/newsDetail.html?id=394 for more.
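A short standalone demo (mine, not from the Spark source) of that sharing:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class DuplicateDemo {
        public static void main(String[] args) {
            ByteBuf original = Unpooled.buffer(16).writeInt(42);
            ByteBuf view = original.duplicate();    // same memory, same ref count, independent indices
            view.setInt(0, 7);                      // a write through the duplicate ...
            System.out.println(original.getInt(0)); // ... prints 7: visible via the original
            System.out.println(original.refCnt());  // prints 1: duplicate() does not retain
            original.release();                     // ref count drops to 0, invalidating both views
        }
    }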

Finally, the implementation whose data source is an NIO ByteBuffer:
    public final class NioManagedBuffer extends ManagedBuffer {
        private final ByteBuffer buf;

        public NioManagedBuffer(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public long size() {
            return buf.remaining();
        }

        @Override
        public ByteBuffer nioByteBuffer() throws IOException {
            return buf.duplicate();
        }

        @Override
        public InputStream createInputStream() throws IOException {
            return new ByteBufInputStream(Unpooled.wrappedBuffer(buf));
        }

        @Override
        public ManagedBuffer retain() {
            return this;
        }

        @Override
        public ManagedBuffer release() {
            return this;
        }

        @Override
        public Object convertToNetty() throws IOException {
            return Unpooled.wrappedBuffer(buf);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                .add("buf", buf)
                .toString();
        }
    }
The interesting detail here is the conversion in the other direction: a ByteBuffer is turned into a ByteBuf with Netty's Unpooled.wrappedBuffer(), which wraps the existing buffer rather than copying it.
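A standalone sketch (mine, not Spark's) showing that the wrapper shares memory with the source buffer:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.ByteBuffer;

    public class WrapDemo {
        public static void main(String[] args) {
            ByteBuffer nio = ByteBuffer.allocate(8);
            ByteBuf wrapped = Unpooled.wrappedBuffer(nio); // wraps; no copy is made
            wrapped.setByte(0, 1);                         // write through the ByteBuf ...
            System.out.println(nio.get(0));                // ... prints 1: same backing memory
        }
    }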
Original post: http://www.cnblogs.com/gaoxing/p/4985558.html