代码生成类解析:
Thrift--facebook RPC框架,介绍就不说了,百度,google一大把,使用也不介绍,直接上结构和分析吧。
Hello.thrift文件内容如下:
namespace java com.tomsun.thrift.generated.demo service Hello { string helloString(1:string para) }
内容很简单,申明个RPC service(Hello),服务方法helloString,方法参数格式(seq: parameter type, parameter name),参数需要标号(1: xxx xxx, 2: xxx xxx), namespace 指定生成代码语言类型(java),和Java包名(本文只讨论java ^#^!).
类文件解析:
话说就上面定义一个service(只包含一个method),可生成的类文件可是一个庞然大物(975行代码),在此就不全贴出来了,说明时会贴出关键代码用于说明。
因为thrift是一个完整的RPC框架,内部结构分的很细,所以代码生成量理所当然,(protobuf的所谓的RPC,只不过是个架子,空的,基本上都得自己去实现。当protobuf序列化自己感觉比thrift丰富多了(sint,fint,int,uint)),
但thrift支持的容器类型(List,set, map)比protobuf多(list),具体介绍等以后再详细,回到正题。
thrift compiler生成的Hello.java 文件,以服务名为类文件名。服务接口定义如下:
1 public interface Iface { //同步RPC服务调用接口定义 2 3 public String helloString(String para) throws org.apache.thrift.TException; 4 5 } 6 7 public interface AsyncIface { //异步RPC服务调用接口定义(仔细瞅,方法返回值参数string没了,方法参数多了个AsyncMethodCallback) 8 9 public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; 10 11 }
Async形式具体返回值是通过callback回调来获得。定义如下:
1 public interface AsyncMethodCallback<T> { 2 /** 3 * This method will be called when the remote side has completed invoking 4 * your method call and the result is fully read. For oneway method calls, 5 * this method will be called as soon as we have completed writing out the 6 * request. 7 * @param response 8 */ 9 public void onComplete(T response); 10 11 /** 12 * This method will be called when there is an unexpected clientside 13 * exception. This does not include application-defined exceptions that 14 * appear in the IDL, but rather things like IOExceptions. 15 * @param exception 16 */ 17 public void onError(Exception exception); 18 }
自己定义异步RPC得实现该接口,同步RPC客户端骨架及其工厂类如下。
1 public static class Client extends org.apache.thrift.TServiceClient implements Iface { 2 public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
Client,客户端调用RPC骨架实现,封装了底层复杂RPC序列化,和网络传输。
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
感觉thrift的工厂有点鸡肋的感念,没有设计模式三种工厂那种封装的味道,可有可无。
具体看一下同步骨架内部:
1 public String helloString(String para) throws org.apache.thrift.TException 2 { 3 send_helloString(para); 4 return recv_helloString(); 5 } 6 7 public void send_helloString(String para) throws org.apache.thrift.TException 8 { 9 helloString_args args = new helloString_args(); 10 args.setPara(para); 11 sendBase("helloString", args); 12 } 13 14 public String recv_helloString() throws org.apache.thrift.TException 15 { 16 helloString_result result = new helloString_result(); 17 receiveBase(result, "helloString"); 18 if (result.isSetSuccess()) { 19 return result.success; 20 } 21 throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "helloString failed: unknown result"); 22 }
String helloString(String para)方法包含send,recv两同步操作,另外thrift为方法参数和返回值都生成一个java类,本例中的helloString_args,helloString_result.
public static class helloString_args implements org.apache.thrift.TBase<helloString_args, helloString_args._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_args> public static class helloString_result implements org.apache.thrift.TBase<helloString_result, helloString_result._Fields>, java.io.Serializable, Cloneable, Comparable<helloString_result>
两者为rpc客户端和服务器端传输的数据,都实现TBase接口,
public interface TBase<T extends TBase<T,F>, F extends TFieldIdEnum> extends Comparable<T>, Serializable { /** * Reads the TObject from the given input protocol. * * @param iprot Input protocol */ public void read(TProtocol iprot) throws TException; /** * Writes the objects out to the protocol * * @param oprot Output protocol */ public void write(TProtocol oprot) throws TException; /** * Get the F instance that corresponds to fieldId. */ public F fieldForId(int fieldId); /** * Check if a field is currently set or unset. * * @param field */ public boolean isSet(F field); /** * Get a field‘s value by field variable. Primitive types will be wrapped in * the appropriate "boxed" types. * * @param field */ public Object getFieldValue(F field); /** * Set a field‘s value by field variable. Primitive types must be "boxed" in * the appropriate object wrapper type. * * @param field */ public void setFieldValue(F field, Object value); public TBase<T, F> deepCopy(); /** * Return to the state of having just been initialized, as though you had just * called the default constructor. */ public void clear(); }
TBase定义read,write和具体字段是否设值的判定方法,以helloString_args具体解析:
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("helloString_args"); private static final org.apache.thrift.protocol.TField PARA_FIELD_DESC = new org.apache.thrift.protocol.TField("para", org.apache.thrift.protocol.TType.STRING, (short)1);
struct_desc(TStruct)为thrift内部对象结构,作为远程传输读写的参数metadata元数据和标志位(readStructBegin(), writeStructEnd())
/** * Helper class that encapsulates struct metadata. * */ public final class TStruct { public TStruct() { this(""); } public TStruct(String n) { name = n; } public final String name; //(本例中为hellosString_args) }
para_field_desc(TField)为传输结构对象中的变量结构,read,write时会对应相应的标志位(readTFiledBegin(), writeTFieldEnd()),具体为方法参数和返回值,本例中为helloString方法参数(如果多个方法参数,helloString_args中将对应多个TField元数据):
/** * Helper class that encapsulates field metadata. * */ public class TField { public TField() { this("", TType.STOP, (short)0); } public TField(String n, byte t, short i) { name = n; //参数名 type = t;//thrift内部类型 id = i;// seq number,即为hello.thrift方法参数定义的num. } public final String name; public final byte type; public final short id; public String toString() { return "<TField name:‘" + name + "‘ type:" + type + " field-id:" + id + ">"; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + id; result = prime * result + ((name == null) ? 0 : name.hashCode()); result = prime * result + type; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; TField otherField = (TField) obj; return type == otherField.type && id == otherField.id; } }
thrift内部类型:
/** * Type constants in the Thrift protocol. */ public final class TType { public static final byte STOP = 0; //参数值没有设定,指定stop,read wire stream时会直接skip。 public static final byte VOID = 1; public static final byte BOOL = 2; public static final byte BYTE = 3; public static final byte DOUBLE = 4; public static final byte I16 = 6; public static final byte I32 = 8; public static final byte I64 = 10; public static final byte STRING = 11; public static final byte STRUCT = 12; public static final byte MAP = 13; public static final byte SET = 14; public static final byte LIST = 15; public static final byte ENUM = 16; }
static { schemes.put(StandardScheme.class, new helloString_argsStandardSchemeFactory()); schemes.put(TupleScheme.class, new helloString_argsTupleSchemeFactory()); }
注册具体stream read,write的类和对应的工厂类,两种读写方法:
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { schemes.get(iprot.getScheme()).getScheme().read(iprot, this); } public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { schemes.get(oprot.getScheme()).getScheme().write(oprot, this); }
private static class helloString_argsStandardScheme extends StandardScheme<helloString_args> { public void read(org.apache.thrift.protocol.TProtocol iprot, helloString_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true) { schemeField = iprot.readFieldBegin(); if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { case 1: // PARA if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.para = iprot.readString(); struct.setParaIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); // check for required fields of primitive type, which can‘t be checked in the validate method struct.validate(); } public void write(org.apache.thrift.protocol.TProtocol oprot, helloString_args struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.para != null) { oprot.writeFieldBegin(PARA_FIELD_DESC); oprot.writeString(struct.para); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); } }
private static class helloString_argsTupleScheme extends TupleScheme<helloString_args> { @Override public void write(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException { TTupleProtocol oprot = (TTupleProtocol) prot; BitSet optionals = new BitSet(); if (struct.isSetPara()) { optionals.set(0); } oprot.writeBitSet(optionals, 1); if (struct.isSetPara()) { oprot.writeString(struct.para); } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, helloString_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { struct.para = iprot.readString(); struct.setParaIsSet(true); } } } }
standard schema读写更倾向于结构化,(structBeging->fieldBegin()->value()->fieldEnd()->structEnd()),而tuple schema通过一个bitset(bit位操作)来代替(structBegin,fieldBegin,structEnd,filedEnd哪些占用流空间,因为上面提及struct,field里面包含struct名,field名,类型,seq num等占用流空间信息),减少序列化和网络传输的流大小。
public String para; // required
para,helloString_args结构内部成员(方法参数具体值)。 _Field,struct内部所有成员的enum表示,用于上面standard schema读取(见上述代码)
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { PARA((short)1, "para"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); static { for (_Fields field : EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } } /** * Find the _Fields constant that matches fieldId, or null if its not found. */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 1: // PARA return PARA; default: return null; } } /** * Find the _Fields constant that matches fieldId, throwing an exception * if it is not found. */ public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields fields = findByThriftId(fieldId); if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn‘t exist!"); return fields; } /** * Find the _Fields constant that matches name, or null if its not found. */ public static _Fields findByName(String name) { return byName.get(name); } private final short _thriftId; private final String _fieldName; _Fields(short thriftId, String fieldName) { _thriftId = thriftId; _fieldName = fieldName; } public short getThriftFieldId() { return _thriftId; } public String getFieldName() { return _fieldName; } }
struct中field元数据metadata,FieldMetaData包含(_Field enum, fieldvaluemetadata(参数名,requirement type, TType)):
// isset id assignments public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.PARA, new org.apache.thrift.meta_data.FieldMetaData("para", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(helloString_args.class, metaDataMap); }
Requirement Type enum:
/** * Requirement type constants. * */ public final class TFieldRequirementType { public static final byte REQUIRED = 1; public static final byte OPTIONAL = 2; public static final byte DEFAULT = 3; }
一些设值,判断是否设值操作:
public String getPara() { return this.para; } public helloString_args setPara(String para) { this.para = para; return this; } public void unsetPara() { this.para = null; } /** Returns true if field para is set (has been assigned a value) and false otherwise */ public boolean isSetPara() { return this.para != null; }
<**************************************************************************************************************************************************>
服务器端同步处理器类:
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor
注册实际的处理函数类:
private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("helloString", new helloString()); return processMap; }
处理函数类:
public static class helloString<I extends Iface> extends org.apache.thrift.ProcessFunction<I, helloString_args> { public helloString() { super("helloString"); } public helloString_args getEmptyArgsInstance() { return new helloString_args(); } protected boolean isOneway() {//是否是单向RPC,只调用不要求返回 return false; } public helloString_result getResult(I iface, helloString_args args) throws org.apache.thrift.TException { helloString_result result = new helloString_result(); result.success = iface.helloString(args.para);//调用用户编写的server实现类方法 return result; //方法值类(helloString_result类) } } }
<**********************************************************************************************************************************************************>
Async异步client和processor有所不同,来看看AsyncClient内部:
public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { super(protocolFactory, clientManager, transport); } public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); helloString_call method_call = new helloString_call(para, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); }
1:TAsyncClientManager:异步客户端管理器类,AsyncClient统一把AsyncMethodCall传递给manager,内部用ConcurrentLinkedQueue,TimeOutSet,和另外开一个Thread来管理存储,消息的传递,接收,和超时处理,
并异常,完成时调用MethodCallback回调。
2:底层用Noblocking channel进行传递。
异步远程RPC调用:
public void helloString(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); helloString_call method_call = new helloString_call(para, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); }
helloString_call类封装了消息表现形式:
public static class helloString_call extends org.apache.thrift.async.TAsyncMethodCall { private String para; public helloString_call(String para, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.para = para; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("helloString", org.apache.thrift.protocol.TMessageType.CALL, 0)); helloString_args args = new helloString_args(); args.setPara(para); args.write(prot); prot.writeMessageEnd(); } public String getResult() throws org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new IllegalStateException("Method call not finished!"); } org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); return (new Client(prot)).recv_helloString(); } } }
异步RPC调用状态:
public static enum State { CONNECTING, // connecting.连接 WRITING_REQUEST_SIZE, // writing_request_size.写请求长度 WRITING_REQUEST_BODY, // writing_request_body.写请求体 READING_RESPONSE_SIZE, // reading_repsonse_size.读返回长度 READING_RESPONSE_BODY, //reading_response_body.读返回体 RESPONSE_READ, //repsonse_read.返回读取完毕 ERROR;// error.有异常 }
消息类型:
public final class TMessageType { public static final byte CALL = 1; // 调用 public static final byte REPLY = 2; //返回应答 public static final byte EXCEPTION = 3;//异常 public static final byte ONEWAY = 4;// 单向 }
Async服务器端处理,TBaseAsyncProcessor.process():
//Find processing function final TMessage msg = in.readMessageBegin(); AsyncProcessFunction fn = processMap.get(msg.name); //Get Args TBase args = (TBase)fn.getEmptyArgsInstance(); args.read(in); in.readMessageEnd(); //start off processing function fn.start(iface, args,fn.getResultHandler(fb,msg.seqid)); return true;
public static class helloString<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, helloString_args, String> public helloString_args getEmptyArgsInstance() { return new helloString_args(); } public void start(I iface, helloString_args args, org.apache.thrift.async.AsyncMethodCallback<String> resultHandler) throws TException { iface.helloString(args.para,resultHandler); }
public AsyncMethodCallback<String> getResultHandler(final AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; return new AsyncMethodCallback<String>() { public void onComplete(String o) { helloString_result result = new helloString_result(); result.success = o; try { fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); return; } catch (Exception e) { LOGGER.error("Exception writing to internal frame buffer", e); } fb.close(); } public void onError(Exception e) { byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; org.apache.thrift.TBase msg; helloString_result result = new helloString_result(); { msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); } try { fcall.sendResponse(fb,msg,msgType,seqid); return; } catch (Exception ex) { LOGGER.error("Exception writing to internal frame buffer", ex); } fb.close(); } }; } protected boolean isOneway() { return false; }
TProcessor, TTransport, async, schema, server分析待续。
原文:http://www.cnblogs.com/onlysun/p/4590906.html