Flink Examples (12): Connectors (11): Writing to Elasticsearch

Date: 2020-10-02 09:47:41

1 Project layout and dependencies

Add the Elasticsearch 6 connector to pom.xml:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

2 Writing from Flink to Elasticsearch

package com.atguigu.flink.app

import java.util

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.source.HbaseSource
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ESSinkApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Use addSource to attach the input stream (here, a custom HBase source)
    val stream: scala.DataStream[SensorReading] = env.addSource(new HbaseSource)

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading]{
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // Build the document body
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("data", t.toString)

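          // Create the index request
          val indexRequest = Requests
            .indexRequest()
            .index("sensor") // target index "sensor" (roughly analogous to a database)
            .`type`("readingData") // Elasticsearch 6 requires a type on every index request
            .source(hashMap) // document source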

          // Hand the request to the indexer
          requestIndexer.add(indexRequest)
        }

      }
    )

    // Number of actions buffered per bulk write to Elasticsearch; 1 flushes after every record
    esSinkBuilder.setBulkFlushMaxActions(1)

    stream.addSink(esSinkBuilder.build())

    // Print the stream
    stream.print()

    // Run the job
    env.execute()

  }

}
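
The job above stores each record as one string field, "data", holding t.toString, so individual attributes cannot be queried or aggregated in Elasticsearch. A minimal sketch of building one field per attribute follows. The post does not show the definition of SensorReading, so the fields id, timestamp and temperature are assumptions, and SensorDocument.toDocument is a hypothetical helper, not part of Flink or Elasticsearch.

```scala
import java.util

// Assumed shape of SensorReading; the original post does not show its fields.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Hypothetical helper that turns one reading into an Elasticsearch document body.
object SensorDocument {
  def toDocument(r: SensorReading): util.Map[String, AnyRef] = {
    val doc = new util.HashMap[String, AnyRef]()
    doc.put("id", r.id)
    // Box primitives explicitly so the map holds java.lang.Long / java.lang.Double
    doc.put("timestamp", Long.box(r.timestamp))
    doc.put("temperature", Double.box(r.temperature))
    doc
  }
}
```

Under these assumptions, the process method could pass SensorDocument.toDocument(t) to .source(...) in place of the single-field hashMap, leaving the rest of the sink unchanged.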

Source: https://www.cnblogs.com/qiu-hua/p/13680673.html
