1 package com.it.baizhan.scalacode.Streaming 2 3 import java.text.SimpleDateFormat 4 import java.util.{Date, Properties} 5 6 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} 7 8 import scala.util.Random 9 10 /** 11 * 向 kafka 中生产数据 12 */ 13 object ProduceDataToKafka { 14 def main(args: Array[String]): Unit = { 15 val props = new Properties() 16 props.put("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092") 17 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 18 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 19 20 val producer = new KafkaProducer[String,String](props) 21 var counter = 0 22 var keyFlag = 0 23 while(true){ 24 counter +=1 25 keyFlag +=1 26 val content: String = userlogs() 27 // producer.send(new ProducerRecord[String, String]("streamingtopic", content)) 28 producer.send(new ProducerRecord[String, String]("mytopic", s"key-$keyFlag", content)) 29 if(0 == counter%100){ 30 counter = 0 31 Thread.sleep(5000) 32 } 33 } 34 producer.close() 35 } 36 37 def userlogs()={ 38 val userLogBuffer = new StringBuffer("") 39 val timestamp = new Date().getTime(); 40 var userID = 0L 41 var pageID = 0L 42 43 //随机生成的用户ID 44 userID = Random.nextInt(2000) 45 46 //随机生成的页面ID 47 pageID = Random.nextInt(2000); 48 49 //随机生成Channel 50 val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML") 51 val channel = channelNames(Random.nextInt(10)) 52 53 val actionNames = Array[String]("View", "Register") 54 //随机生成action行为 55 val action = actionNames(Random.nextInt(2)) 56 57 val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date()) 58 userLogBuffer.append(dateToday) 59 .append("\t") 60 .append(timestamp) 61 .append("\t") 62 .append(userID) 63 .append("\t") 64 .append(pageID) 65 .append("\t") 66 .append(channel) 67 .append("\t") 68 .append(action) 69 System.out.println(userLogBuffer.toString()) 70 userLogBuffer.toString() 71 } 72 73 }
原文:https://www.cnblogs.com/zhouaimin/p/14680904.html