0:安装JDK
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.rpm
(注:Oracle现已要求登录后才能下载旧版JDK,此直链可能已失效,可改为手动下载该rpm包。)
使用rpm -ivh jdk-8u45-linux-x64.rpm进行安装
检查安装:执行 java -version 和 javac -version,能正常打印版本号即表示安装成功。
1:centOS安装ZeroMQ所需组件及工具:
yum install gcc
yum install gcc-c++
yum install make
yum install uuid-devel
yum install libuuid-devel
yum install libtool
安装ZooKeeper:
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar -zxvf zookeeper-3.4.6.tar.gz
cp -R zookeeper-3.4.6 /usr/local/
ln -s /usr/local/zookeeper-3.4.6/ /usr/local/zookeeper
vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper #ZooKeeper安装目录(即上一步创建的软链接)
export PATH=$PATH:$ZOOKEEPER_HOME/bin
保存退出后执行 source /etc/profile 使环境变量生效。
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
mkdir /tmp/zookeeper
mkdir /var/log/zookeeper
安装zeromq以及jzmq:
wget http://download.zeromq.org/zeromq-2.2.0.tar.gz
tar zxf zeromq-2.2.0.tar.gz
cd zeromq-2.2.0
./configure
make
make install
sudo ldconfig #刷新动态链接库缓存,使新安装的libzmq能被找到
zeromq安装完成。
安装jzmq: (提前安装好java)
yum install git
git clone https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
make install
然后,jzmq就装好了。注意:在 ./autogen.sh 这一步如果报错 "autogen.sh: error: could not find libtool. libtool is required to run autogen.sh",这是因为缺少了libtool,可以用 yum install libtool* 来解决。
wget http://cloud.github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
mv storm-0.8.1 /usr/local/
ln -s /usr/local/storm-0.8.1/ /usr/local/storm
vim /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
保存退出后执行 source /etc/profile 使环境变量生效。
到此为止单机版的Storm就安装完毕了。
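启动ZooKeeper(zkServer.sh start)以及Storm的各个进程(storm nimbus &、storm supervisor &、storm ui &)后,用 jps 查看进程是否正常运行,再在浏览器中输入 http://localhost:8080 查看Storm UI。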
6:storm进程远程kill
如要监控Storm集群和运行在其上的Topology,该如何做呢?
Storm已经为你考虑到了。Storm采用Thrift的C/S架构:Nimbus本身就是一个Thrift Server(默认端口6627),我们可以编写一个Thrift Client向它发送请求,获取集群和Topology的相关数据,再接入监控平台,如Zabbix等。我目前使用的就是Zabbix。
整体的流程已经清楚了,下面就来实践吧。
1 安装Thrift
由于我们要用Thrift编译Storm源码中的接口定义文件storm.thrift,以生成Thrift Client相关的Java源代码,所以需要先安装Thrift,这里选取的版本为0.9.2。
到官网下载好安装包:http://thrift.apache.org/
编译安装:./configure && make && make install
验证:thrift --version
如果打印出Thrift version 0.9.2,代表安装成功。
2 编译Thrift Client代码
首先下载Storm源代码,这里使用(本文写作时最新的)0.9.3版本:https://archive.apache.org/dist/storm/apache-storm-0.9.3/apache-storm-0.9.3-src.tar.gz
解压后进行编译:thrift -gen java apache-storm-0.9.3/storm-core/src/storm.thrift
在当前目录下出现gen-java文件夹,此文件夹下就是Thrift Client的Java源代码了。
3 使用Thrift Client API
然后创建一个Maven项目,用来获取监控数据。
项目打包成一个Jar文件,运行时传入命令和自定义参数,然后输出结果。
以命令行的形式调用,可以方便地接入监控系统;当然,使用形式可以根据自身情况调整。
创建好后,把gen-java生成的代码拷贝进来。
在pom.xml里引入Thrift对应版本的库:
1
2
3
4
5
|
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.2</version>
</dependency>
首先写一些Thrift相关的辅助类。
ClientInfo.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.damacheng009.storm.monitor.thrift; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import backtype.storm.generated.Nimbus; /** * 代表一个Thrift Client的信息 * @author jb-xingchencheng * */ public class ClientInfo { private TSocket tsocket; private TFramedTransport tTransport; private TBinaryProtocol tBinaryProtocol; private Nimbus.Client client; public TSocket getTsocket() { return tsocket; } public void setTsocket(TSocket tsocket) { this .tsocket = tsocket; } public TFramedTransport gettTransport() { return tTransport; } public void settTransport(TFramedTransport tTransport) { this .tTransport = tTransport; } public TBinaryProtocol gettBinaryProtocol() { return tBinaryProtocol; } public void settBinaryProtocol(TBinaryProtocol tBinaryProtocol) { this .tBinaryProtocol = tBinaryProtocol; } public Nimbus.Client getClient() { return client; } public void setClient(Nimbus.Client client) { this .client = client; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.damacheng009.storm.monitor.thrift; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import backtype.storm.generated.Nimbus; /** * Thrift Client管理类 * @author jb-xingchencheng * */ public class ClientManager { public static ClientInfo getClient(String nimbusHost, int nimbusPort) throws TTransportException { ClientInfo client = new ClientInfo(); TSocket tsocket = new TSocket(nimbusHost, nimbusPort); TFramedTransport tTransport = new TFramedTransport(tsocket); TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport); Nimbus.Client c = new Nimbus.Client(tBinaryProtocol); tTransport.open(); client.setTsocket(tsocket); client.settTransport(tTransport); client.settBinaryProtocol(tBinaryProtocol); client.setClient(c); return client; } public static void closeClient(ClientInfo client) { if ( null == client) { return ; } if ( null != client.gettTransport()) { client.gettTransport().close(); } if ( null != client.getTsocket()) { client.getTsocket().close(); } } } |
下面是入口类:
Main.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.damacheng009.storm.monitor; import com.damacheng009.storm.monitor.logic.Logic; /** * 入口类 * @author jb-xingchencheng * */ public class Main { // NIMBUS的信息 public static String NIMBUS_HOST = "192.168.180.36" ; public static int NIMBUS_PORT = 6627 ; /** * 命令格式 CMD(命令) [ARG0] [ARG1] ...(更多参数) * @param args */ public static void main(String[] args) { if (args.length < 3 ) { return ; } NIMBUS_HOST = args[ 0 ]; NIMBUS_PORT = Integer.parseInt(args[ 1 ]); String cmd = args[ 2 ]; String result = "-1" ; if (cmd.equals( "get_topo_exp_size" )) { String topoName = args[ 3 ]; result = Logic.getTopoExpSize(topoName); } else if (cmd.equals( "get_topo_exp_stack_trace" )) { String topoName = args[ 3 ]; result = Logic.getTopoExpStackTrace(topoName); } System.out.println(result); } } |
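Main中的NIMBUS_HOST和NIMBUS_PORT只是默认值,运行时会被命令行的前两个参数覆盖。调用格式为:java -cp <Jar包及其依赖> com.damacheng009.storm.monitor.Main <nimbusHost> <nimbusPort> <命令> <拓扑名>,其中命令为 get_topo_exp_size 或 get_topo_exp_stack_trace。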
然后是具体的逻辑类。Logic.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package com.damacheng009.storm.monitor.logic;

import java.util.Date;
import java.util.List;
import java.util.Set;

import com.damacheng009.storm.monitor.Main;
import com.damacheng009.storm.monitor.thrift.ClientInfo;
import com.damacheng009.storm.monitor.thrift.ClientManager;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;

/**
 * Monitoring queries against Nimbus. Each public method opens its own Thrift client to
 * {@code Main.NIMBUS_HOST:Main.NIMBUS_PORT}, closes it before returning, and returns a
 * string suitable for printing to a monitoring system ({@code "-1"} on any failure).
 */
public class Logic {

    /**
     * Default look-back window for {@link #getTopoExpStackTrace(String)}: 5 minutes, chosen to
     * match a 5-minute polling interval so each error is reported once.
     */
    public static final long DEFAULT_STACK_TRACE_WINDOW_MILLIS = 1000 * 60 * 5;

    /**
     * Returns the total number of errors Nimbus reports for the named topology, across all
     * components. Unlike {@link #getTopoExpStackTrace(String)}, no time window is applied.
     *
     * @param topoName topology name to look up
     * @return the error count as a string; {@code "0"} if the topology is not found;
     *     {@code "-1"} if querying Nimbus fails
     */
    public static String getTopoExpSize(String topoName) {
        ClientInfo client = null;
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            TopologyInfo topologyInfo = findTopologyInfo(client, topoName);

            int errorTotal = 0;
            if (topologyInfo != null) {
                for (List<ErrorInfo> componentErrors : topologyInfo.getErrors().values()) {
                    errorTotal += componentErrors.size();
                }
            }
            return String.valueOf(errorTotal);
        } catch (Exception e) {
            // Deliberate: any failure is reported to the monitoring system as the sentinel "-1".
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }

    /**
     * Returns error timestamps and stack traces for the named topology, limited to errors from
     * the last {@link #DEFAULT_STACK_TRACE_WINDOW_MILLIS} milliseconds.
     *
     * @param topoName topology name to look up
     * @return one "date\nstacktrace\n" pair per recent error; {@code "none"} if there are no
     *     recent errors or the topology is not found; {@code "-1"} if querying Nimbus fails
     */
    public static String getTopoExpStackTrace(String topoName) {
        return getTopoExpStackTrace(topoName, DEFAULT_STACK_TRACE_WINDOW_MILLIS);
    }

    /**
     * Returns error timestamps and stack traces for the named topology, limited to errors that
     * occurred at most {@code windowMillis} milliseconds ago.
     *
     * <p>Nimbus returns the full error history, so the window is what keeps a periodic poll from
     * re-reporting old errors.
     *
     * @param topoName topology name to look up
     * @param windowMillis maximum age, in milliseconds, of errors to include
     * @return one "date\nstacktrace\n" pair per matching error; {@code "none"} if there are none
     *     or the topology is not found; {@code "-1"} if querying Nimbus fails
     */
    public static String getTopoExpStackTrace(String topoName, long windowMillis) {
        ClientInfo client = null;
        StringBuilder error = new StringBuilder();
        try {
            client = ClientManager.getClient(Main.NIMBUS_HOST, Main.NIMBUS_PORT);
            TopologyInfo topologyInfo = findTopologyInfo(client, topoName);

            if (topologyInfo != null) {
                // Loop-invariant: take "now" once for the whole scan.
                long now = System.currentTimeMillis();
                for (List<ErrorInfo> componentErrors : topologyInfo.getErrors().values()) {
                    for (ErrorInfo ei : componentErrors) {
                        // Error time is reported in seconds; widen before multiplying to avoid int overflow.
                        long expTime = (long) ei.getError_time_secs() * 1000;
                        if (now - expTime > windowMillis) {
                            continue;
                        }
                        error.append(new Date(expTime)).append('\n');
                        error.append(ei.getError()).append('\n');
                    }
                }
            }
            return error.length() == 0 ? "none" : error.toString();
        } catch (Exception e) {
            // Deliberate: any failure is reported to the monitoring system as the sentinel "-1".
            return "-1";
        } finally {
            ClientManager.closeClient(client);
        }
    }

    /**
     * Finds the first running topology whose name equals {@code topoName} and fetches its info.
     *
     * @return the topology info, or {@code null} if no topology with that name is running
     */
    private static TopologyInfo findTopologyInfo(ClientInfo client, String topoName)
            throws org.apache.thrift.TException {
        ClusterSummary clusterSummary = client.getClient().getClusterInfo();
        for (TopologySummary ts : clusterSummary.getTopologies()) {
            if (ts.getName().equals(topoName)) {
                return client.getClient().getTopologyInfo(ts.getId());
            }
        }
        return null;
    }
}
最后打成一个Jar包,就可以运行它来接入监控系统了。例如在Zabbix中,可以把各个监控项设置为自定义item(UserParameter),并在Zabbix Agent的配置中写好运行该Jar的命令来取得数据。
接下来的测试过程先略过。
对于Storm监控的实践,目前就是这样了。
原文:http://www.cnblogs.com/leo3689/p/5158138.html