Example: Writing Data from Flink to Elasticsearch

Date: 2020-09-12 16:23:15

Versions: Flink v1.11, Elasticsearch 7.9

1. Add the Maven dependencies

        <!-- elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>

        <!-- jackson: ObjectMapper lives in jackson-databind, which pulls in jackson-core transitively -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.1</version>
        </dependency>

2. Configure the Builder

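        import java.util.ArrayList;
        import java.util.List;
        import java.util.Map;

        import com.fasterxml.jackson.databind.ObjectMapper;
        import lombok.SneakyThrows;
        import org.apache.flink.api.common.functions.RuntimeContext;
        import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
        import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
        import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
        import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
        import org.apache.http.Header;
        import org.apache.http.HttpHost;
        import org.apache.http.message.BasicHeader;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.Requests;

        // Elasticsearch nodes the sink connects to
        List<HttpHost> elsearchHosts = new ArrayList<>();
        elsearchHosts.add(new HttpHost("192.168.32.36", 9200, "http"));
        elsearchHosts.add(new HttpHost("192.168.32.37", 9200, "http"));
        elsearchHosts.add(new HttpHost("192.168.32.38", 9200, "http"));

        ObjectMapper mapper = new ObjectMapper(); // Jackson ObjectMapper

        // ResultCollector is the type of the objects you want to store; replace it with your own type
        ElasticsearchSink.Builder<ResultCollector> esSinkBuilder = new ElasticsearchSink.Builder<>(
                elsearchHosts,
                new ElasticsearchSinkFunction<ResultCollector>() {

                    private static final long serialVersionUID = -6797861015704600807L;

                    public IndexRequest createIndexRequest(ResultCollector collector) throws Exception {
                        return Requests.indexRequest()
                                .index("flink-test-index") // set the index
                                .id(collector.getId()) // set the document ID
                                // Important: the source must be passed as a Map
                                .source(mapper.readValue(mapper.writeValueAsString(collector), Map.class));
                    }

                    @SneakyThrows // Lombok annotation; requires Lombok on the classpath
                    @Override
                    public void process(ResultCollector collector, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        requestIndexer.add(createIndexRequest(collector));
                    }
                }
        );

        // Flush after every single action, so each record is sent as soon as it arrives
        esSinkBuilder.setBulkFlushMaxActions(1);
        // Re-queue requests that Elasticsearch rejected because its queue was full
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
        // Send a JSON Content-Type header with every request
        esSinkBuilder.setRestClientFactory(restClientBuilder -> {
            Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")};
            restClientBuilder.setDefaultHeaders(headers);
        });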
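The Builder code stresses that the document source must be a Map, and it gets one by serializing the object to JSON and parsing it back. As a rough sketch of an alternative, the map can also be built by hand with nothing but the JDK. The fields of ResultCollector (id, word, count) and the helper class SourceMapSketch are hypothetical, since the article does not show the real class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the ResultCollector type used in the sink;
// the real class and its fields are not shown in the article.
class ResultCollector {
    private final String id;
    private final String word;
    private final long count;

    ResultCollector(String id, String word, long count) {
        this.id = id;
        this.word = word;
        this.count = count;
    }

    String getId() { return id; }
    String getWord() { return word; }
    long getCount() { return count; }
}

// Hypothetical helper: builds the document body directly instead of
// going through a JSON string and back.
class SourceMapSketch {
    static Map<String, Object> toSourceMap(ResultCollector c) {
        // LinkedHashMap keeps the fields in insertion order
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", c.getId());
        doc.put("word", c.getWord());
        doc.put("count", c.getCount());
        return doc;
    }

    public static void main(String[] args) {
        ResultCollector c = new ResultCollector("42", "flink", 3L);
        System.out.println(toSourceMap(c)); // prints {id=42, word=flink, count=3}
    }
}
```

Under these assumptions, createIndexRequest could call .source(SourceMapSketch.toSourceMap(collector)). If you would rather keep Jackson, mapper.convertValue(collector, Map.class) should give a similar Map without the intermediate string.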

3.addSink

        // stream is your DataStream<ResultCollector>
        stream.addSink(esSinkBuilder.build());

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html

Original article: https://www.cnblogs.com/dotqin/p/13656983.html
