首页 > 其他 > 详细

如何解决生产环境中的数据倾斜问题

时间:2020-11-11 21:59:53      阅读:38      评论:0      收藏:0      [点我收藏+]

     无论是对于Flink、Spark这样的实时计算框架还是hive等离线计算框架,数据量从来不是问题,真正引起问题导致严重后果的是数据倾斜。所谓数据倾斜,是指在大规模并行处理的数据中,其中某个运行节点处理的数据远远超过其他部分,这会导致该节点压力极大,最终出现运行失败从而导致整个任务失败。

数据倾斜背景和危害

数据倾斜原理

     目前我们所知道的大数据处理框架,比如Flink、Spark、Hadoop等之所以能处理高达千亿的数据,是因为这些框架都利用了分布式计算的思想,集群中多个计算节点并行,使得数据处理能力得到线性扩展。
     在实际生产中Flink是以集群的形式在运行,在运行的过程中包含两类进程。其中TaskManager实际负责执行计算的Worker,在其上执行Flink Job的一组Task,Task则是我们执行具体代码逻辑的容器。理论上只要我们的任务Task足够多就可以对足够大的数据进行处理。
     但是实际上大数据量经常出现,一个Flink作业包含200个Task节点,其中有199个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在Flink的管理界面中看到任务的某一个Task数据量远超其他节点。

数据倾斜原因和解决方案

     Flink任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现OOM异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
     产生数据倾斜的原因主要有2个方面:

  • 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单远超其他地区
  • 技术上大量使用了keyBy、GroupBy等操作,错误的使用了分组Key,人为产生数据热点

     解决问题的思路也很清晰:

  • 业务上要尽量避免热点Key的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理
  • 技术上出现热点时,要调整方案打散原来的key,避免直接聚合;此外Flink还提供了大量的功能可以避免数据倾斜

Flink任务数据倾斜场景和解决方案

两阶段聚合解决KeyBy热点

     KeyBy是经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双11按照下单用户所在的省聚合求订单量最高的前10个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。
     上述场景在外面进行KeyBy时就会出现严重的数据倾斜,如下图所示:
技术分享图片

如果我们简单的使用KeyBy算子,模拟一个简单的统计PV的场景如下:

DataStream sourceStream = ...;
windowedStream = sourceStream.keyBy(‘type‘)
       .window(TumblingEventTimeWindows.of(Time.minutes(1)));
windowedStream.process(new MyPVFunction())
       .addSink(new MySink())...
env.execute()....

我们根据type进行keyBy时,如果数据的type分布不均匀就会导致大量的数据分配到一个task中去,发生数据倾斜。
解决思路:

  • 首先把分组的key打散,比如加随机后缀
  • 对打散后的数据进行聚合
  • 把打散的key还原为真正的key
  • 二次keyby进行结果统计
DataStream sourceStream = ...;

resultStream = sourceStream

     .map(record -> {

        Record record = JSON.parseObject(record, Record.class);

        String type = record.getType();

        record.setType(type + "#" + new Random().nextInt(100));

        return record;

      })

      .keyBy(0)

      .window(TumblingEventTimeWindows.of(Time.minutes(1)))

      .aggregate(new CountAggregate())

      .map(count -> {

        String key = count.getKey.substring(0, count.getKey.indexOf("#"));

        return RecordCount(key,count.getCount);

      })

      //二次聚合

      .keyBy(0)

      .process(new CountProcessFunction);

resultStream.sink()...

env.execute()...

其中CountAggregate函数实现如下:

public class CountAggregate implements AggregateFunction<Record,CountRecord,CountRecord> {

    @Override

    public CountRecord createAccumulator() {

        return new CountRecord(null, 0L);

    }

    @Override

    public CountRecord add(Record value, CountRecord accumulator) {

        if(accumulator.getKey() == null){

            accumulator.setKey(value.key);

        }

        accumulator.setCount(value.count);

        return accumulator;

    }

    @Override

    public CountRecord getResult(CountRecord accumulator) {

        return accumulator;

    }

    @Override

    public CountRecord merge(CountRecord a, CountRecord b) {

        return new CountRecord(a.getKey(),a.getCount()+b.getCount()) ;

    }

}

CountProcessFunction的实现如下:

public class CountProcessFunction extends KeyedProcessFunction<String, CountRecord, CountRecord> {

    private ValueState<Long> state = this.getRuntimeContext().getState(new ValueStateDescriptor("count",Long.class));

    @Override

    public void processElement(CountRecord value, Context ctx, Collector<CountRecord> out) throws Exception {

        if(state.value()==0){

            state.update(value.count);

            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L * 5);

        }else{

            state.update(state.value() + value.count);

        }

    }

    @Override

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountRecord> out) throws Exception {

        //这里可以做业务操作,例如每 5 分钟将统计结果发送出去

        //out.collect(...);

        //清除状态

        state.clear();

        //其他操作

        ...

    }

}

通过上面打散聚合在二次聚合的方式,我们就可以实现热点key的打散,消除数据倾斜。

GroupBy+Aggregation 分组聚合热点问题

     业务上通过groupby进行分组,然后紧跟一个SUM、COUNT等聚合函数操作是非常常见的。GroupBy函数会根据Key进行分组,完全依赖Key的设计,如果Key出现热点,那么会导致巨大的shuffle,相同key的数据会被发往同一个节点处理;如果某个key的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。
     按照上面的分组统计PV的场景,SQL语句如下:

select

      date,

      type,

      sum(count) as pv

from table

      group by

      date,

      type;

可以通过内外两层聚合的方式将SQL改为:

select date,

       type,

       sum(pv) as pv

from(

  select

        date,

        type,

        sum(count) as pv

  from table

        group by

        date,

        type,

        floor(rand()*100) --随机打散成100份 

    )

    group by 

    date,

    type;

在上面的SQL拆成了内外两层,第一次通过随机打散100份的方式减少数据热点,当然这个打散的方式可以根据业务灵活指定。

Flink消费Kafka上下游并行度不一致导致的数据倾斜

     在使用Flink处理实时业务时,上游一般都是消息系统,Kafka是使用最广泛的大数据消息系统。当使用Flink消费Kafka数据时,也会出现数据倾斜。
     需要十分注意的是,Flink消费Kafka数据时,时推荐上下游并行去保持一致,即Kafka的分区数等于Flink Consumer的并行度。
     但是有一种情况,为了加快数据的处理速度,来设置Flink消费者的并行度大于Kafka的分区数。如果不做任何的设置则会导致部分Flink Consumer现场永远消费不到数据。这时需要设置Flink的Redistributing,也就是数据重分配。Flink提供8种重分区策略,在接收到Kafka消息后,可以通过自定义数据分区策略实现数据的负载均衡,如:

dataStream

        .setParallelism(2)

        // 采用REBALANCE分区策略重分区

        .rebalance() //.rescale()

        .print()

        .setParallelism(4);

rebalance分区策略,数据会以round-robin的方式对数据进行再次分区,可以全局负载均衡。rescale分区策略基于上下游的并行度,会讲数据以循环的方式输出到下游的每个实例中。

其他

     Flink一直在不断迭代,不断出现各种各样的手段解决遇到的数据倾斜问题,例如MiniBatch微批处理手段等,需要不断去发现。

如何解决生产环境中的数据倾斜问题

原文:https://www.cnblogs.com/liufei-yes/p/13960807.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!