首页 > 其他 > 详细

通信项目 细抠

时间:2021-05-21 22:26:56      阅读:37      评论:0      收藏:0      [点我收藏+]

技术分享图片

生成通话记录:


    //存放tel的list集合
    private List<String> phoneList = new ArrayList<>();
    //存放tel和Name的Map集合
    private Map<String, String> phoneNameMap = new HashMap<>();

随机抽取手机号 配对 成 通话记录,如果重复用 while 满足!= break

随机生成时间
 SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
 Date startDate = sdf1.parse(startTime);
 Date endDate = sdf1.parse(endTime);
 long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
 Date resultDate = new Date(randomTS);
 SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 String resultTimeString = sdf2.format(resultDate);
 return resultTimeString;


输出流每次写一条日之后需要flush,不然可能导致积攒多条数据才输出一次。
  public void writeLog(String filePath) {
      try {
          OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath,true), "UTF-8");
          while (true) {
              Thread.sleep(200);
              String log = product();
              System.out.println(log);
              osw.write(log + "\n");
              //一定要手动flush才可以确保每条数据都写入到文件一次
              osw.flush();
          }
      } catch (IOException e) {
          e.printStackTrace();
      } catch (InterruptedException e2) {
          e2.printStackTrace();
      }
  }


java -cp /root/work/xin_project/jars/ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /root/work/xin_project/jars/calllog.csv

这个命令太麻烦了,便写成了sh脚本
cat product.sh 
#!bin/bash
java -cp /root/work/xin_project/jars/ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /root/work/xin_project/jars/calllog.csv

这个项目启动需要的前置环境

http://192.168.182.179:50070

单个bigdata179上启动
/root/sof/hadoop-2.7.3/sbin/start-all.sh
/root/sof/hadoop-2.7.3/sbin/stop-all.sh

全部 bigdata179 180 181
/root/sof/zookeeper-3.4.10/bin/zkServer.sh start
/root/sof/zookeeper-3.4.10/bin/zkServer.sh stop

单个 bigdata179
/root/sof/hbase-1.3.0/bin/start-hbase.sh
/root/sof/hbase-1.3.0/bin/stop-hbase.sh
每个单个 对应关闭的时候是 每个都要做:
/root/sof/hbase-1.3.0/bin/hbase-daemon.sh stop master
/root/sof/hbase-1.3.0/bin/hbase-daemon.sh stop regionserver

单个
/root/sof/hbase-1.3.0/bin/hbase shell

全部
Kafka的 启动命令 (&表示后台启动):
/root/sof/kafka_2.11-2.1.1/bin/kafka-server-start.sh /root/sof/kafka_2.11-2.1.1/config/server.properties &

全部
Kafka的 关闭命令:
/root/sof/kafka_2.11-2.1.1/bin/kafka-server-stop.sh /root/sof/kafka_2.11-2.1.1/config/server.properties &

配置flume

█ 4. Flume

flume和kafka进行实时数据收集。

source channel sink event 

流程:

   1)source监控有没有新的数据 ,(某个文件或数据流,数据源产生新的数据)

   2)拿到该数据后,将数据封装在一个Event中,

   3)并put到channel后commit提交,channel队列先进先出,

   4)sink去channel队列中拉取数据,

   5)然后写入到HDFS中。
监控端口source:type  -->netcat                  日志打印sink:type-->logger
监控文件(通过shell命令)source:type  -->exec      hdfs  sink:type-->hdfs
监控目录source:type  -->spooldir		hdfs  sink:type-->hdfs

扇入(fan in):数据汇总,多层
     channel : type-->两种:内存、磁盘 (内存会有数据丢失的问题)
     bin/flume-ng agent --conf conf/ --name a3 --conf-file myconf/flume33.conf  
     bin/flume-ng agent -c conf -n a3 -f myconf/flume33.conf

扇出(fan out):数据作用于多个地方
     source 1个     replicating
     channel 多个  
     sink 多个
配置文件 dataStream是序列化文件, batchSize sink每次的量

agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

#type的参数不能写成uuid,得写具体,否则找不到类
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

技术分享图片

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/work/XIN_project/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata179:9092,bigdata180:9092,bigdata181:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令:

/root/sof/flume/bin/flume-ng agent --conf /root/sof/flume/conf/ --name a1 --conf-file /root/sof/flume/jobconf/flume2kafka.conf

之后开始生产

$ sh productlog.sh

HBase 的数据结构

技术分享图片

数据消费 consumer:

技术分享图片

需要有配置文件说明 kafka的topic kafka的brokerlist:bigdata179:9092
技术分享图片
技术分享图片

写一个HBaseUtil 里面有 是否存在表 // 初始化命名空间 // 创建表 // 表是否存在
技术分享图片
技术分享图片
技术分享图片
技术分享图片

生成rowKey,直接进行的拼接 分区号在前+主叫+建立时间+被叫+flag+通话时长
技术分享图片

生成分区号:手机号的后4位 ^异或 通话年月201802 ---> 之后x.hashcode(); ---> 之后%regions Decimal格式化
技术分享图片

自定义分区
技术分享图片

ConnectionInstance设计模式,单出来
技术分享图片

HBaseDAO 就是一个关键的 PUT 方法,把数据传进去
先把数据进行切分,因为csv用的是逗号,,要计算分区号,计算rowkey,,设计batchsize 凑齐30个进行flush

技术分享图片
技术分享图片

!!!!!!!!!这里其实还有协处理器生成 f2的 部分

在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。
修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件
协处理器放在的是 hbase-site.xml文件 都是传到集群上的所有机器。

插入的数据是 从rowkey里面拿到的 String oriRowKey = Bytes.toString(put.getRow());

技术分享图片
技术分享图片

发布jar包到hbase的lib目录下(注意需群发):

!!!!!!!!协处理器这里还有些问题

数据分析 MySQL

技术分享图片
技术分享图片
技术分享图片

通信项目 细抠

原文:https://www.cnblogs.com/ming-michelle/p/14797215.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!