首页 > 其他 > 详细

docker搭建kafka环境&&Golang生产和消费

时间:2019-11-30 00:06:45      阅读:249      评论:0      收藏:0      [点我收藏+]

docker 搭建kafka环境

version: '2'
services:
  zk1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zk1
    container_name: zk1
    restart: always
    ports:
      - "12181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888

  zk2:
    image: confluentinc/cp-zookeeper:latest
    hostname: zk2
    container_name: zk2
    restart: always
    ports:
      - "22181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888

  zk3:
    image: confluentinc/cp-zookeeper:latest
    hostname: zk3
    container_name: zk3
    restart: always
    ports:
      - "32181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zk1:12888:13888;zk2:22888:23888;zk3:32888:33888

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    container_name: kafka1
    restart: always
    ports:
      - "9092:9092"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    container_name: kafka2
    restart: always
    ports:
      - "9093:9093"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9093

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    container_name: kafka3
    restart: always
    ports:
      - "9094:9094"
    depends_on:
      - zk1
      - zk2
      - zk3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9094

创建topic

partitions为2个,replication有3个,topic的name为test_kafka
//创建topic
kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 2 --create --topic test_kafka .
//查看topic
kafka-topics --zookeeper zk1:2181,zk2:2181,zk3:2181 --describe --topic test_kafka

消费kafka数据

import (
    "context"
    "flag"
    "log"

    "github.com/segmentio/kafka-go"
)

const (
    kafkaConn1 = "127.0.0.1:9092"
    kafkaConn2 = "127.0.0.1:9093"
    kafkaConn3 = "127.0.0.1:9094"
)

var (
    topic = flag.String("t", "test_kafka", "kafka topic")
    group = flag.String("g", "test-group", "kafka consumer group")
)

func main() {
    flag.Parse()
    config := kafka.ReaderConfig{
        Brokers:  []string{kafkaConn1, kafkaConn2, kafkaConn3},
        Topic:    *topic,
        MinBytes: 1e3,
        MaxBytes: 1e6,
        GroupID:  *group,
    }
    reader := kafka.NewReader(config)
    ctx := context.Background()
    for {
        msg, err := reader.FetchMessage(ctx)
        if err != nil {
            log.Printf("fail to get msg:%v", err)
            continue
        }
        log.Printf("msg content:topic=%v,partition=%v,offset=%v,content=%v",
            msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        err = reader.CommitMessages(ctx, msg)
        if err != nil {
            log.Printf("fail to commit msg:%v", err)
        }
    }
}

生产kafka数据

import (
    "bufio"
    "context"
    "fmt"
    "os"

    "github.com/segmentio/kafka-go"
)

const (
    kafkaConn1 = "127.0.0.1:9092"
    kafkaConn2 = "127.0.0.1:9093"
    kafkaConn3 = "127.0.0.1:9094"
    topic      = "test_kafka"
)

var brokerAddrs = []string{kafkaConn1, kafkaConn2, kafkaConn3}

func main() {
    // read command line input
    reader := bufio.NewReader(os.Stdin)
    writer := newKafkaWriter(brokerAddrs, topic)
    defer writer.Close()
    for {
        fmt.Print("Enter msg: ")
        msgStr, _ := reader.ReadString('\n')

        msg := kafka.Message{
            Value: []byte(msgStr),
        }
        err := writer.WriteMessages(context.Background(), msg)
        if err != nil {
            fmt.Println(err)
        }
    }
}

func newKafkaWriter(kafkaURL []string, topic string) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:  kafkaURL,
        Topic:    topic,
        Balancer: &kafka.Hash{},
    })
}

参考
http://zhongmingmao.me/2018/10/08/kafka-install-cluster-docker/
https://leel0330.github.io/golang/%E5%9C%A8go%E4%B8%AD%E4%BD%BF%E7%94%A8kafka/

docker搭建kafka环境&&Golang生产和消费

原文:https://www.cnblogs.com/alin-qu/p/11960949.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!