首页 > 编程语言 > 详细

BitMap算法知识笔记以及在大数据方向的使用

时间:2020-01-27 10:38:17      阅读:139      评论:0      收藏:0      [点我收藏+]

概述

所谓的BitMap算法就是位图算法,简单说就是用一个bit位来标记某个元素所对应的value,而key即是该元素,由于BitMap使用了bit位来存储数据,因此可以大大节省存储空间,这是很常用的数据结构,比如用于Bloom Filter中、用于无重复整数的排序等等。bitmap通常基于数组来实现,数组中每个元素可以看成是一系列二进制数,所有元素组成更大的二进制集合。

基本思想

 我用一个简单的例子来详细介绍BitMap算法的原理。假设我们要对0-7内的5个元素(4,7,2,5,3)进行排序(这里假设元素没有重复)。我们可以使用BitMap算法达到排序目的。要表示8个数,我们需要8个byte,

  1.首先我们开辟一个字节(8byte)的空间,将这些空间的所有的byte位都设置为0

  2.然后便利这5个元素,第一个元素是4,因为下边从0开始,因此我们把第五个字节的值设置为1

  3.然后再处理剩下的四个元素,最终8个字节的状态如下图

技术分享图片

       4.现在我们遍历一次bytes区域,把值为1的byte的位置输出(2,3,4,5,7),这样便达到了排序的目的

从上面的例子我们可以看出,BitMap算法的思想还是比较简单的,关键的问题是如何确定10进制的数到2进制的映射图。

MAP映射:

假设需要排序或则查找的数的总数N=100000000,BitMap中1bit代表一个数字,1个int = 4Bytes = 4*8bit = 32 bit,那么N个数需要N/32 int空间。所以我们需要申请内存空间的大小为int a[1 + N/32],其中:a[0]在内存中占32为可以对应十进制数0-31,依次类推:

  a[0]-----------------------------> 0-31

  a[1]------------------------------> 32-63

  a[2]-------------------------------> 64-95

  a[3]--------------------------------> 96-127

那么十进制数如何转换为对应的bit位,下面介绍用位移将十进制数转换为对应的bit位:

  1.求十进制数在对应数组a中的下标

  十进制数0-31,对应在数组a[0]中,32-63对应在数组a[1]中,64-95对应在数组a[2]中………,使用数学归纳分析得出结论:对于一个十进制数n,其在数组a中的下标为:a[n/32]

  2.求出十进制数在对应数a[i]中的下标

  例如十进制数1在a[0]的下标为1,十进制数31在a[0]中下标为31,十进制数32在a[1]中下标为0。 在十进制0-31就对应0-31,而32-63则对应也是0-31,即给定一个数n可以通过模32求得在对应数组a[i]中的下标。

  3.位移

  对于一个十进制数n,对应在数组a[n/32][n%32]中,但数组a毕竟不是一个二维数组,我们通过移位操作实现置1

  a[n/32] |= 1 << n % 32
  移位操作:
  a[n>>5] |= 1 << (n & 0x1F)

  n & 0x1F 保留n的后五位 相当于 n % 32 求十进制数在数组a[i]中的下标。

BitMap简单使用示例

用户标签使用BitMap的数据结构来存储,比如表示用户对应的标签表如下所示:

技术分享图片

 如果使用标签,也就是一个标签对应多个用户,如下所示,比较简单一看就会:

技术分享图片

 让每一个标签存储包含此标签的所有用户 ID,每一个标签都是一个独立的 Bitmap。这样,实现用户的去重和查询统计,就变得一目了然:

技术分享图片

 对上面例子的使用:

在用户群做交集和并集运算的时候,例如:

1,如何查找使用苹果手机的程序员用户?

技术分享图片

 2.如何查找所有男性或者00后的用户?

技术分享图片

 3,同样是刚才的例子,我们给定 90 后用户的 Bitmap,再给定一个全量用户的 Bitmap。最终要求出的是存在于全量用户,但又不存在于 90 后用户的部分。

技术分享图片

 如何求出呢?我们可以使用异或操作,即相同位为 0,不同位为 1。 (1^1)

技术分享图片

BitMap的代码实现

java实现: 

 1 /**
 2  * ClassName BitMap4.java
 3  * author Rhett.wang
 4  * version 1.0.0
 5  * Description TODO
 6  * createTime 2020年01月24日 07:53:00
 7  */
 8 public class BitMap4 {
 9     //保存数据的
10     private byte[] bits;
11 
12     //能够存储多少数据
13     private int capacity;
14 
15 
16     public BitMap4(int capacity){
17         this.capacity = capacity;
18 
19         //1bit能存储8个数据,那么capacity数据需要多少个bit呢,capacity/8+1,右移3位相当于除以8
20         bits = new byte[(capacity >>3 )+1];
21     }
22 
23     public void add(int num){
24         // num/8得到byte[]的index
25         int arrayIndex = num >> 3;
26 
27         // num%8得到在byte[index]的位置
28         int position = num & 0x07;
29 
30         //将1左移position后,那个位置自然就是1,然后和以前的数据做|,这样,那个位置就替换成1了。
31         bits[arrayIndex] |= 1 << position;
32     }
33 
34     public boolean contain(int num){
35         // num/8得到byte[]的index
36         int arrayIndex = num >> 3;
37 
38         // num%8得到在byte[index]的位置
39         int position = num & 0x07;
40 
41         //将1左移position后,那个位置自然就是1,然后和以前的数据做&,判断是否为0即可
42         return (bits[arrayIndex] & (1 << position)) !=0;
43     }
44 
45     public void clear(int num){
46         // num/8得到byte[]的index
47         int arrayIndex = num >> 3;
48 
49         // num%8得到在byte[index]的位置
50         int position = num & 0x07;
51 
52         //将1左移position后,那个位置自然就是1,然后对取反,再与当前值做&,即可清除当前的位置了.
53         bits[arrayIndex] &= ~(1 << position);
54 
55     }
56 
57     public static void main(String[] args) {
58         BitMap4 bitmap = new BitMap4(100);
59         bitmap.add(7);
60         System.out.println("插入7成功");
61 
62         boolean isexsit = bitmap.contain(7);
63         System.out.println("7是否存在:"+isexsit);
64 
65         bitmap.clear(7);
66         isexsit = bitmap.contain(7);
67         System.out.println("7是否存在:"+isexsit);
68     }
69 }

PYTHON代码实现:

 1 class BitMap():
 2     def __init__(self,max):
 3         self.size=int((max +31 -1)/31)
 4         self.array=[0 for i in range(self.size)]
 5 
 6     def bitindex(self,num):
 7         return num%31
 8 
 9     def set_1(self,num):
10         elemindex=(num//31)
11         byteindex=self.bitindex(num)
12         ele=self.array[elemindex]
13         self.array[elemindex] = ele| (1<< byteindex)
14 
15 
16     def test_1(self,i):
17         elemindex=(i//31)
18         bytearray= self.bitindex(i)
19         if self.array[elemindex] & (1 << bytearray):
20             return True
21         return False
22 
23 if __name__ =="__main__":
24     Max = ord(‘z‘)
25     shuffle_array=[x for x in ‘qwelajkda‘]
26     ret =[]
27     bitmap =BitMap(Max)
28     for c in shuffle_array:
29         bitmap.set_1(ord(c))
30 
31     for i in range(Max+1):
32         if bitmap.test_1(i):
33             ret.append(chr(i))
34 
35     print(u‘原始数组是:%s‘ % shuffle_array)
36     print(u‘排序以后的数组是:%s‘ % ret)

scala代码实现

 1 /**
 2   * ClassName BitMap.java
 3   * author Rhett.wang
 4   * version 1.0.0
 5   * Description TODO
 6   * createTime 2020年01月24日 10:30:00
 7   */
 8 class BitMap(bitmap:Array[Byte], length:Int) {
 9 
10 }
11 object BitMap {
12   var bitmap:Array[Int]=Array()
13   def main(args: Array[String]) {
14 
15     var bitmaps=TestBit(100)
16     setBit(32)
17     println(getBit(32))
18     println(getBit(11))
19   }
20   def TestBit(length:Int):Unit={
21     bitmap= new Array[Int]((length >> 5).toInt + (if ((length & 31) > 0) 1
22     else 0))
23   }
24 
25   def getBit(index: Long): Int = {
26     var intData: Int = bitmap(((index - 1) >> 5).toInt)
27     var offset: Int = ((index - 1) & 31).toInt
28     return intData >> offset & 0x01
29   }
30 
31   def setBit(index: Long) {
32     var belowIndex: Int = ((index - 1) >> 5).toInt
33     var offset: Int = ((index - 1) & 31).toInt
34     var inData: Int = bitmap(belowIndex)
35     bitmap(belowIndex) = inData | (0x01 << offset)
36   }
37 
38    /* def clear(num:Int):Unit={
39       var arrayIndex: Int = num >> 5
40       var position: Int = num & 0x1F
41       bitmap(arrayIndex) =(bitmap(arrayIndex) & ~(1 << position)).toByte
42     }*/
43 
44 
45 }

Roaring BitMap的原理和使用

位图索引被广泛用于数据库和搜索引擎中,通过利用位级并行,它们可以显著加快查询速度。但是,位图索引会占用大量的内存,因此我们会更喜欢压缩位图索引。 Roaring Bitmaps 就是一种十分优秀的压缩位图索引,后文统称 RBM。压缩位图索引有很多种,比如基于 RLE(Run-Length Encoding运行长度编码)的WAH (Word Aligned Hybrid Compression Scheme) 和 Concise (Compressed ‘n’ Composable Integer Set)。相比较前者, RBM 能提供更优秀的压缩性能和更快的查询效率。

RBM 的用途和 Bitmap 很差不多(比如说索引),只是说从性能、空间利用率各方面更优秀了。目前 RBM 已经在很多成熟的开源大数据平台中使用,简单列几个作为参考。

1 Apache Lucene and derivative systems such as Solr and Elasticsearch,
2 Metamarkets’ Druid,
3 Apache Spark,
4 Apache Hive,
5 eBay’s Apache Kylin,
6 ……

RBM 的主要思想并不复杂,简单来讲,有如下三条:

1,我们将 32-bit 的范围 ([0, n)) 划分为 2^16 个桶,每一个桶有一个 Container 来存放一个数值的低16位;

2,在存储和查询数值的时候,我们将一个数值 k 划分为高 16 位(k % 2^16)和低 16 位(k mod 2^16),取高 16 位找到对应的桶,然后在低 16 位存放在相应的 Container 中;

3,容器的话, RBM 使用两种容器结构: Array Container 和 Bitmap Container。Array Container 存放稀疏的数据,Bitmap Container 存放稠密的数据。即,若一个 Container 里面的 Integer 数量小于 4096,就用 Short 类型的有序数组来存储值。若大于 4096,就用 Bitmap 来存储值。

如下图,就是官网给出的一个例子,三个容器分别代表了三个数据集:

the list of the first 1000 multiples of 62

all integers [216, 216 + 100)

all even numbers in [2216, 3216)

技术分享图片

举例说明:

看完前面的还不知道在说什么?没关系,举个栗子说明就好了。现在我们要将 821697800 这个 32 bit 的整数插入 RBM 中,整个算法流程是这样的:

821697800 对应的 16 进制数为 30FA1D08, 其中高 16 位为 30FA, 低16位为 1D08。

我们先用二分查找从一级索引(即 Container Array)中找到数值为 30FA 的容器(如果该容器不存在,则新建一个),从图中我们可以看到,该容器是一个 Bitmap 容器。

找到了相应的容器后,看一下低 16 位的数值 1D08,它相当于是 7432,因此在 Bitmap 中找到相应的位置,将其置为 1 即可。

技术分享图片

下面介绍到的是RoaringBitmap的核心,三种Container。

通过上面的介绍我们知道,每个32位整形的高16位已经作为key存储在RoaringArray中了,那么Container只需要处理低16位的数据。

ArrayContainer

结构很简单,只有一个short[] content,将16位value直接存储。

short[] content始终保持有序,方便使用二分查找,且不会存储重复数值。

因为这种Container存储数据没有任何压缩,因此只适合存储少量数据。

ArrayContainer占用的空间大小与存储的数据量为线性关系,每个short为2字节,因此存储了N个数据的ArrayContainer占用空间大致为2N字节。存储一个数据占用2字节,存储4096个数据占用8kb。

根据源码可以看出,常量DEFAULT_MAX_SIZE值为4096,当容量超过这个值的时候会将当前Container替换为BitmapContainer。

BitmapContainer

这种Container使用long[]存储位图数据。我们知道,每个Container处理16位整形的数据,也就是0~65535,因此根据位 图的原理,需要65536个比特来存储数据,每个比特位用1来表示有,0来表示无。每个long有64位,因此需要1024个long来提供65536个 比特。

因此,每个BitmapContainer在构建时就会初始化长度为1024的long[]。这就意味着,不管一个BitmapContainer中只存储了1个数据还是存储了65536个数据,占用的空间都是同样的8kb。

解释一下为什么这里用的 4096 这个阈值?因为一个 Integer 的低 16 位是 2Byte,因此对应到 Arrary Container 中的话就是 2Byte * 4096 = 8KB;同样,对于 Bitmap Container 来讲,2^16 个 bit 也相当于是 8KB。

RunContainer

RunContainer中的Run指的是行程长度压缩算法(Run Length Encoding),对连续数据有比较好的压缩效果。

它的原理是,对于连续出现的数字,只记录初始数字和后续数量。即:

对于数列11,它会压缩为11,0;
对于数列11,12,13,14,15,它会压缩为11,4;
对于数列11,12,13,14,15,21,22,它会压缩为11,4,21,1;
源码中的short[] valueslength中存储的就是压缩后的数据。

这种压缩算法的性能和数据的连续性(紧凑性)关系极为密切,对于连续的100个short,它能从200字节压缩为4字节,但对于完全不连续的100个short,编码完之后反而会从200字节变为400字节。

如果要分析RunContainer的容量,我们可以做下面两种极端的假设:

最好情况,即只存在一个数据或只存在一串连续数字,那么只会存储2个short,占用4字节

最坏情况,0~65535的范围内填充所有的奇数位(或所有偶数位),需要存储65536个short,128kb

代码测试示例:

 1 import org.roaringbitmap.RoaringBitmap;
 2 
 3 import java.util.function.Consumer;
 4 
 5 /**
 6  * ClassName RBitMap.java
 7  * author Rhett.wang
 8  * version 1.0.0
 9  * Description TODO
10  * createTime 2020年01月25日 21:09:00
11  */
12 public class RBitMap {
13     public static void main(String[] args) {
14         test1();
15     }
16     private static void test1(){
17         //向rr中添加1、2、3、1000四个数字
18         RoaringBitmap rr = RoaringBitmap.bitmapOf(1,2,3,1000);
19         //创建RoaringBitmap rr2
20         RoaringBitmap rr2 = new RoaringBitmap();
21         //向rr2中添加10000-12000共2000个数字
22         rr2.add(10000L,12000L);
23         //返回第3个数字是1000,第0个数字是1,第1个数字是2,则第3个数字是1000
24         rr.select(3);
25         //返回value = 2 时的索引为 1。value = 1 时,索引是 0 ,value=3的索引为2
26         rr.rank(2);
27         //判断是否包含1000
28         rr.contains(1000); // will return true
29         //判断是否包含7
30         rr.contains(7); // will return false
31 
32         //两个RoaringBitmap进行or操作,数值进行合并,合并后产生新的RoaringBitmap叫rror
33         RoaringBitmap rror = RoaringBitmap.or(rr, rr2);
34         //rr与rr2进行位运算,并将值赋值给rr
35         rr.or(rr2);
36         //判断rror与rr是否相等,显然是相等的
37         boolean equals = rror.equals(rr);
38         if(!equals) throw new RuntimeException("bug");
39         // 查看rr中存储了多少个值,1,2,3,1000和10000-12000,共2004个数字
40         long cardinality = rr.getLongCardinality();
41         System.out.println(cardinality);
42         //遍历rr中的value
43         for(int i : rr) {
44             System.out.println(i);
45         }
46         //这种方式的遍历比上面的方式更快
47         rr.forEach((Consumer<? super Integer>) i -> {
48             System.out.println(i.intValue());
49         });
50     }
51 }

在RoaringBitmap中,32位整数被分成了2^16个块。任何一个32位整数的前16位决定放在哪个块里。后16位就是放在这个块里的内容。比如0xFFFF0000和0xFFFF0001,前16位都是FFFF,表明这两个数应该放在一个块里。后16位分别是0和1。在这个块中指保存0和1就可以了,不需要保存完整的整数。

在最开始的时候,一个块中包含一个长度为4的short数组,后16位所对应的值就存在这个short数组里。注意在插入的时候要保持顺序性,这里就需要用到二分查找来加快速度了。如果当块中的元素大于short数组的长度时,就需要重新分配更大的数组,把当前数组copy过去,并把新值插入对应的位置。扩展数组大小和STL中vector的方式类似,不过并不是完全的加倍,而且上限是4096,也就是说最多只保存4096个元素。那么问题来了,超过了4096怎么办呢?

一个块里最多可能需要存放2^16个元素,那么如果是用short来存放,最多需要65536个short,那么就是131072个byte。如果换一种方式,用位来存储元素,那么就需要65536个bit,相当于1024个long型数组,即2048个int,也就是4096个short。

所以,当一个块中元素数量小于等于4096的时候,用有序short数组来保存元素,而当元素数量大于4096的时候,用长度为1024的long数组来按位表示元素是否存在。

当bitmap中有多个块的时候,块的信息是用数组来保存的。这个数组同样需要保持顺序性,也是用二分查找找到一个块的位置。所以,当一个整数过来之后,首先根据前16位计算出块的key,然后在块的数组中二分查找。找到的话,就把后16位保存在这个块中。找不到,就创建一个新块,把后16位保存在块中,再把块插入对应的位置。

 大数据分析常用去重算法分析-Kylin

首先,请大家思考一个问题:在大数据处理领域中,什么环节是你最不希望见到的?以我的观点来看,shuffle 是我最不愿意见到的环节,因为一旦出现了非常多的 shuffle,就会占用大量的磁盘和网络 IO,从而导致任务进行得非常缓慢。而今天我们所讨论的去重分析,就是一个会产生非常多 shuffle 的场景,先大概介绍一下shuffle原理:

Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。

技术分享图片

先来看以下场景:

技术分享图片

我们有一张商品访问表,表上有 item 和 user_id 两个列,我们希望求商品的 UV,这是去重非常典型的一个场景。我们的数据是存储在分布式平台上的,分别在数据节点 1 和 2 上。

我们从物理执行层面上想一下这句 SQL 背后会发生什么故事:首先分布式计算框架启动任务, 从两个节点上去拿数据, 因为 SQL group by 了 item 列, 所以需要以 item 为 key 对两个表中的原始数据进行一次 shuffle。我们来看看需要 shuffle 哪些数据:因为 select/group by了 item,所以 item 需要 shuffle 。但是,user_id  我们只需要它的一个统计值,能不能不 shuffle 整个 user_id 的原始值呢?

如果只是简单的求 count 的话, 每个数据节点分别求出对应 item 的 user_id 的 count, 然后只要 shuffle 这个 count 就行了,因为count 只是一个数字, 所以 shuffle 的量非常小。但是由于分析的指标是 count distinct,我们不能简单相加两个节点user_id 的 count distinct 值,我们只有得到一个 key 对应的所有 user_id 才能统计出正确的 count distinct值,而这些值原先可能分布在不同的节点上,所以我们只能通过 shuffle 把这些值收集到同一个节点上再做去重。而当 user_id 这一列的数据量非常大的时候,需要 shuffle 的数据量也会非常大。我们其实最后只需要一个 count 值,那么有办法可以不 shuffle 整个列的原始值吗?我下面要介绍的两种算法就提供了这样的一种思路,使用更少的信息位,同样能够求出该列不重复元素的个数(基数)

技术分享图片

第一种要介绍的算法是一种精确的去重算法,主要利用了 Bitmap 的原理。Bitmap 也称之为 Bitset,它本质上是定义了一个很大的 bit 数组,每个元素对应到 bit 数组的其中一位。例如有一个集合[2,3,5,8]对应的 Bitmap 数组是[001101001],集合中的 2 对应到数组 index 为 2 的位置,3 对应到 index 为 3 的位置,下同,得到的这样一个数组,我们就称之为 Bitmap。很直观的,数组中 1 的数量就是集合的基数。追本溯源,我们的目的是用更小的存储去表示更多的信息,而在计算机最小的信息单位是 bit,如果能够用一个 bit 来表示集合中的一个元素,比起原始元素,可以节省非常多的存储。

这就是最基础的 Bitmap,我们可以把 Bitmap 想象成一个容器,我们知道一个 Integer 是32位的,如果一个 Bitmap 可以存放最多 Integer.MAX_VALUE 个值,那么这个 Bitmap 最少需要 32 的长度。一个 32 位长度的 Bitmap 占用的空间是512 M (2^32/8/1024/1024),这种 Bitmap 存在着非常明显的问题:这种 Bitmap 中不论只有 1 个元素或者有 40 亿个元素,它都需要占据 512 M 的空间。回到刚才求 UV 的场景,不是每一个商品都会有那么多的访问,一些爆款可能会有上亿的访问,但是一些比较冷门的商品可能只有几个用户浏览,如果都用这种 Bitmap,它们占用的空间都是一样大的,这显然是不可接受的。

技术分享图片

 对于上节说的问题,有一种设计的非常的精巧 Bitmap,叫做 Roaring Bitmap,能够很好地解决上面说的这个问题。我们还是以存放 Integer 值的 Bitmap 来举例,Roaring Bitmap 把一个 32 位的 Integer 划分为高 16 位和低 16 位,取高 16 位找到该条数据所对应的 key,每个 key 都有自己的一个 Container。我们把剩余的低 16 位放入该 Container 中。依据不同的场景,有 3 种不同的 Container,分别是 Array Container、Bitmap Container 和 Run Container,下文将一一介绍。

技术分享图片

 首先第一种,是 Roaring Bitmap 初始化时默认的 Container,叫做 Array Container。Array Container 适合存放稀疏的数据,Array Container 内部的数据结构是一个 short array,这个 array 是有序的,方便查找。数组初始容量为 4,数组最大容量为 4096。超过最大容量 4096 时,会转换为 Bitmap Container。这边举例来说明数据放入一个 Array Container 的过程:有 0xFFFF0000 和 0xFFFF0001 两个数需要放到 Bitmap 中, 它们的前 16 位都是 FFFF,所以他们是同一个 key,它们的后 16 位存放在同一个 Container 中; 它们的后 16 位分别是 0 和 1, 在 Array Container 的数组中分别保存 0 和 1 就可以了,相较于原始的 Bitmap 需要占用 512M 内存来存储这两个数,这种存放实际只占用了 2+4=6 个字节(key 占 2 Bytes,两个 value 占 4 Bytes,不考虑数组的初始容量)。

技术分享图片

第二种 Container 是 Bitmap Container,其原理就是上文说的 Bitmap。它的数据结构是一个 long 的数组,数组容量固定为 1024,和上文的 Array Container 不同,Array Container 是一个动态扩容的数组。这边推导下 1024 这个值:由于每个 Container 还需处理剩余的后 16 位数据,使用 Bitmap 来存储需要 8192 Bytes(2^16/8), 而一个 long 值占 8 个 Bytes,所以一共需要 1024(8192/8)个 long 值。所以一个 Bitmap container 固定占用内存 8 KB(1024 * 8 Byte)。当 Array Container 中元素到 4096 个时,也恰好占用 8 k(4096*2Bytes)的空间,正好等于 Bitmap 所占用的 8 KB。而当你存放的元素个数超过 4096 的时候,Array Container 的大小占用还是会线性的增长,但是 Bitmap Container 的内存空间并不会增长,始终还是占用 8 K,所以当 Array Container 超过最大容量(DEFAULT_MAX_SIZE)会转换为 Bitmap Container。

我们自己在 Kylin 中实践使用 Roaring Bitmap 时,我们发现 Array Container 随着数据量的增加会不停地 resize 自己的数组,而 Java 数组的 resize 其实非常消耗性能,因为它会不停地申请新的内存,同时老的内存在复制完成前也不会释放,导致内存占用变高,所以我们建议把 DEFAULT_MAX_SIZE 调得低一点,调成 1024 或者 2048,减少 Array Container 后期 reszie 数组的次数和开销。

技术分享图片

 最后一种 Container 叫做Run Container,这种 Container 适用于存放连续的数据。比如说 1 到 100,一共 100 个数,这种类型的数据称为连续的数据。这边的Run指的是Run Length Encoding(RLE),它对连续数据有比较好的压缩效果。原理是对于连续出现的数字, 只记录初始数字和后续数量。例如: 对于 [11, 12, 13, 14, 15, 21, 22],会被记录为 11, 4, 21, 1。很显然,该 Container 的存储占用与数据的分布紧密相关。最好情况是如果数据是连续分布的,就算是存放 65536 个元素,也只会占用 2 个 short。而最坏的情况就是当数据全部不连续的时候,会占用 128 KB 内存。

技术分享图片

 总结:用一张图来总结3种 Container 所占的存储空间,可以看到元素个数达到 4096 之前,选用 Array Container 的收益是最好的,当元素个数超过了 4096 时,Array Container 所占用的空间还是线性的增长,而 Bitmap Container 的存储占用则与数据量无关,这个时候 Bitmap Container 的收益就会更好。而 Run Container 占用的存储大小完全看数据的连续性, 因此只能画出一个上下限范围 [4 Bytes, 128 KB]。

技术分享图片

 我们再来看一下Bitmap 在 Kylin 中的应用,Kylin 中编辑 measure 的时候,可以选择 Count Distinct,且Return Type 选为 Precisely,点保存就可以了。但是事情没有那么简单,刚才上文在讲 Bitmap 时,一直都有一个前提,放入的值都是数值类型,但是如果不是数值类型的值,它们不能够直接放入 Bitmap,这时需要构建一个全区字典,做一个值到数值的映射,然后再放入 Bitmap 中。

技术分享图片

在 Kylin 中构建全局字典,当列的基数非常高的时候,全局字典会成为一个性能的瓶颈。针对这种情况,社区也一直在努力做优化,这边简单介绍几种优化的策略,更详细的优化策略可以见文末的参考链接。

技术分享图片

 1)当一个列的值完全被另外一个列包含,而另一个列有全局字典,可以复用另一个列的全局字典

技术分享图片

 2)当精确去重指标不需要跨 Segment 聚合的时候,可以使用这个列的 Segment 字典代替(这个列需要字典编码)。在 Kylin 中,Segment 就相当于时间分片的概念。当不会发生跨 Segments 的分析时,这个列的 Segment 字典就可以代替这个全局字典。

技术分享图片

 3)如果你的 cube 包含很多的精确去重指标,可以考虑将这些指标放到不同的列族上。不止是精确去重,像一些复杂 measure,我们都建议使用多个列族去存储,可以提升查询的性能。

 快手BitBase数据库

技术分享图片

 如上图所示,首先将原始数据的一列的某个值抽象成 bitmap(比特数组),举例:city=bj,city 是维度,bj (北京) 是维度值,抽象成 bitmap 值就是10100,表示第0个用户在 bj,第1个用户不在北京,依次类推。然后将多维度之间的组合转换为 bitmap 计算:bitmap 之间做与、或、非、异或,举例:比如在北京的用户,且兴趣是篮球,这样的用户有多少个,就转换为图中所示的两个 bitmap 做与运算,得到橙色的 bitmap,最后,再对 bitmap 做 count 运算。count 表示统计“1”的个数,list 是列举“1”所在的数组 index,业务上表示 userId。

SparkSQL自定义聚合函数(UDAF)实现bitmap函数

创建表:使用phoenix在HBase中创建测试表,字段使用VARBINARY类型

1 CREATE TABLE IF NOT EXISTS test_binary (
2 date VARCHAR NOT NULL,
3 dist_mem VARBINARY
4  CONSTRAINT test_binary_pk PRIMARY KEY (date)
5  ) SALT_BUCKETS=6;

创建完成后使用RoaringBitmap序列化数据存入数据库:

技术分享图片

 自定义代码:

  1 import org.apache.spark.sql.Row;
  2 import org.apache.spark.sql.expressions.MutableAggregationBuffer;
  3 import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
  4 import org.apache.spark.sql.types.DataType;
  5 import org.apache.spark.sql.types.DataTypes;
  6 import org.apache.spark.sql.types.StructField;
  7 import org.apache.spark.sql.types.StructType;
  8 import org.roaringbitmap.RoaringBitmap;
  9  
 10 import java.io.*;
 11 import java.util.ArrayList;
 12 import java.util.List;
 13  
 14 /**
 15  * 实现自定义聚合函数Bitmap
 16  */
 17 public class UdafBitMap extends UserDefinedAggregateFunction {
 18     @Override
 19     public StructType inputSchema() {
 20         List<StructField> structFields = new ArrayList<>();
 21         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
 22         return DataTypes.createStructType(structFields);
 23     }
 24  
 25     @Override
 26     public StructType bufferSchema() {
 27         List<StructField> structFields = new ArrayList<>();
 28         structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
 29         return DataTypes.createStructType(structFields);
 30     }
 31  
 32     @Override
 33     public DataType dataType() {
 34         return DataTypes.LongType;
 35     }
 36  
 37     @Override
 38     public boolean deterministic() {
 39         //是否强制每次执行的结果相同
 40         return false;
 41     }
 42  
 43     @Override
 44     public void initialize(MutableAggregationBuffer buffer) {
 45         //初始化
 46         buffer.update(0, null);
 47     }
 48  
 49     @Override
 50     public void update(MutableAggregationBuffer buffer, Row input) {
 51         // 相同的executor间的数据合并
 52         // 1. 输入为空直接返回不更新
 53         Object in = input.get(0);
 54         if(in == null){
 55             return ;
 56         }
 57         // 2. 源为空则直接更新值为输入
 58         byte[] inBytes = (byte[]) in;
 59         Object out = buffer.get(0);
 60         if(out == null){
 61             buffer.update(0, inBytes);
 62             return ;
 63         }
 64         // 3. 源和输入都不为空使用bitmap去重合并
 65         byte[] outBytes = (byte[]) out;
 66         byte[] result = outBytes;
 67         RoaringBitmap outRR = new RoaringBitmap();
 68         RoaringBitmap inRR = new RoaringBitmap();
 69         try {
 70             outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
 71             inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
 72             outRR.or(inRR);
 73             ByteArrayOutputStream bos = new ByteArrayOutputStream();
 74             outRR.serialize(new DataOutputStream(bos));
 75             result = bos.toByteArray();
 76         } catch (IOException e) {
 77             e.printStackTrace();
 78         }
 79         buffer.update(0, result);
 80     }
 81  
 82     @Override
 83     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
 84         //不同excutor间的数据合并
 85         update(buffer1, buffer2);
 86     }
 87  
 88     @Override
 89     public Object evaluate(Row buffer) {
 90         //根据Buffer计算结果
 91         long r = 0l;
 92         Object val = buffer.get(0);
 93         if (val != null) {
 94             RoaringBitmap rr = new RoaringBitmap();
 95             try {
 96                 rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
 97                 r = rr.getLongCardinality();
 98             } catch (IOException e) {
 99                 e.printStackTrace();
100             }
101         }
102         return r;
103     }
104 }

调用例子:

 1  /**
 2      * 使用自定义函数解析bitmap
 3      *
 4      * @param sparkSession
 5      * @return
 6      */
 7     private static void udafBitmap(SparkSession sparkSession) {
 8         try {
 9             Properties prop = PropUtil.loadProp(DB_PHOENIX_CONF_FILE);
10             // JDBC连接属性
11             Properties connProp = new Properties();
12             connProp.put("driver", prop.getProperty(DB_PHOENIX_DRIVER));
13             connProp.put("user", prop.getProperty(DB_PHOENIX_USER));
14             connProp.put("password", prop.getProperty(DB_PHOENIX_PASS));
15             connProp.put("fetchsize", prop.getProperty(DB_PHOENIX_FETCHSIZE));
16             // 注册自定义聚合函数
17             sparkSession.udf().register("bitmap",new UdafBitMap());
18             sparkSession
19                     .read()
20                     .jdbc(prop.getProperty(DB_PHOENIX_URL), "test_binary", connProp)
21                     // sql中必须使用global_temp.表名,否则找不到
22                     .createOrReplaceGlobalTempView("test_binary");
23             //sparkSession.sql("select YEAR(TO_DATE(date)) year,bitmap(dist_mem) memNum from global_temp.test_binary group by YEAR(TO_DATE(date))").show();
24             sparkSession.sql("select date,bitmap(dist_mem) memNum from global_temp.test_binary group by date").show();
25         } catch (Exception e) {
26             e.printStackTrace();
27         }
28     }  

总结

感谢网络大神的分享:技术共勉

https://kyligence.io/zh/blog/count-distinct-bitmap/

https://www.jianshu.com/p/ded6e8ecd0d1

https://blog.csdn.net/xiongbingcool/article/details/81282118

http://www.sohu.com/a/327627247_315839

https://blog.csdn.net/xywtalk/article/details/52590275

 

 

 

 

 

 

 

BitMap算法知识笔记以及在大数据方向的使用

原文:https://www.cnblogs.com/boanxin/p/12232604.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!