首页 > 其他 > 详细

Kafka之间的转发

时间:2020-10-14 22:49:13      阅读:37      评论:0      收藏:0      [点我收藏+]

近期要实现一个业务,将一个环境kafka中的数据转发到另一个环境kafka中~ 

话不多说,干活!!emmm····

--------------------------------------------------------华丽丽的代码风格线----------------------------------------------------------------

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;


import java.util.Arrays;
import java.util.Properties;

/**
* @author wanghd
* @version 1.0
* @date 2020/10/14 9:55
*/

public class Consumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private KafkaProducer<String, String> producer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private String produceTopic;
private static final String GROUPID = "groupA";


public Consumer(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.16.12.14:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}

@Override
public void run() {
System.out.println("---------开始消费---------");
try {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
String value = record.value();
//将json转换成Map
if (value != null || !value.isEmpty()) {
JSONObject jsonObject = JSON.parseObject(value);
String vin = jsonObject.get("vin").toString();
System.out.println("vin" + vin);
//vin作为key推送数据
sendToSgl("sglkafkaTopic", vin, record.value());
}
}
} else {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}

public void sendToSgl(String topic, String vin, String value) {
Properties props = new Properties();
props.put("bootstrap.servers", "172.16.12.14:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer<String, String>(props);
this.produceTopic = topic;
startSend(vin, value);
}

public void startSend(String vin, String value) {
try {
producer.send(new ProducerRecord<String, String>(produceTopic, vin, value));
System.out.println("发送的信息:" + value);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}

public static void main(String args[]) {
Consumer test1 = new Consumer("KAFKA_TEST");
Thread thread1 = new Thread(test1);
thread1.start();
}
}

--------------------------------------------------------华丽丽的代码风格线----------------------------------------------------------------

上述代码的意思是:把要执行的类放入线程中,让它一直运行。线程启动后首先执行 run 方法:在 run 方法里先消费数据,消费到数据之后再进行发送。

 

想法就是这么个想法,我依旧很菜~.!

--------------------------------------------------------华丽丽的pom风格线----------------------------------------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka-to-Kafka forwarder. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sendToSGL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Inherits plugin and dependency management from Spring Boot. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
    </parent>

    <dependencies>

        <!-- NOTE(review): this is the broker artifact; the code shown with this pom only uses
             kafka-clients. Confirm whether it is still needed. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Consumer and producer client API. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- NOTE(review): no Kafka Streams API is used by the code shown with this pom.
             Confirm whether it is still needed. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!-- 1.2.83 is the upstream security release; 1.2.80 and earlier are affected by an
             autoType deserialization vulnerability. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- Repackages the jar as an executable archive. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <executable>true</executable>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

--------------------------------------------------------华丽丽的pom风格线----------------------------------------------------------------

 

Kafka之间的转发

原文:https://www.cnblogs.com/shuaidong/p/13816901.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!