1 安装方法以及相关库文件
https://github.com/edenhill/librdkafka
2
consumer有两套API,高级(high-level)和底层(simple)的,应该叫底层API或者低级API,它跟高级API的区别是没有自动负载均衡,而高级API会自动进行负载均衡。
3 kafka主要的用途
发数据---->producer
//发一条
rd_kafka_produce()
//发多条
rd_kafka_produce_batch()
收数据---->consumer
在收发数据之前至少需要一个统一的句柄,方便kafka内部准备好链接brokers集群,初始化kafka内部结构等
建立这个kafka句柄需要知道连接到哪个broker
发布消息使用rd_kafka_top_t,
1 // 对rd_kafka_topic_partition_list_t结构的操作 2 // 创建 3 rd_kafka_topic_partition_list_new(); 4 // 增加元素 5 rd_kafka_topic_partition_list_add(); 6 // 删除元素 7 rd_kafka_topic_partition_list_del(); 8 // 查找元素 9 rd_kafka_topic_partition_list_find(); 10 11 // 对rd_kafka_topic_t的操作 12 // 创建 13 rd_kafka_topic_new(); 14 // 删除 15 rd_kafka_topic_destroy(); 16 // 获取该topic的名字 17 rd_kafka_topic_name(); 18 // 获取该topic传入的应用参数 19 rd_kafka_topic_opaque(); 20 21 // 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的, 22 // 所以给kafka句柄的时候只用一个参数就够了 23 // 订阅消息 24 rd_kafka_subscribe (rd_kafka_t *rk, 25 const rd_kafka_topic_partition_list_t *topics); 26 // 指定消费的partition,可以在运行时更换 27 rd_kafka_assign (rd_kafka_t *rk, 28 const rd_kafka_topic_partition_list_t *partitions); 29 30 // 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行 31 // 直接启动consumer了 32 rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition, 33 int64_t offset); 34 // 每次接收也要带上partition 35 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, 36 int timeout_ms); 37 38 39 // 发送的时候使用 rd_kafka_topic_t 40 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition, 41 int msgflags, 42 void *payload, size_t len, 43 const void *key, size_t keylen, 44 void *msg_opaque);
后续用了继续做笔记,关于错误处理,topic_partition_list操作等
https://blog.csdn.net/lijinqi1987/article/details/76582067
http://suntus.github.io/2016/07/07/librdkafka--kafka%20C%20api%E4%BB%8B%E7%BB%8D/
kafka之c接口常用API------librdkafka
原文:https://www.cnblogs.com/lanjianhappy/p/10636838.html