首页 > 其他 > 详细

elasticsearch-river-kafka 插件的环境配置和使用

时间:2015-10-25 02:06:20      阅读:877      评论:0      收藏:0      [点我收藏+]

1.elasticsearch-river-kafka 插件的安装

elasticsearch-river-kafka?插件的安装与其他插件一样
cd $ELASTICSEARCH_HOME
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
?
插件更新
cd $ELASTICSEARCH_HOME
./bin/plugin -remove elasticsearch-river-kafka
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka

?

2.river节点的配置

配置river节点的时候,river节点和非river节点都要配置。
river节点:在es的配置文件中添加下面几行
#node.river: _none_??? ##这一行要注释掉,表示为river节点
threadpool:
????bulk:
????????type: fixed
????????size:?60
????????queue_size:?1000
?
非river节点:在es的配置文件中添加下面几行
node.river: _none_??? ##这一行要解注,表示该节点不是river节点
threadpool:
????bulk:
????????type: fixed
????????size:?60
????????queue_size:?1000
注意:一般,不会将数据落在river节点上(即node.data: false),但测试环境上就无所谓了,机器资源又紧张。
? ? ? ? ? 节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点

?

3.elasticsearch-river-kafka?插件的开发

社区中的elasticsearch-river-kafka?插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。
那么这个时候,我们就得自己去开发elasticsearch-river-kafka?插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka?插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述
@Override
????public?String name() {
????????return?"river-kafka";
????}
????@Override
????public?String description() {
????????return?"River Kafka Plugin";
????}
?
2)es-plugin.properties配置文件
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin
?
3)KafkaRiverModule
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来
public?void?onModule(RiversModule module) {
????????module.registerRiver("kafka", KafkaRiverModule.class);
?}
KafkaRiverModule必须继承AbstractModule?。在KafkaRiverModule中会生成一个KafkaRiver。KafkaRiver是River接口的实现。
public?class?KafkaRiverModule?extends?AbstractModule {
????@Override
????protected?void?configure() {
????????bind(River.class).to(KafkaRiver.class).asEagerSingleton();
????}
}

?

4)KafkaRiver
? ? –?KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
? ??–?KafkaRiver只提供两个方法:start和close。
? ??–?AbstractRiverComponent?用于initialize kafkariver的logger、river名、river的配置
? ??–?构造函数通过@Inject注入river所需要的一切东西:RiverName, RiverSettings、logger、自定义的配置信息
? ? ? (这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
? ??–?在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
? ??–?kafkaConsumer用来定义从kafka中读取数据时的用户操作。
? ??–?ElasticsearchProducer用来定义将数据写入ES时的用户操作。
public?class?KafkaRiver?extends?AbstractRiverComponent?implements?River {
????private?BasicProperties properties;
????private?KafkaConsumer kafkaConsumer;
????private?ElasticsearchProducer elasticsearchProducer;
????private?static??ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
????private?Thread riverMonitorThread;
????private?KafkaRiverSubMonitor kafkaRiverSubMonitor;
????private?Thread thread;
????private?ESLogger logger;
????@Inject
????protected?KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
????????super(riverName, settings);
????????this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
????????properties =?new?BasicProperties(settings);
????????elasticsearchProducer =?new?ElasticsearchProducer(client, properties);
????????kafkaConsumer =?new?KafkaConsumer(riverName, properties, elasticsearchProducer);
????}
????@Override
?
????public?void?start() {
?????//启动KafkaRiver的线程
????????try?{
????????????logger.info("MHA: Starting Kafka Worker...");
????????????thread = EsExecutors.daemonThreadFactory(settings.globalSettings(),?"kafka_river").newThread(kafkaConsumer);
????????????thread.start();
????????}?catch?(Exception ex) {
????????????logger.error("Unexpected Error occurred", ex);
????????????throw?new?RuntimeException(ex);
????????}
????}
?????......
}
?

4.kafka→river→es的数据存储

通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。
注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类
????"type":?"kafka",
????"kafka": {
????????"topic"?:?"test",
????????"numOfConsumer"?:?"2",
????????"zk.connect"?:?"10.10.10.10:2181",
????????"zk.session.timeout.ms"?:?"50000",
????????"zk.sync.time.ms"?:?"200",
????????"zk.auto.commit.interval.ms"?:?"1000",
????????"zk.auto.commit.enable"?:?"true",
????????"zk.auto.offset.reset"?:?"smallest",
????????"zk.fetch.message.max.bytes"?:?"5242880",
????????"serializer"?:?"com.test.elasticsearch.river.kafka.serializer.AASerializer"
????},
?????"elasticsearch"?: {
????????"indexName"?:?"stringfortest",
????????"indexType"?:?"message1",
????????"batch_size"?:?"500",
????????"handling_batch_coresize"?:?"2",
????????"handling_batch_maximumPoolSize"?:?"2",
????????"handling_batch_keepAliveTime"?:?"600",
????????"handling_batch_queueSize"?:?"10",
????????"es_bulk_timeout"?:?"5"
????}
}‘
上述指令中主要配置信息的说明:
kafka中 ?→
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中 ?→
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果
{"_index":"_river",
?"_type":"baymaxriver1",
?"_id":"_meta",
?"_version":1,
?"created":true
}
?
查看river的元数据http://ip:9200/_river/rivername/_meta
?
删除一条river

?

elasticsearch-river-kafka 插件的环境配置和使用

原文:http://shensuqiao.iteye.com/blog/2251676

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!