package com.bd.useranalysis.spark.streaming.kafka2es;

import com.alibaba.fastjson.JSON;
import com.bd.useranalysis.common.config.ConfigUtil;
import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.*;

public class Kafka2EsJava {

    Properties properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties");

    static Set<String> dataTypes = DataTypeProperties.dataTypeMap.keySet();

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("sparkstreaming_kafka2es").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        jsc.setLogLevel("WARN");
        JavaStreamingContext jss = new JavaStreamingContext(jsc, Durations.seconds(2L));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "quyf:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "test_20190815");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", true);

        List<String> topicList = Arrays.asList("test", "test2");

        // Direct stream over the subscribed topics
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jss,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicList, kafkaParams)
        );

        // Each record value is a JSON object; parse it into a field -> value map
        JavaDStream<HashMap<String, String>> recordDS = stream.map(
                record -> JSON.parseObject(record.value(), HashMap.class));

        // Split the stream by data type (the "table" field); for now each record is just printed
        for (String type : dataTypes) {
            recordDS.filter(resultMap -> type.equals(resultMap.get("table")))
                    .foreachRDD(mapJavaRDD ->
                            mapJavaRDD.foreach(resultMap -> System.out.println(resultMap.toString())));
        }

        jss.start();
        jss.awaitTermination();
    }
}
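The class above only prints the filtered records; the Elasticsearch write implied by the class name is not shown in the post. As a minimal sketch, assuming the elasticsearch-spark connector (JavaEsSpark) is on the classpath, each per-type stream could be sunk to its own index roughly like this; the index naming scheme and the "quyf:9200" / "/docs" values are placeholders, not taken from the original post:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.HashMap;

public class EsSinkSketch {

    // Writes each micro-batch of one data type to its own index, e.g. "wechat/docs".
    // Assumes the SparkConf also carries the connector settings, e.g.
    // sparkConf.set("es.nodes", "quyf:9200") and sparkConf.set("es.index.auto.create", "true").
    public static void sinkToEs(JavaDStream<HashMap<String, String>> typedStream, String type) {
        typedStream.foreachRDD((JavaRDD<HashMap<String, String>> rdd) -> {
            if (!rdd.isEmpty()) {
                JavaEsSpark.saveToEs(rdd, type + "/docs");
            }
        });
    }
}

The test data consumed above is generated by the following producer, which replays a local wechat sample file into topic test2: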
import com.alibaba.fastjson.JSON;
import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class GenKafkaData {

    public static void main(String[] args) throws Exception {
        List<String> lines = IOUtils.readLines(new FileReader(
                new File("E:\\wechat\\wechat_source1_1111153.txt")));
        Producer<String, String> producer = getProducer();

        ArrayList<String> columns = DataTypeProperties.dataTypeMap.get("wechat");
        Map<String, String> dataMap = new HashMap<>();
        dataMap.put("table", "wechat");

        for (String line : lines) {
            // Map each tab-separated field of the line to its column name
            String[] fields = line.split("\t");
            for (int i = 0; i < fields.length; i++) {
                dataMap.put(columns.get(i), fields[i]);
            }
            // Send the record as JSON to topic "test2" ten times, one second apart
            int index = 0;
            while (true) {
                String lineRecord = JSON.toJSONString(dataMap);
                producer.send(new ProducerRecord<>("test2", null, lineRecord));
                Thread.sleep(1000);
                index++;
                System.out.println("send->" + lineRecord);
                if (index == 10) {
                    break;
                }
            }
        }
        producer.close();
    }

    public static Producer<String, String> getProducer() {
        return new KafkaProducer<>(createProducerProperties());
    }

    private static Properties createProducerProperties() {
        Properties props = new Properties();
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("bootstrap.servers", "quyf:9092");
        props.put("acks", "all");
        // maximum number of retries for a failed send
        props.put("retries", 0);
        // batch size in bytes
        props.put("batch.size", 16384);
        // wait up to 1 ms for more records before sending a request
        props.put("linger.ms", 1);
        // total memory available to the send buffer
        props.put("buffer.memory", 33554432);
        return props;
    }
}
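Both classes depend on ConfigUtil and DataTypeProperties, which are project-internal classes not shown in the post. For reference, a rough stand-in for DataTypeProperties could look like the sketch below; the real class presumably loads the table-to-columns mapping from a config file, and the "wechat" column names used here are made-up placeholders, not the actual schema:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for com.bd.useranalysis.common.project.datatype.DataTypeProperties.
// It only needs to expose dataTypeMap: data type (table name) -> ordered column names.
public class DataTypeProperties {

    public static final Map<String, ArrayList<String>> dataTypeMap = new HashMap<>();

    static {
        // Placeholder column list; replace with the columns of the real wechat sample file.
        dataTypeMap.put("wechat", new ArrayList<>(Arrays.asList(
                "col1", "col2", "col3", "col4", "col5")));
    }
}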
Original post: https://www.cnblogs.com/quyf/p/11361080.html