package main.scala.com.web.zhangyong168.cn.spark.java;
import com.alibaba.fastjson.JSONObject;
import com.web.zhangyong168.cn.spark.util.PropertiesUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.sparkproject.guava.collect.Lists;
import java.util.*;
/**
 * Writes JSON-formatted records into Kafka.
 * <p>NOTE(review): class name "WirteKafka" looks like a typo for "WriteKafka" —
 * kept as-is to avoid breaking external callers.
 *
 * @author  zhangyong
 * @version 1.0.0
 * @since   2020/06/05 14:40
 */
public class WirteKafka {
/**
配置文件的路径
@param proUrl 配置文件路径
@param runModel 运行模式 test dev produce
@return properties
*/
public static Properties getProperties(String proUrl, String runModel) {
Properties props = PropertiesUtils.loadProps("kafka.properties");
Properties properties = new Properties();
properties.put("bootstrap.servers", props.get(runModel + ".bootstrap.servers"));
properties.put("zookeeper.connect", props.get(runModel + ".zookeeper.connect"));
properties.put("group.id", props.get(runModel + ".group.id"));
properties.put("key.serializer", props.get(runModel + ".key.serializer"));
properties.put("value.serializer", props.get(runModel + ".value.serializer"));
return properties;
}
/**
/**
public static void main(String[] args) {
String str1 = "{"tableName":"yunduo.tb_role_user","columnNames":[ "name", "age", "birthday" ]," +
""columnTypes":[0,0,0]," +
""columnValues":[["daniel","20","2020-06-02"]," +
"["huahua","25","2020-06-03"]]" +
"}";
AccessArray accessArray = JSONObject.parseObject(str1, AccessArray.class);
System.out.println(accessArray);
List<Map<String, Object>> list = getResultList(accessArray);
insertKafkaDatas(str1.toString());
// kafkaProducer("你是我的眼3");
}
}
// Source: https://www.cnblogs.com/zy168/p/13049712.html