/**
 * Imports a list of records by cutting it into fixed-size batches and running
 * one {@link ImportTask} per batch on a thread pool.
 */
public class ScmcommissionLineServiceImpl {

    /** Number of records handed to each import task. */
    private static final int BATCH_SIZE = 500;

    /**
     * Upper bound on pool threads, so a very large list does not create one
     * thread per batch. Tunable; remaining batches wait in the executor's queue.
     */
    private static final int MAX_POOL_THREADS = 10;

    /**
     * Imports {@code list} in batches of {@code BATCH_SIZE} and blocks until
     * every batch has finished.
     *
     * <p>A {@code null} or empty list schedules no work. If the calling thread
     * is interrupted while waiting, its interrupt flag is restored and the
     * method returns without running the post-import step.
     *
     * @param list records to import; may be {@code null} or empty
     */
    public void importData(List<?> list) {
        int listSize = (list == null) ? 0 : list.size();
        // Ceiling division: an exact multiple of BATCH_SIZE must not produce an empty extra batch.
        int batchCount = listSize / BATCH_SIZE + (listSize % BATCH_SIZE == 0 ? 0 : 1);

        if (batchCount > 0) {
            ThreadPoolExecutor executor =
                    new ScheduledThreadPoolExecutor(Math.min(batchCount, MAX_POOL_THREADS));
            CountDownLatch countDownLatch = new CountDownLatch(batchCount);
            try {
                for (int i = 0; i < batchCount; i++) {
                    int startIndex = i * BATCH_SIZE;
                    // subList's end index is exclusive, so no "- 1" here;
                    // Math.min clamps the final, possibly shorter, batch.
                    int endIndex = Math.min(startIndex + BATCH_SIZE, listSize);
                    executor.execute(new ImportTask(list.subList(startIndex, endIndex), countDownLatch));
                }
                // The calling thread waits until every task has counted down.
                countDownLatch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            } finally {
                // Always release the pool threads, even on failure or interruption.
                executor.shutdown();
            }
        }

        // Work that must run only after all batches have completed.
        System.out.println("插入数据完成!");
        // (further post-import business logic goes here)
    }
}

/**
 * Processes one batch of records and signals completion through a shared latch.
 */
class ImportTask implements Runnable {

    private final List<?> list;
    private final CountDownLatch countDownLatch;

    /**
     * @param list           the batch to process; may be {@code null}
     * @param countDownLatch latch counted down exactly once when this task ends
     */
    public ImportTask(List<?> list, CountDownLatch countDownLatch) {
        this.list = list;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        try {
            if (list != null) {
                // business logic for this batch goes here
            }
        } finally {
            // Signal completion even if the business logic throws; otherwise
            // the thread waiting on the latch would block forever.
            countDownLatch.countDown();
        }
    }
}

// Copyright notice: this is an original article by the CSDN blogger "尤尤尤奴斯",
// licensed under CC 4.0 BY-SA. When reposting, include a link to the original
// source and this notice.
// Original link: https://blog.csdn.net/weixin_38158701/article/details/84966713
// Upper bound on the number of threads allowed to work at the same time
private static int MAX_THREADS= 5;
// Page size used to split the batch-run input into chunks
private static int EXPIRED_PAGE_SIZE = 30;
/**
 * Splits {@code list} into pages of {@code EXPIRED_PAGE_SIZE} items, submits one
 * {@code SyncTask} per page to a thread pool, and blocks until every task has
 * counted down the shared latch.
 *
 * <p>Does nothing when {@code list} is {@code null} or empty. If the calling
 * thread is interrupted while waiting, its interrupt flag is restored before
 * returning.
 *
 * @param list the items to process; may be {@code null} or empty
 */
private void dataHandler(List<SyncFileDto> list) {
    if (list == null || list.isEmpty()) {
        return;
    }
    // Total number of items to process.
    int listSize = list.size();
    // Number of pages (ceiling division).
    int runSize = listSize / EXPIRED_PAGE_SIZE + (listSize % EXPIRED_PAGE_SIZE == 0 ? 0 : 1);

    // The semaphore below only lets MAX_THREADS tasks work at once, so a larger
    // pool would just create threads that sit blocked; extra pages wait in the queue.
    ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Math.min(runSize, MAX_THREADS));
    CountDownLatch countDownLatch = new CountDownLatch(runSize);
    // Caps the number of tasks processing concurrently.
    final Semaphore semaphore = new Semaphore(MAX_THREADS);
    try {
        for (int i = 0; i < runSize; i++) {
            int startIndex = i * EXPIRED_PAGE_SIZE;
            // Clamp so the last page may be shorter than a full page.
            int endIndex = Math.min(startIndex + EXPIRED_PAGE_SIZE, listSize);
            List<SyncFileDto> handleList = list.subList(startIndex, endIndex);
            executor.execute(new SyncTask(handleList, countDownLatch, semaphore));
        }
        countDownLatch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } finally {
        // Always release the pool, including when task submission itself fails.
        executor.shutdown();
    }
}
class SyncTask implements Runnable {
private List<SyncFileDto> list;
private CountDownLatch countDownLatch;
private Semaphore semaphore;
public SyncSyncTask(List<SyncFileDto> list, CountDownLatch countDownLatch, Semaphore semaphore) {
this.list = list;
this.countDownLatch = countDownLatch;
this.semaphore = semaphore;
}
@Override
public void run() {
if (!CollectionUtils.isEmpty(list)) {
try {
semaphore.acquire();
list.stream().forEach(fileDto -> {
//业务处理
});
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
//线程任务完成
countDownLatch.countDown();
}
}
// Original article: https://www.cnblogs.com/ronniechen/p/14779113.html