Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ cd kafka_2.10-0.8.1.1
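Configure ZooKeeper (this assumes ZooKeeper is already installed; if it is not, look up an installation guide online)
From the Kafka installation root directory, edit the server configuration: vim config/server.properties
Set the property zookeeper.connect=ip:2181,ip2:2181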
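A stray space inside the zookeeper.connect value is an easy mistake to make and may keep the connection string from being parsed. The following is a minimal sketch of a pre-flight check, assuming every entry must be a plain host:port pair with no whitespace; the class name ZkConnectCheck and its parse method are hypothetical helpers, not part of Kafka, and the sketch deliberately ignores the optional chroot path suffix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka API): validates a zookeeper.connect value.
final class ZkConnectCheck {

    /** Splits the value on commas and rejects any entry that is not host:port. */
    static List<String> parse(String connect) {
        List<String> entries = new ArrayList<>();
        for (String entry : connect.split(",")) {
            // Assumption: a host without whitespace, one colon, then a numeric port.
            if (!entry.matches("[^\\s:,]+:\\d{1,5}")) {
                throw new IllegalArgumentException("bad zookeeper.connect entry: '" + entry + "'");
            }
            entries.add(entry);
        }
        return entries;
    }

    public static void main(String[] args) {
        System.out.println(parse("192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181"));
    }
}
```

Running such a check before starting the broker turns a late connection failure into an immediate, readable error.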
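The three most important Kafka settings are, in order: broker.id, log.dir, zookeeper.connect
The parameters in the Kafka server-side config/server.properties are described and explained as follows:
(configuration reference: http://blog.csdn.net/lizhitao/article/details/25667831)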
# Real-world example: the Kafka configuration file on server 211
broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000

# Real-world example: the Kafka configuration on server 210
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
cd kafka_2.10-0.8.1.1
lizhitao@localhost:~$ bin/kafka-server-start.sh -daemon config/server.properties &
(For experiments, start at least two brokers: bin/kafka-server-start.sh -daemon config/server-1.properties &)
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
lizhitao@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Here localhost is the ZooKeeper address.
Describe a topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
Send some messages to verify the setup. In console mode, start a producer:
bin/kafka-console-producer.sh --broker-list 192.168.1.9:9092 --topic zjcTest
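(Here localhost has to be replaced with the machine's IP, otherwise an error is reported. I don't know the exact cause; a likely one is that host.name is set in server.properties, in which case the broker binds only to that address.)
Message: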
{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"高中半年会员"}
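The payTime object in the message above looks like a java.util.Date serialized field by field (year counted from 1900, month counted from zero); that reading is an assumption, not something the source states. Under it, the time and timezoneOffset fields are enough to recover the timestamp, as this small sketch shows. The class name PayTimeDecode is hypothetical.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: rebuilds a timestamp from the "time" and "timezoneOffset" fields.
final class PayTimeDecode {

    static OffsetDateTime decode(long epochMillis, int timezoneOffsetMinutes) {
        // Assumption: timezoneOffset follows java.util.Date#getTimezoneOffset,
        // i.e. UTC minus local time in minutes, so the sign is flipped here.
        ZoneOffset offset = ZoneOffset.ofTotalSeconds(-timezoneOffsetMinutes * 60);
        return Instant.ofEpochMilli(epochMillis).atOffset(offset);
    }

    public static void main(String[] args) {
        // Values copied from the sample message.
        System.out.println(decode(1457330791333L, -480)); // prints 2016-03-07T14:06:31.333+08:00
    }
}
```

The result agrees with the other fields in the sample (hours 14, minutes 6, seconds 31, date 7), so a consumer only needs the time field plus the offset.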
Start a consumer:
lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Delete a topic:
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
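A cluster is set up the same way as the single-machine environment; only the broker configuration file needs to change.
1. Copy the single-machine Kafka directory to the other machines.
2. Edit the server.properties file in the Kafka directory on each machine.
# broker.id must be unique within the Kafka broker cluster and must be a non-negative integer.
broker.id=1
3. Start Kafka on each machine.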
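Because the Kafka directory is copied from machine to machine, it is easy to forget to change broker.id on one of them. The sketch below checks a set of server.properties contents for duplicate ids using only java.util.Properties; the class name BrokerIdCheck and its method are hypothetical, and the configs are passed as strings so the example stays self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds broker.id values that occur in more than one config.
final class BrokerIdCheck {

    static Set<String> duplicateBrokerIds(List<String> configs) throws IOException {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new TreeSet<>();
        for (String text : configs) {
            Properties props = new Properties();
            props.load(new StringReader(text)); // same key=value syntax as server.properties
            String id = props.getProperty("broker.id");
            if (id == null) {
                throw new IllegalArgumentException("broker.id is missing");
            }
            if (!seen.add(id.trim())) {
                duplicates.add(id.trim());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) throws IOException {
        List<String> configs = Arrays.asList(
                "broker.id=1\nport=9092\nhost.name=192.168.1.211\n",
                "broker.id=2\nport=9092\nhost.name=192.168.1.210\n");
        System.out.println("duplicate ids: " + duplicateBrokerIds(configs));
    }
}
```

In practice the strings would be read from each machine's config/server.properties before the brokers are started.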
Pseudo-distributed setup on the local machine
First write a configuration file for each node:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Set the following parameters in the copied files:
config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
Now start the other two nodes:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Create a topic with 3 replicas:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Run the "describe topics" command to see what each node is doing:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader: the node that handles reads and writes for the partition; each node is the leader for a randomly selected share of the partitions.
replicas: lists all replica nodes for the partition, whether or not they are currently in service.
isr: the replicas that are currently in service and in sync with the leader.
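The difference between the replicas and isr fields is what tells you whether a partition is under-replicated. As a rough sketch, the partition line of the describe output can be parsed and the two lists compared; the class name DescribeLine and its methods are hypothetical, and the parsing assumes the "Replicas: 1,2,0 Isr: 1,2,0" layout shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts broker id lists from one partition line of --describe output.
final class DescribeLine {

    /** Returns the comma-separated broker ids that follow "name:" in the line. */
    static List<Integer> field(String line, String name) {
        Matcher m = Pattern.compile(name + ":\\s*([0-9,]+)").matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + name);
        }
        List<Integer> ids = new ArrayList<>();
        for (String id : m.group(1).split(",")) {
            ids.add(Integer.parseInt(id));
        }
        return ids;
    }

    /** Replicas that are assigned to the partition but missing from the ISR. */
    static List<Integer> outOfSync(String line) {
        List<Integer> replicas = new ArrayList<>(field(line, "Replicas"));
        replicas.removeAll(field(line, "Isr"));
        return replicas;
    }

    public static void main(String[] args) {
        String line = "Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0";
        System.out.println("out of sync: " + outOfSync(line));
    }
}
```

An empty result means every assigned replica is in sync; if broker 2 were stopped, the Isr field would shrink and broker 2 would show up in the result.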
<!-- Kafka configuration -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <!-- In real applications the jars below are included separately instead of using the ones bundled with Kafka -->
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zkclient</artifactId>
            <groupId>com.101tec</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

kafka.properties:
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092

#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000

group.id=llx
kafka.sellstat.topics=llx

<!-- Loaded for Spring placeholder resolution. -->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:kafka.properties</value>
        </list>
    </property>
</bean>
<!-- Used for injecting values in code. -->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="locations">
        <list>
            <value>classpath:kafka.properties</value>
        </list>
    </property>
</bean>
<!-- Define the receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver">
    <constructor-arg index="0" value="${zookeeper.connect}" />
    <constructor-arg index="1" value="${group.id}" />
    <constructor-arg index="2" value="${kafka.sellstat.topics}"/>
    <constructor-arg index="3" ref="testConsumer" />
</bean>
<!-- Define the message handler -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>

package cn.vko.index;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import cn.vko.common.base.JsonMsg;

/**
 * Tests sending messages to Kafka.
 * @author lilixin
 */
@Controller
public class TestProducer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    // Values come from kafka.properties via the configProperties bean.
    @Value("#{configProperties['metadata.broker.list']}")
    private String metadataBrokerList;

    @Value("#{configProperties['kafka.sellstat.topics']}")
    private String topic;

    @ResponseBody
    @RequestMapping("send")
    public JsonMsg send(String msg) {
        logger.info("send started-------------------------");
        VkoProducer vkoProducer = new VkoProducer(metadataBrokerList);
        logger.info("connection established-------------------------");
        vkoProducer.send(topic, msg);
        logger.info("send finished-------------------------");
        return new JsonMsg();
    }
}

package cn.vko.index;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Tests receiving messages from Kafka.
 * @author llx
 */
@Service
public class TestConsumer implements VkoConsumer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void dealMsg(String msg) {
        logger.info("--------kafka message receive start---------");
        logger.info(msg);
        logger.info("--------kafka message receive end ---------");
    }
}

package cn.vko.index;

import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class VkoProducer {

    private Logger log = LoggerFactory.getLogger(VkoProducer.class);
    private String metadataBrokerList;
    private Producer<String, String> producer;

    public VkoProducer(String metadataBrokerList) {
        super();
        if (StringUtils.isEmpty(metadataBrokerList)) {
            String message = "metadataBrokerList must not be empty";
            // log.error(message);
            throw new RuntimeException(message);
        }
        this.metadataBrokerList = metadataBrokerList;
        // Set the configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list", metadataBrokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Enables the acknowledgement mechanism; otherwise sending is fire and forget,
        // which may lose data. Valid values are 0, 1 and -1, see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        //props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "5000");
        props.put("queue.buffering.max.messages", "30000");
        props.put("queue.enqueue.timeout.ms", "-1");
        props.put("batch.num.messages", "1");
        // Optional; if not set, the default partitioner is used
        //props.put("partitioner.class", "cn.vko.kafka.PartitionerDemo");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    /**
     * Puts a single message on the queue.
     * @param topic the topic
     * @param msg the message
     * @return "ok" once the message has been handed to the producer
     */
    public String send(String topic, String msg) {
        log.info("sending to topic : " + topic + " message = " + msg);
        // Long start = System.currentTimeMillis();
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
        producer.send(data);
        // log.info("send took: {}", System.currentTimeMillis() - start);
        return "ok";
    }
}

package cn.vko.index;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.vko.common.utils.mybatis.GenCreateInterceptor;
import cn.vko.component.pageframework.util.StringUtil;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Receiver {

    private Logger log = LoggerFactory.getLogger(Receiver.class);
    private String zookeeperConnect;
    private String groupId;
    private String topic;
    private VkoConsumer vkoConsumer;

    /**
     * Creates the receiver.
     * @param zookeeperConnect ZooKeeper cluster addresses, comma separated
     * @param groupId consumer group id
     * @param topic the topic
     * @param vkoConsumer the message handler
     */
    public Receiver(String zookeeperConnect, String groupId, String topic, VkoConsumer vkoConsumer) {
        super();
        if (StringUtil.isEmpty(zookeeperConnect)) {
            String message = "zookeeperConnect must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(groupId)) {
            String message = "groupId must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (StringUtil.isEmpty(topic)) {
            String message = "topic must not be empty";
            log.error(message);
            throw new RuntimeException(message);
        }
        if (vkoConsumer == null) {
            String message = "vkoConsumer must not be null";
            log.error(message);
            throw new RuntimeException(message);
        }
        this.zookeeperConnect = zookeeperConnect;
        this.groupId = groupId;
        this.topic = topic;
        this.vkoConsumer = vkoConsumer;
        log.info("kafka vkoConsumer created: groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        receive();
    }

    private void receive() {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "14000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conf = new ConsumerConfig(props);
        ConsumerConnector cc = Consumer.createJavaConsumerConnector(conf);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Currently every topic has 2 partitions, so request 2 streams
        topicCountMap.put(topic, 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // One thread per stream
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            new Thread() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        String msg = new String(it.next().message());
                        try {
                            vkoConsumer.dealMsg(msg);
                        } catch (Exception e) {
                            log.error("kafka vkoConsumer topic:{} received message:{} consume failed xxxxxxxxxxxxxxxxxx", topic, msg, e);
                        }
                        log.info("kafka vkoConsumer topic:{} received message:{}", topic, msg);
                    }
                }
            }.start();
            log.info("kafka vkoConsumer started: groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
        }
        log.info("kafka vkoConsumer ready to receive messages: groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
    }
}

package cn.vko.index;

public interface VkoConsumer {
    public void dealMsg(String strings);
}

// Consumer written in the real project
@Service
public class SellStatConsumer implements VkoConsumer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private ISellDetailService sellDetailService;

    @Autowired
    private ISellerService sellerService;

    @Autowired(required=false)
    private IIpServiceRemote ipServiceRemote;

    @Autowired(required=false)
    private IPhoneCityServiceRemote phoneCityServiceRemote;

    @Override
    public void dealMsg(String rowData) {
        // Skip anything that is not valid JSON
        if (!new JsonValidator().validate(rowData)) {
            logger.error("json error ...... : {}", rowData);
            return;
        }
        logger.info("========start kafka consumer==============" + rowData);
        JSONObject json = JSONObject.fromObject(rowData);
        PayInfoForm form = (PayInfoForm) JSONObject.toBean(json, PayInfoForm.class);
        //do something
    }
}

Original article: http://www.cnblogs.com/lilixin/p/5775877.html