k-means is a classic clustering algorithm and is not covered in detail here. Its clustering quality and convergence time depend heavily on the choice of the initial k centers. k-means++ improves that initialization, but its sampling procedure is inherently sequential (each new center depends on all previously chosen ones), so it cannot be parallelized and struggles on large datasets. To address this, Stanford researchers proposed k-means||; see reference 1 for the detailed proofs. k-means|| changes the sampling strategy: rather than drawing a single point per pass as k-means++ does, each pass samples O(k) points, and the pass is repeated roughly O(log n) times, leaving a set of O(k log n) sampled points whose clustering cost is within a constant factor of the optimal solution. These O(k log n) points are then clustered down to k points, and those k points are fed into Lloyd's iterations as the initial centers. Experiments show that O(log n) passes are not needed in practice; about 5 passes usually yield a good set of initial centers. Because the keep/drop decision for each point within a pass is independent of the others, every pass can be parallelized, and this post attempts to implement the initialization with MapReduce.
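To make the sampling loop concrete before moving to MapReduce, here is a minimal sequential sketch in NumPy. It follows the paper's squared-distance sampling probabilities (a point on which the jobs below differ slightly; see the note after the second job file). The function name kmeans_parallel_init, the oversampling factor l = 5, and the fixed number of rounds are assumptions of this sketch, not code from the original post.

import numpy as np

def kmeans_parallel_init(X, l=5, rounds=5, seed=0):
    """Sketch of k-means|| oversampling (Bahmani et al., Algorithm 2).

    X is an (n, d) data matrix. Returns roughly l * rounds candidate
    centers; clustering them down to the final k centers is omitted.
    """
    rng = np.random.default_rng(seed)
    # Step 1: start from one point drawn uniformly at random.
    C = [X[rng.integers(len(X))]]
    for _ in range(rounds):
        # Squared distance from every point to its nearest current center.
        d2 = ((X[:, None, :] - np.array(C)[None, :, :]) ** 2).sum(-1).min(axis=1)
        phi = d2.sum()
        # Keep each point independently with probability l * d^2 / phi.
        # The per-point decisions are independent, which is exactly what
        # lets a single pass be split across mappers.
        keep = rng.random(len(X)) < np.minimum(1.0, l * d2 / phi)
        C.extend(X[keep])
    return np.array(C)

The final reduction of the O(k log n) candidates to k centers (e.g. a weighted k-means++ over the candidates) is left out both here and in the MapReduce code below, which stops at the candidate-sampling stage.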
This post uses the mrjob framework, which makes it convenient to write MapReduce jobs in Python.
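mrjob is installable from PyPI with pip install mrjob. By default a job runs in a single local process, and the same script can then be submitted unchanged to a Hadoop cluster or Amazon EMR via the -r hadoop or -r emr runner switches, which is what makes it convenient for prototyping the jobs below.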
Below is the content of the main driver file:
from compute_dist import Compute_dist
from update_centriods import Update_centriods
import sys


def get_sum_dist(job, runner):
    # The compute job emits a single (None, total) pair.
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
        return value


def write_sum_dist_to_disk(sum_dist, fileName):
    # Overwrite the file with this round's total distance.
    sumdistFile = open(fileName, "w")
    sumdistFile.truncate()
    sumdistFile.write(str(sum_dist))
    sumdistFile.close()


def get_centroids(job, runner):
    # Keep only the points the sampling job flagged with a 1.
    centroids = []
    for line in runner.stream_output():
        key, value = job.parse_output_line(line)
        if value == 1:
            centroids.append(key)
    return centroids


def write_centroids_to_disk(centroids, fileName):
    # Append the newly sampled centers to the centroid file.
    centroidFile = open(fileName, "a")
    for centroid in centroids:
        line = ' '.join(str(i) for i in centroid)
        line = line.strip()
        centroidFile.write("\n" + line)
    centroidFile.close()


SUMDIST_FILE = "sum.txt"
CENTROIDS_FILE = "centroids.txt"

if __name__ == '__main__':
    args = sys.argv[1:]
    computeJob = Compute_dist(args=args + ['--centroids=' + CENTROIDS_FILE])
    # 5 sampling rounds, as discussed above.
    for i in range(5):
        with computeJob.make_runner() as computeJobRunner:
            computeJobRunner.run()
            sum_dist = get_sum_dist(computeJob, computeJobRunner)
            write_sum_dist_to_disk(sum_dist, SUMDIST_FILE)
        updaterJob = Update_centriods(args=args + ['--centroids=' + CENTROIDS_FILE,
                                                   '--sumdist=' + SUMDIST_FILE])
        with updaterJob.make_runner() as updaterJobRunner:
            updaterJobRunner.run()
            newCentroids = get_centroids(updaterJob, updaterJobRunner)
            write_centroids_to_disk(newCentroids, CENTROIDS_FILE)
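Two practical notes, both assumptions of this post rather than statements from the original. First, the jobs parse centroids.txt as one point per line with space-separated integer coordinates, and the driver only ever appends to that file, so it must be seeded with the first, uniformly sampled center before round 1 (step 1 of k-means||), along these lines (data.txt and centroids.txt are hypothetical file names):

# Hypothetical seeding step: k-means|| starts from one uniformly random
# point, and centroids.txt must exist before the driver's first round.
import random

with open("data.txt") as f:            # space-separated integer coordinates
    first_center = random.choice(f.readlines())
with open("centroids.txt", "w") as f:
    f.write(first_center.strip())      # no trailing newline: the driver appends "\n" + line

Second, since sys.argv[1:] is forwarded to both jobs, the driver is invoked with the input path as its argument, e.g. python driver.py data.txt if the file above is saved as driver.py.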
Below are the two job files:
from mrjob.job import MRJob
from mrjob.step import MRStep


class Compute_dist(MRJob):

    def distEclud(self, vecA, vecB):
        # Euclidean distance between two points.
        dist_int = 0
        for i in range(len(vecA)):
            dist_int = dist_int + (vecA[i] - vecB[i]) ** 2
        dist = dist_int ** 0.5
        return dist

    def configure_args(self):
        # --centroids is a file arg, so mrjob ships the file to every task.
        super(Compute_dist, self).configure_args()
        self.add_file_arg('--centroids')

    def get_centroids(self):
        # Read the current centers: one point per line, space-separated ints.
        centroidsFile = open(self.options.centroids)
        lines = centroidsFile.readlines()
        centroids = []
        for line in lines:
            line = line.strip().split(' ')
            line = [int(x) for x in line]
            centroids.append(line)
        centroidsFile.close()
        return centroids

    def mapper_get_sum_dist(self, _, line):
        # Emit each point's distance to its nearest current center.
        line = line.strip().split(' ')
        line = [int(x) for x in line]
        centroids = self.get_centroids()
        Dx = []
        for c in centroids:
            temp = self.distEclud(c, line)
            Dx.append(temp)
        min_dist = min(Dx)
        yield None, min_dist

    def reducer_get_sum_dist(self, _, dist):
        # A single None key funnels every value to one reducer,
        # which produces the global sum of distances.
        yield None, sum(dist)

    def steps(self):
        return [MRStep(mapper=self.mapper_get_sum_dist,
                       reducer=self.reducer_get_sum_dist)]


if __name__ == '__main__':
    Compute_dist.run()
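A small efficiency note, not part of the original code: mapper_get_sum_dist re-opens and re-parses the centroid file for every input line. mrjob's MRStep also accepts a mapper_init hook, so the centers could be cached once per mapper task, along these lines (a sketch meant to be dropped into the Compute_dist class above, replacing the existing mapper and steps):

def mapper_init(self):
    # Parse the centroid file once per task rather than once per line.
    self.cached_centroids = self.get_centroids()

def mapper_get_sum_dist(self, _, line):
    point = [int(x) for x in line.strip().split(' ')]
    yield None, min(self.distEclud(c, point) for c in self.cached_centroids)

def steps(self):
    return [MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_sum_dist,
                   reducer=self.reducer_get_sum_dist)]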
from mrjob.job import MRJob
from mrjob.step import MRStep
import random


class Update_centriods(MRJob):

    def distEclud(self, vecA, vecB):
        # Euclidean distance between two points.
        dist_int = 0
        for i in range(len(vecA)):
            dist_int = dist_int + (vecA[i] - vecB[i]) ** 2
        dist = dist_int ** 0.5
        return dist

    def configure_args(self):
        super(Update_centriods, self).configure_args()
        self.add_file_arg('--centroids')
        self.add_file_arg('--sumdist')

    def get_centroids(self):
        centroidsFile = open(self.options.centroids)
        lines = centroidsFile.readlines()
        centroids = []
        for line in lines:
            line = line.strip().split(' ')
            line = [int(x) for x in line]
            centroids.append(line)
        centroidsFile.close()
        return centroids

    def get_sum_dist(self):
        # Total distance computed by Compute_dist in this round.
        sumdistFile = open(self.options.sumdist)
        dist = sumdistFile.readline().strip()
        dist = float(dist)
        sumdistFile.close()
        return dist

    def mapper_sample_centroids(self, _, line):
        # Keep each point as a new center with probability proportional
        # to its distance to the nearest current center.
        dist = self.get_sum_dist()
        line = line.strip().split(' ')
        line = [int(x) for x in line]
        centroids = self.get_centroids()
        Dx = []
        for c in centroids:
            temp = self.distEclud(c, line)
            Dx.append(temp)
        min_dist = min(Dx)
        prob = (5 * min_dist) / dist   # oversampling factor l = 5
        r = random.random()
        if r < prob:
            yield line, 1
        else:
            yield line, 0

    def steps(self):
        return [MRStep(mapper=self.mapper_sample_centroids)]


if __name__ == '__main__':
    Update_centriods.run()
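One deviation from the paper is worth flagging: Bahmani et al. sample each point with probability l * d(x, C)^2 / phi, where phi is the sum of squared distances to the nearest center, whereas both jobs above use plain, unsquared distances. The two jobs are at least consistent with each other, so the sampler still favours far-away points, but matching the paper exactly would mean yielding min_dist ** 2 in Compute_dist's mapper and computing prob = (5 * min_dist ** 2) / dist here. Also note that because the mapper keys its output by the point itself, the driver's get_centroids collects exactly the points flagged with a 1, and a point already in the centroid set has min_dist = 0, so it can never be sampled twice.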
References:
1. Bahman Bahmani, Benjamin Moseley, Andrea Vattani, Ravi Kumar, Sergei Vassilvitskii. Scalable K-Means++. Proceedings of the VLDB Endowment, 5(7), 2012.
Original post: https://www.cnblogs.com/londist/p/11151830.html