转载自http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral
Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:
1) client端需要知道server端的IP + port,如果是分布式部署,还需要知道所有server的IP + port列表.
2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案.
3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利.
4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip + port,可以使用zookeeper来推送每个service的服务地址.
5) 因为thrift-client端不使用连接池的话,将不能有效的提高并发能力,本文重点描述看如何使用thrift-client连接池。
1. pom.xml
- <dependencies>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>3.0.7.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.5</version>
- <!--<exclusions>-->
- <!--<exclusion>-->
- <!--<groupId>log4j</groupId>-->
- <!--<artifactId>log4j</artifactId>-->
- <!--</exclusion>-->
- <!--</exclusions>-->
- </dependency>
- <!--
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.4</version>
- </dependency>
- -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.3.0</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
-
- </dependencies>
2. spring-thrift-client.xml
其中zookeeper作为可选项,开发者也可以通过制定serverAddress的方式指定server的地址.
- <!-- fixedAddress -->
- <!--
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="serverAddress" value="127.0.0.1:9090:2"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="10000"></property>
- </bean>
- -->
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
- <property name="connectString" value="127.0.0.1:2181"></property>
- <property name="namespace" value="demo/thrift-service"></property>
- </bean>
- <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">
- <property name="service" value="com.demo.service.UserService"></property>
- <property name="maxActive" value="5"></property>
- <property name="idleTime" value="1800000"></property>
- <property name="addressProvider">
- <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">
- <property name="configPath" value="UserServiceImpl"></property>
- <property name="zookeeper" ref="thriftZookeeper"></property>
- </bean>
- </property>
- </bean>
3. ThriftServiceClientProxyFactory.java
因为我们要在client端使用连接池方案,那么就需要对client的方法调用过程,进行代理,这个类,就是维护了一个"Client"代理类,并在方法调用时,从"对象池"中取出一个"Client"对象,并在方法实际调用结束后归还给"对象池".
4. ThriftClientPoolFactory.java
"Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{
-
- private final ThriftServerAddressProvider addressProvider;
-
- private final TServiceClientFactory<TServiceClient> clientFactory;
-
- private PoolOperationCallBack callback;
-
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
- this.addressProvider = addressProvider;
- this.clientFactory = clientFactory;
- }
-
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {
- this.addressProvider = addressProvider;
- this.clientFactory = clientFactory;
- this.callback = callback;
- }
-
-
-
- @Override
- public TServiceClient makeObject() throws Exception {
- InetSocketAddress address = addressProvider.selector();
- TSocket tsocket = new TSocket(address.getHostName(),address.getPort());
- TProtocol protocol = new TBinaryProtocol(tsocket);
- TServiceClient client = this.clientFactory.getClient(protocol);
- tsocket.open();
- if(callback != null){
- try{
- callback.make(client);
- }catch(Exception e){
-
- }
- }
- return client;
- }
-
- public void destroyObject(TServiceClient client) throws Exception {
- if(callback != null){
- try{
- callback.destroy(client);
- }catch(Exception e){
-
- }
- }
- TTransport pin = client.getInputProtocol().getTransport();
- pin.close();
- }
-
- public boolean validateObject(TServiceClient client) {
- TTransport pin = client.getInputProtocol().getTransport();
- return pin.isOpen();
- }
-
- static interface PoolOperationCallBack {
-
- void destroy(TServiceClient client);
-
- void make(TServiceClient client);
- }
-
- }
5. DynamicAddressProvider.java
将zookeeper作为server地址的提供者,这样客户端就不需要再配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.
- public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {
-
- private String configPath;
-
- private PathChildrenCache cachedPath;
-
- private CuratorFramework zookeeper;
-
-
-
- private Set<String> trace = new HashSet<String>();
-
- private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
-
- private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
-
- private Object lock = new Object();
-
- private static final Integer DEFAULT_PRIORITY = 1;
-
- public void setConfigPath(String configPath) {
- this.configPath = configPath;
- }
-
- public void setZookeeper(CuratorFramework zookeeper) {
- this.zookeeper = zookeeper;
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
-
- if(zookeeper.getState() == CuratorFrameworkState.LATENT){
- zookeeper.start();
- }
- buildPathChildrenCache(zookeeper, configPath, true);
- cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
- }
-
- private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {
- cachedPath = new PathChildrenCache(client, path, cacheData);
- cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
- @Override
- public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
- PathChildrenCacheEvent.Type eventType = event.getType();
- switch (eventType) {
- case CONNECTION_SUSPENDED:
- case CONNECTION_LOST:
- System.out.println("Connection error,waiting...");
- return;
- default:
-
- }
-
- cachedPath.rebuild();
- rebuild();
- }
-
- protected void rebuild() throws Exception {
- List<ChildData> children = cachedPath.getCurrentData();
- if (children == null || children.isEmpty()) {
-
-
-
- container.clear();
- System.out.println("thrift server-cluster error....");
- return;
- }
- List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
- for (ChildData data : children) {
- String address = new String(data.getData(), "utf-8");
- current.addAll(transfer(address));
- trace.add(address);
- }
- Collections.shuffle(current);
- synchronized (lock) {
- container.clear();
- container.addAll(current);
- inner.clear();
- inner.addAll(current);
-
- }
- }
- });
- }
-
-
-
- private List<InetSocketAddress> transfer(String address){
- String[] hostname = address.split(":");
- Integer priority = DEFAULT_PRIORITY;
- if (hostname.length == 3) {
- priority = Integer.valueOf(hostname[2]);
- }
- String ip = hostname[0];
- Integer port = Integer.valueOf(hostname[1]);
- List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
- for (int i = 0; i < priority; i++) {
- result.add(new InetSocketAddress(ip, port));
- }
- return result;
- }
-
-
- @Override
- public List<InetSocketAddress> getAll() {
- return Collections.unmodifiableList(container);
- }
-
- @Override
- public synchronized InetSocketAddress selector() {
- if (inner.isEmpty()) {
- if(!container.isEmpty()){
- inner.addAll(container);
- }else if(!trace.isEmpty()){
- synchronized (lock) {
- for(String hostname : trace){
- container.addAll(transfer(hostname));
- }
- Collections.shuffle(container);
- inner.addAll(container);
- }
- }
- }
- return inner.poll();
- }
-
-
- @Override
- public void close() {
- try {
- cachedPath.close();
- zookeeper.close();
- } catch (Exception e) {
-
- }
- }
- }
到此为止,我们的Thrift基本上就可以顺利运行起来了.更多代码,参见附件.
Thrift-server端开发与配置,参见[Thrift-server]
[转载] Thrift-client与spring集成
原文:http://www.cnblogs.com/scott19820130/p/4919121.html