最近买了一本《Python宝典》在看,此书所讲Python知识的广度明显,但是深度略显不足,所以比较适合入门及提高级的读者来看。其中对于Python大数据处理一章的内容比较有兴趣,看明白了以后,我根据书上提供的案例对源代码进行了修改,也实现了模拟MapReduce的过程。
目标:从Apache的用户访问日志access.log中统计出页面资源的访问量。我们假设这个文件体积十分巨大。
access中的信息结构:66.249.68.43 - - [04/Aug/2011:01:06:48 +0800] "GET /某页面地址 HTTP/1.1" 200 4100
含义:66.249.68.43 来源ip, [04/Aug/2011:01:06:48 +0800] 日期时间,"GET /某页面地址 HTTP/1.1" 来源方式,200 返回状态,4100 返回字节数
步骤:1、将大文件切割成多个小文件;2、计算每个小文件中的页面访问次数(map过程),每一个小文件对应一个计算结果文件;3、将每个小文件中的计算结果文件进行合并累加,统计出最终的页面资源访问量,结果保存到reduceResult.txt中。
文件结构:
其中,access.log是原始的日志文件,smallFiles中保存分割后的一组小文件,mapFiles中存入每个小文件对应的处理结果文件,reduceResult.txt保存最终的处理结果。
以下是源码:
''' Created on 2014-12-19 @author: guoxiyue @file:0fileSplit.py @function:文件分割 ''' import os,os.path,time; sourceFile=open('files/access.log','r',encoding='utf8'); #打开原始大文件 targetDir='files/smallFiles'; # 设置小文件的保存目录 smallFileSize=30; #设置每个小文件中的记录条数 tempList=[]; #临时列表,用于暂存小文件中的记录内容 fileNum=1; # 小文件序号 readLine=sourceFile.readline(); #先读一行 while(readLine): #循环 lineNum=1 while(lineNum<=smallFileSize): #控制每个小文件的记录条数不超过设定值 tempList.append(readLine); #将当前行的读取结果保存到临时列表中 readLine=sourceFile.readline(); #再读下一行 lineNum+=1;# 行号自增 if not readLine:break;#如果读到空,则说明大文件读完,退出内层循环 tempFile=open('files/smallFiles/access_'+str(fileNum)+'.txt','w',encoding='utf8');#将读取到的30条记录保存到文件中 tempFile.writelines(tempList); tempFile.close(); tempList=[];#清空临时列表 print('files/smallFiles/access_'+str(fileNum)+'.txt 创建于 '+str(time.asctime())); fileNum+=1; #文件序号自增 sourceFile.close();
''' Created on 2014-12-19 @author: guoxiyue @file:1map.py @function:map过程,分别计算每一个小文件中的页面资源访问量 ''' import os,os.path,re,time; sourceFileList=os.listdir('files/smallFiles/'); #获取所有小文件文件名列表 targetDir="files/mapFiles/"; #设置处理结果保存目录 for eachFile in sourceFileList: #遍历小文件 currentFile=open('files/smallFiles/'+eachFile,'r',encoding='utf8'); #打开小文件 currentLine=currentFile.readline(); #先读一行 tempDict={}; #临时字典 while(currentLine): p_re=re.compile("(GET|POST)\s(.*?)\sHTTP", re.IGNORECASE); #用正则表达式来提取访问资源 match=p_re.findall(currentLine); #从当前行中提取出访问资源 if(match): url=match[0][1]; #提出资源页面 if url in tempDict: #获取当前资源的访问次数,并添加到字典中 tempDict[url]+=1; else: tempDict[url]=1; currentLine=currentFile.readline(); #再读下一行 currentFile.close(); #以下是将当前小文件的统计结果排序并保存 tList=[]; for key,value in sorted(tempDict.items(),key=lambda data:data[0],reverse=True): tList.append(key+' '+str(value)); tList.append('\n') tempFile=open(targetDir+eachFile,'a',encoding='utf8'); tempFile.writelines(tList); tempFile.close() print(targetDir+eachFile+'.txt 创建于 '+str(time.asctime()));
''' Created on 2014-12-19 @author: guoxiyue @file:2reduce.py @function:reduce过程,汇总最终的页面资源访问量 ''' import os,os.path,re,time; sourceFileList=os.listdir('files/mapFiles/'); #获取小文件的map结果文件名列表 targetFile='files/reduceResult.txt'; # 设置最终结果保存文件 tempDict={}; #临时字典 p_re=re.compile('(.*?)(\d{1,}$)', re.IGNORECASE); #利用正则表达式抽取资源访问次数 for eachFile in sourceFileList:#遍历map文件 currentFile=open('files/mapFiles/'+eachFile,'r',encoding='utf8'); #打开当前文件 currentLine=currentFile.readline(); #读一行 while(currentLine): subData=p_re.findall(currentLine) #提取出当前行中资源的访问次数 if(subData[0][0] in tempDict): #将结果累加 tempDict[subData[0][0]]+=int(subData[0][1]); else: tempDict[subData[0][0]]=int(subData[0][1]); currentLine=currentFile.readline();#再读一行 currentFile.close(); #以下是将所有map文件的统计结果排序并保存 tList=[]; for key,value in sorted(tempDict.items(),key=lambda data:data[1],reverse=True): tList.append(key+' '+str(value)); tList.append('\n') tempFile=open(targetFile,'a',encoding='utf8'); tempFile.writelines(tList); tempFile.close() print(targetFile+' 创建于 '+str(time.asctime()));
Python3 模拟MapReduce处理分析大数据文件——《Python宝典》
原文:http://blog.csdn.net/laoyaotask/article/details/42023859