private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {TransportResponseHandler responseHandler = new TransportResponseHandler(channel);TransportClient client = new TransportClient(channel, responseHandler);TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,rpcHandler);return new TransportChannelHandler(client, responseHandler, requestHandler,conf.connectionTimeoutMs(), closeIdleConnections);}
public TransportChannelHandler initializePipeline(SocketChannel channel,RpcHandler channelRpcHandler) {try {TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);channel.pipeline().addLast("encoder", encoder).addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()).addLast("decoder", decoder).addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this// would require more logic to guarantee if this were not part of the same event loop..addLast("handler", channelHandler);return channelHandler;} catch (RuntimeException e) {logger.error("Error while initializing Netty pipeline", e);throw e;}}
private void init(String hostToBind, int portToBind) {IOMode ioMode = IOMode.valueOf(conf.ioMode());EventLoopGroup bossGroup =NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");EventLoopGroup workerGroup = bossGroup;PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NettyUtils.getServerChannelClass(ioMode)).option(ChannelOption.ALLOCATOR, allocator).childOption(ChannelOption.ALLOCATOR, allocator);if (conf.backLog() > 0) {bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());}if (conf.receiveBuf() > 0) {bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());}if (conf.sendBuf() > 0) {bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());}bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {RpcHandler rpcHandler = appRpcHandler;for (TransportServerBootstrap bootstrap : bootstraps) {rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);}context.initializePipeline(ch, rpcHandler);}});InetSocketAddress address = hostToBind == null ?new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);channelFuture = bootstrap.bind(address);channelFuture.syncUninterruptibly();port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();logger.debug("Shuffle server started on port :" + port);}
原文:http://www.cnblogs.com/gaoxing/p/4985665.html