cd $ELASTICSEARCH_HOME
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
|
cd $ELASTICSEARCH_HOME
./bin/plugin -remove elasticsearch-river-kafka
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
|
#node.river: _none_    ##这一行要注释掉,表示为river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
|
node.river: _none_    ##这一行要解注,表示该节点不是river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
|
@Override????public?String name() {
????????return?"river-kafka";
????}
????@Override
????public?String description() {
????????return?"River Kafka Plugin";
????}
|
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin |
public?void?onModule(RiversModule module) {
????????module.registerRiver("kafka", KafkaRiverModule.class);
?}
|
public?class?KafkaRiverModule?extends?AbstractModule {
????@Override
????protected?void?configure() {
????????bind(River.class).to(KafkaRiver.class).asEagerSingleton();
????}
} |
?
public?class?KafkaRiver?extends?AbstractRiverComponent?implements?River {
????private?BasicProperties properties;
????private?KafkaConsumer kafkaConsumer;
????private?ElasticsearchProducer elasticsearchProducer;
????private?static??ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
????private?Thread riverMonitorThread;
????private?KafkaRiverSubMonitor kafkaRiverSubMonitor;
????private?Thread thread;
????private?ESLogger logger;
????@Inject
????protected?KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
????????super(riverName, settings);
????????this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
????????properties =?new?BasicProperties(settings);
????????elasticsearchProducer =?new?ElasticsearchProducer(client, properties);
????????kafkaConsumer =?new?KafkaConsumer(riverName, properties, elasticsearchProducer);
????}
????@Override
?
????public?void?start() {
?????//启动KafkaRiver的线程
????????try?{
????????????logger.info("MHA: Starting Kafka Worker...");
????????????thread = EsExecutors.daemonThreadFactory(settings.globalSettings(),?"kafka_river").newThread(kafkaConsumer);
????????????thread.start();
????????}?catch?(Exception ex) {
????????????logger.error("Unexpected Error occurred", ex);
????????????throw?new?RuntimeException(ex);
????????}
????}
?????......
} |
curl -XPUT 'http://localhost:9200/_river/baymaxriver1/_meta' -d '{
    "type": "kafka",
    "kafka": {
        "topic" : "test",
        "numOfConsumer" : "2",
        "zk.connect" : "10.10.10.10:2181",
        "zk.session.timeout.ms" : "50000",
        "zk.sync.time.ms" : "200",
        "zk.auto.commit.interval.ms" : "1000",
        "zk.auto.commit.enable" : "true",
        "zk.auto.offset.reset" : "smallest",
        "zk.fetch.message.max.bytes" : "5242880",
        "serializer" : "com.test.elasticsearch.river.kafka.serializer.AASerializer"
    },
    "elasticsearch" : {
        "indexName" : "stringfortest",
        "indexType" : "message1",
        "batch_size" : "500",
        "handling_batch_coresize" : "2",
        "handling_batch_maximumPoolSize" : "2",
        "handling_batch_keepAliveTime" : "600",
        "handling_batch_queueSize" : "10",
        "es_bulk_timeout" : "5"
    }
}'
|
{"_index":"_river",
 "_type":"baymaxriver1",
 "_id":"_meta",
 "_version":1,
 "created":true
}
|
curl -XDELETE 'http://localhost:9200/_river/rivername'
|
?
elasticsearch-river-kafka 插件的环境配置和使用
原文:http://shensuqiao.iteye.com/blog/2251676