cd $ELASTICSEARCH_HOME
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
|
cd $ELASTICSEARCH_HOME
./bin/plugin -remove elasticsearch-river-kafka
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
|
#node.river: _none_    ##这一行要注释掉,表示为river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
|
node.river: _none_    ##这一行要解注,表示该节点不是river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
|
@Override ???? public ?String name() {
???????? return ?"river-kafka" ;
???? }
???? @Override
???? public ?String description() {
???????? return ?"River Kafka Plugin" ;
???? }
|
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin |
public ?void ?onModule(RiversModule module) {
???????? module.registerRiver( "kafka" , KafkaRiverModule. class );
? }
|
public ?class ?KafkaRiverModule? extends ?AbstractModule {
???? @Override
???? protected ?void ?configure() {
???????? bind(River. class ).to(KafkaRiver. class ).asEagerSingleton();
???? }
} |
?
public ?class ?KafkaRiver? extends ?AbstractRiverComponent? implements ?River {
???? private ?BasicProperties properties;
???? private ?KafkaConsumer kafkaConsumer;
???? private ?ElasticsearchProducer elasticsearchProducer;
???? private ?static ??ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
???? private ?Thread riverMonitorThread;
???? private ?KafkaRiverSubMonitor kafkaRiverSubMonitor;
???? private ?Thread thread;
???? private ?ESLogger logger;
???? @Inject
???? protected ?KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
???????? super (riverName, settings);
???????? this .logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
???????? properties =? new ?BasicProperties(settings);
???????? elasticsearchProducer =? new ?ElasticsearchProducer(client, properties);
???????? kafkaConsumer =? new ?KafkaConsumer(riverName, properties, elasticsearchProducer);
???? }
???? @Override
?
???? public ?void ?start() {
????? //启动KafkaRiver的线程
???????? try ?{
???????????? logger.info( "MHA: Starting Kafka Worker..." );
???????????? thread = EsExecutors.daemonThreadFactory(settings.globalSettings(),? "kafka_river" ).newThread(kafkaConsumer);
???????????? thread.start();
???????? }? catch ?(Exception ex) {
???????????? logger.error( "Unexpected Error occurred" , ex);
???????????? throw ?new ?RuntimeException(ex);
???????? }
???? }
????? ......
} |
    "type" : "kafka",
    "kafka" : {
        "topic" : "test",
        "numOfConsumer" : "2",
        "zk.connect" : "10.10.10.10:2181",
        "zk.session.timeout.ms" : "50000",
        "zk.sync.time.ms" : "200",
        "zk.auto.commit.interval.ms" : "1000",
        "zk.auto.commit.enable" : "true",
        "zk.auto.offset.reset" : "smallest",
        "zk.fetch.message.max.bytes" : "5242880",
        "serializer" : "com.test.elasticsearch.river.kafka.serializer.AASerializer"
    },
    "elasticsearch" : {
        "indexName" : "stringfortest",
        "indexType" : "message1",
        "batch_size" : "500",
        "handling_batch_coresize" : "2",
        "handling_batch_maximumPoolSize" : "2",
        "handling_batch_keepAliveTime" : "600",
        "handling_batch_queueSize" : "10",
        "es_bulk_timeout" : "5"
    }
}'
|
{ "_index" : "_river",
  "_type" : "baymaxriver1",
  "_id" : "_meta",
  "_version" : 1,
  "created" : true
}
|
curl -XDELETE 'http://localhost:9200/_river/rivername'
|
?
elasticsearch-river-kafka 插件的环境配置和使用
原文:http://shensuqiao.iteye.com/blog/2251676