数组排序是一个常见的操作。基于比较的排序算法其性能下限是O(nlog(n)),但在分布式环境下面我们可以并发,从而提高性能。这里展示了Spark中数组排序的实现,并分析了性能,同时尝试找到导致性能提升的原因。
import sys from pyspark import SparkContext if __name__ == "__main__": if len(sys.argv) != 2: print >> sys.stderr, "Usage: sort <file>" exit(-1) sc = SparkContext(appName="PythonSort") lines = sc.textFile(sys.argv[1], 1) sortedCount = lines.flatMap(lambda x: x.split(' ')) .map(lambda x: (int(x), 1)) .sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. output = sortedCount.collect() for (num, unitcount) in output: print num sc.stop()
方法textFile读入一个文本文件,并在Spark环境里创建相应的RDD集。这个数据集存放在lines变量中。方法flatMap和map不同,map返回的是一个key,value的对,得到的RDD集和哈希表有点像。而flatMap的输出结果是数组。这个数组是对每个元素调用传入的lambda函数后得到的结果的并。这意味着传入的lambda函数可以返回0个或多个结果,比如:
>>> rdd = sc.parallelize([2, 3, 4]) >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) [1, 1, 1, 2, 2, 3]
容易看出,这段程序实际上只是调用了Spark提供的排序接口sortByKey,而不是在代码中实现一个排序算法。方法sortByKey的底层实现如下:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size) : RDD[(K, V)] = { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }
访问本人博客获得更多信息:magic01
原文:http://blog.csdn.net/alburthoffman/article/details/43957795