一、序言
交通数据处理是智能交通的一个很关键的要素,更好的分析交通数据,可以为市政管理、交通信号管制、道路规划、交通设施建设提供更好的咨询和建议。全国各地政府都在寄期望于智能交通,以缓解城市拥堵,甚至一定程度上解决大城市病或者说是市政建设滞后的问题。同时,诸如百度地图、谷歌地图、高德地图、微软地图都推出了相应的交通应用,以期找到更大的商机。
用好的存储方法和好的算法进行分析,在批处理方面可以更多的分析历史数据,分析和发现问题,为未来进行预测以及公共查询服务;在实时计算方面可以更多的进行交通监控、突发事件处理、甚至是罪犯跟踪。
因此,寻求好的存储策略,好的计算算法,成为非常必要解决的问题。
二、数据概述及存储
目前交通数据有人流量数据、汽车数据。前者对于有大规模地铁,公共交通的城市十分有用,如北京,上海,其必要前提是能通过设备采集到人流信息,对于城市管理者而言,一卡通是最好的工具。因此,在北京,交通一卡通成为了监测地铁,公交车客流信息的主要采集工具,客流信息的采集准确性也相当高,信息数据的格式也容易控制。而后者,目前主要是通过摄像头拍照采集,存在图像识别度不高,设备故障影响,数据格式半结构化,数据缺失(部分信息没有采集到),脏数据存在(拍到人,自行车)。
对于贵阳这样的三线城市,一卡通还没有普及使用,故而监测公交客流人流信息存在诸多困难。但车流信息由于平安城市的建设,变得各个交通路口存在大量的摄像头以采集数据,从而为车流信息分析提供了可能。
对于贵阳车流信息数据,存储的格式是以日志形式传输到文件服务器,并压缩存储,每天大约有1千万条数据。想要处理这些文本,要分析其格式,并转存于数据库中。
在大数据流行的今天,最好的存储和处理方式当然是放在hdfs上,并用相应的nosql分布式数据库进行管理。但作为初步研究使用,分析的内容没有那么多,财力也没有那么大,所以找了一台小型机,16G的内存,CPU8*2.3MHZ。应用的数据信息也只有时间,拍摄地点,车牌号三个字段,故经过解析文本,直接将者三个字段存储到了mysql中,以便处理。
三、数据处理的目标
初步处理数据的目标有三个:
四、计算过程
目标1,可以通过sql语句很容易达到;目标2,在前一篇《R语言空间换时间算法、Hash键值对在字符串处理中的应用》已经完成;本篇主要解决目标3。
解决目标3的主要思想是根据同一辆车的运行轨迹,以经过路口为节点,按照到达某一路段的时间,并归结到划分的时间段内,并计数到该路段的某一时刻中。
直接上代码,以下是原始版本,其思想借鉴的前篇博文的算法思想,以空间换时间,遍历900多万条数据,只需要几个小时。
rm(list=ls(all=TRUE)) gc() library(RODBC) library(hash) # 读取文件中排序好的路口地址数据 address_file <-file("/home/wanglinlin/transport/address.txt","r") sorted_address <-readLines(address_file) sorted_address_hash_pairs<-hash(sorted_address,1:269) close(address_file) #矩阵的列表示形式 transection_code_length <-length(sorted_address)*length(sorted_address) transection_code_matrix <- matrix(0,transection_code_length,1) k=1; for(i in 1:269){ for(j in 1:269){ transection_code_matrix[k]<- i*1000+j k=k+1 } } transection_code_hash<-hash(transection_code_matrix,1:transection_code_length) #transection_code_hash time<-file("/home/wanglinlin/transport/time.txt","r") traffic_time<-readLines(time) close(time) length(traffic_time) traffic_time_hash<-hash(traffic_time,1:length(traffic_time)) traffic_time_hash[["1409501100"]] trafic_flow <- matrix(0,nrow=length(traffic_time),ncol=transection_code_length) channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport") #连接mysql test 数据库 sqlTables(channel) # 显示test数据库中的表格 trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time") trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901") trafic_flow_data<-as.matrix(trafic_flow_data) #找出所有车牌号,并散列化,形成键值对表 plates<-sqlQuery(channel,"select distinct plate from transport20140901") odbcClose(channel) plate_list=(as.matrix(plates))[,1] plate_count=length(plate_list) plate_hash_pairs=hash(plate_list,1:plate_count) #各路段车流计数 transection_code_hash[[toString(sorted_address_hash_pairs[[trafic_flow_data[1000,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[1001,3]]])]] trafic_flow_count_number<-as.numeric(trafic_flow_count)-1 for(i in 1:trafic_flow_count_number){ if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){ start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300 count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300) col_index_code <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]] * 1000 +sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]] col_index <- transection_code_hash[[toString(col_index_code)]] for(j in 1:count_times){ timestamp <- start_time_stamp + 300 * j row_index <- traffic_time_hash[[toString(timestamp)]] trafic_flow[row_index,col_index] <- trafic_flow[row_index,col_index] + 1 } } } write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow.txt",row.names = FALSE,col.names = FALSE) #把trafic_flow中的数据读取第一个五分钟车流量到矩阵中 five_minues_trafic_flow <- matrix(0,269,269) for(i in 1:269){ for (j in 1:269){ five_minues_trafic_flow[i,j]=trafic_flow[1,transection_code_hash[[toString(i*1000+j)]]] } } write.table(five_minues_trafic_flow,"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)上述代码中,最终的各路段车流信息存储于一个矩阵中,每行代表所有路段的一个时间段的车流信息。借助了对路口地址的编码和hash散列。建立编码和散列的过程一方面消耗了时间,另一方面也增加了内存开销。除此之外,将某一时段的车流信息以正规的方式显示,任然依赖于编码和散列,对后续不利。故可进一步优化这一部分,以三维数组存储车流信息,x,y平面刻画各路口的信息,z轴刻画时间变化。代码如下:
rm(list=ls(all=TRUE)) gc() library(RODBC) library(hash) # 读取文件中按照拼音排序好的路口地址数据 address_file <-file("/home/wanglinlin/transport/address.txt","r") sorted_address <-readLines(address_file) sorted_address_hash_pairs<-hash(sorted_address,1:269) #生成路口地址hash键值对,便于通过路口地址名称找序号 close(address_file) #sorted_address #class(sorted_address_hash_pairs[["黔灵西路(合群路口-威清路口)"]]) time<-file("/home/wanglinlin/transport/time.txt","r") traffic_time<-readLines(time) close(time) length(traffic_time) traffic_time_hash<-hash(traffic_time,1:length(traffic_time)) traffic_time_hash[["1409501100"]] channel=odbcConnect("transport-connector-R", uid="transport", pwd="transport") #连接mysql test 数据库 sqlTables(channel) # 显示test数据库中的表格 trafic_flow_data<-sqlQuery(channel,"select unix_timestamp(time),plate,address from transport20140901 order by plate,time") trafic_flow_count<-sqlQuery(channel,"select count(*) from transport20140901") trafic_flow_data<-as.matrix(trafic_flow_data) #找出所有车牌号,并散列化,形成键值对表 plates<-sqlQuery(channel,"select distinct plate from transport20140901") odbcClose(channel) plate_list=(as.matrix(plates))[,1] plate_count=length(plate_list) plate_hash_pairs=hash(plate_list,1:plate_count) trafic_flow <- array(0, dim=c(269,269,length(traffic_time))) trafic_flow_count_number<-as.numeric(trafic_flow_count)-1 for(i in 1:trafic_flow_count_number){ if(plate_hash_pairs[[trafic_flow_data[i,2]]]==plate_hash_pairs[[trafic_flow_data[i+1,2]]]){ start_time_stamp <- as.numeric(trafic_flow_data[i,1])-as.numeric(trafic_flow_data[i,1])%%300 count_times <- ceiling((as.numeric(trafic_flow_data[i+1,1])-start_time_stamp)/300) x_index <- sorted_address_hash_pairs[[trafic_flow_data[i,3]]] y_index <- sorted_address_hash_pairs[[trafic_flow_data[i+1,3]]] for(j in 1:count_times){ timestamp <- start_time_stamp + 300 * j z_index <- traffic_time_hash[[toString(timestamp)]] trafic_flow[x_index,y_index,z_index] <- trafic_flow[x_index,y_index,z_index] + 1 } } } write.table(trafic_flow,"/home/wanglinlin/transport/trafic_flow_roads.txt",row.names = FALSE,col.names = FALSE) #把trafic_flow中的数据读取第一个五分钟车流量到矩阵中 #write.table(trafic_flow[,,1],"/home/wanglinlin/transport/five_minues_trafic_flow.txt",row.names = FALSE,col.names = FALSE)
可以看到的是,算法通过优化,具备了更好的额性能,只有通过不断优化,才会更好。
五、总结
解决了一个计算目标,下次也算迈出了新的一步,如果有对算法的最终优化版,到时候再贴上来。
==================================关于原创的声明==================================
本博主的所有原创文章,非本人书面授权,严禁转载。
可以应用于一切非商用的内部学习和讨论研究。
欢迎通过留言或email进行广泛交流。
原文:http://blog.csdn.net/gufe_hfding/article/details/46463933