简化下,数据源是kafka消息,目标是将消息按主键(一种可区分消息重复方法)插入pg表(有则更新,无则插入);
源消息确定有重复的,尽量保证入库速度;另外,producer端已经保证key相同的消息入到了同一个分区,数据量8000万消息
首先key相同的消息在同一个分区,那么consumer可以按分区个数,起多个实例,一一对应;
对于某一个consumer,单批消费下来N条消息,将N条数据拆分成K份,提交给线程池异步执行upsert入库;
这里要考虑的一个问题是,如果上一批消息的某些线程执行upsert时尚未结束,下一次poll消息执行入库又开始了,
可能发生的问题是,两个批次中,有相同key的消息,那么将会出现>=2个线程对同一条记录执行update,产生死锁,
实际在生产环境确实发生了,和服务器性能关系较大,性能稍差的服务器,入库速度显著慢一些,更容易发生
因此,这个地方需要设计成:poll一批消息之前,必须阻塞,等待上一批消息完全处理完成
然后因为一次插入N(假设消费N完全不重复,N可配置,可以是很大)条数据入库耗时长,改进为均分后用多个线程并行入库
线程使用的是初始化好的线程池
springboot+maven+postgres
需要开启的功能:异步(启动类@EnableAsync)线程池(配置注入方式)
一、注入一个线程池,这里可以用spring自带的,也可以用java原生的,spring自带的,类名多一个单词task:ThreadPoolTaskExecutor
其他的注解的作用,可配置的参数,根据需要查阅配置
@Configuration public class ExecutorConfig { @Value("${thread.pool.core.pool.size}") private int corePoolSize; @Value("${thread.pool.max.pool.size}") private int maxPoolSize; @Value("${thread.pool.queue.capacity}") private int queueCapacity; @Value("${thread.pool.name.prefix}") private String threadNamePrefix; /** * spring自带的 */ @Bean(name = "springThreadPool") public Executor springThreadPoolConfig() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(threadNamePrefix); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * java原生的 */ @Bean(name = "javaThreadPool") public Executor javaThreadPoolConfig() { return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); } }
二、将需要执行入库的业务逻辑异步放入线程池执行
@Service public class PhotoServiceImpl{ @Autowired private PhotoDao photoDao; /** * 这里使用异步,并定义异步执行的线程池标志 * 返回必须是Future<Void> 或 Future<T> * 前者仅表示异步线程执行后,需要拿到执行结果 */ @Override @Async("springThreadPool") public Future<Void> batchUpsert(List<PhotoModel> list) { if (!CollectionUtils.isEmpty(list)) { photoDao.batchUpsert(list); log.debug("batch upsert photo,size:{}", list.size()); } return new AsyncResult<>(null); } }
三、消费下来的消息预处理:包括内存中去重覆盖,分组,入口方法是下面的execute(),是一个List<Future<Void>>
它的size取决于这个批次分成了多少组(每组都是一个线程去执行入库,在for中)
@Service public class BatchProcessPhotoService { //单个线程最大处理记录数 @Value("${batch.upsert.max.record.size}") private int batchUpsertMaxRecordSize; //批量提交开启的线程数 @Value("${batch.upsert.thread.size}") private int threadSize; @Autowired private PhotoService photoService; public List<Future<Void>> execute(List<PhotoModel> list) { log.info("batch upsert dw_human_photo start"); List<List<PhotoModel>> splitList = split(list); List<Future<Void>> futures = new ArrayList<>(); for (List<PhotoModel> everyList : splitList) { Future<Void> future = photoService.batchUpsert(everyList); futures.add(future); } return futures; } /** * 将去重后的数据分段:总数/线程数 */ private List<List<PhotoModel>> split(List<PhotoModel> list) { List<PhotoModel> distinctList = coverDuplicate(list); int start, end; int total = distinctList.size(); List<List<PhotoModel>> splitList = new ArrayList<>(); if (total <= batchUpsertMaxRecordSize) { splitList.add(distinctList); return splitList; } for (int i = 0; i < threadSize; i++) { start = total / threadSize * i; end = total / threadSize * (i + 1); if (i == threadSize - 1) { end = total; } splitList.add(distinctList.subList(start, end)); } return splitList; } /** * 将personId重复的,用后面的覆盖前面的 */ private List<PhotoModel> coverDuplicate(List<PhotoModel> sourceList) { if (CollectionUtils.isEmpty(sourceList)) { return new ArrayList<>(); } List<PhotoModel> distinctList = sourceList.stream().collect( Collectors.toMap(PhotoModel::getHumanPicId, Function.identity(), (e1, e2) -> e2) ).values().stream().collect(Collectors.toList()); return distinctList; } }
四、consumer消费消息
@KafkaListener(id = "xxxKafka", groupId = "${config.kafka.humanGroupId}", topics = {"${config.kafka.humanTopic}"}, containerFactory = "batchFactoryHuman") public void xxxListener(List<ConsumerRecord<?, String>> list, Acknowledgment ack) { log.info("consume data,size:{},offset:{}", list.size(), list.get(0).offset()); List<HumanInfoKafkaModel> humanKafkaList = buildHumanKafkaList(list); if (CollectionUtils.isNotEmpty(humanKafkaList)) { ......一大堆消息解析和处理逻辑,返回photoListForUpsert try { List<Future<Void>> futures = batchProcessPhotoService.execute(photoListForUpsert); if (CollectionUtils.isNotEmpty(futures)) { for (Future<Void> future : futures) { //这里需要调用get()同步起来 future.get(); } } } catch (Exception e) { //photoListForUpsert异常处理,入库,发到另一个topic单独一条条消费均可 } } //全部逻辑执行正常,全部数据入库完毕,提交offset ack.acknowledge(); }
五、配置Kafka消费工厂,可配置参数可查阅配置
@Configuration @EnableKafka public class KafkaConfig { @Autowired private Config config; @Value("${kafka.max.poll.records}") private String maxPollRecords; @Value("${kafka.max.partition.fetch.bytes}") private String maxPartitionFetchBytes; @Value("${kafka.consumer.thread.size}") private Integer threadRecord; /** * POOL_PHOTO_IMPORT_TOPIC导入 kafka属性配置 */ public Map<String, Object> consumerConfigsPhoto() { Map<String, Object> props = Maps.newHashMap(); props.put(KafkaConstants.KAFKA_CONNECT_BROKERS, config.getKafka().getPhotoBroker()); props.put(KafkaConstants.KAFKA_CONNECT_GROUP_ID, config.getKafka().getPhotoGroupId()); buildProp(props, true); return props; } /** * POOL_PHOTO_IMPORT_TOPIC导入 消费工厂 * containerFactory = "batchFactoryPhoto" */ @Bean public KafkaListenerContainerFactory<?> batchFactoryPhoto() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigsPhoto())); // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); //消费线程数量设置,这里采用的是单线程消费+后续worker线程池处理 factory.setConcurrency(threadRecord); factory.setAutoStartup(Boolean.valueOf(config.getKafka().getPhotoSwitch())); factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE)); return factory; } }
consumer端,如果是要保证顺序,最好是多实例(每个实例默认单线程消费)
消费下来的消息需要做预处理,可以先内部去重覆盖,然后分组
每个组一个线程,给线程池统一处理;异步执行需要指定注入的线程池名,并且要有返回
在for循环中必须调用Future的get方法同步起来,保证每一批消息彻底处理完毕再接着消费
原文:https://www.cnblogs.com/yb38156/p/14605676.html