需求:在从银行数据库中取出 几十万数据时,需要对 每行数据进行相关操作,通过pandas的dataframe发现数据处理过慢,于是 对数据进行 分段后 通过 线程进行处理;
如下给出 测试版代码,通过 list 分段模拟 pandas 的 dataframe ;
1 # -*- coding: utf-8 -*- 2 # (C) Guangcai Ren <renguangcai@jiaaocap.com> 3 # All rights reserved 4 # create time ‘2019/6/26 14:41‘ 5 import math 6 import random 7 import time 8 from threading import Thread 9 10 _result_list = [] 11 12 13 def split_df(): 14 # 线程列表 15 thread_list = [] 16 # 需要处理的数据 17 _l = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 18 # 每个线程处理的数据大小 19 split_count = 2 20 # 需要的线程个数 21 times = math.ceil(len(_l) / split_count) 22 count = 0 23 for item in range(times): 24 _list = _l[count: count + split_count] 25 # 线程相关处理 26 thread = Thread(target=work, args=(item, _list,)) 27 thread_list.append(thread) 28 # 在子线程中运行任务 29 thread.start() 30 count += split_count 31 32 # 线程同步,等待子线程结束任务,主线程再结束 33 for _item in thread_list: 34 _item.join() 35 36 37 def work(df, _list): 38 """ 线程执行的任务,让程序随机sleep几秒 39 40 :param df: 41 :param _list: 42 :return: 43 """ 44 sleep_time = random.randint(1, 5) 45 print(f‘count is {df},sleep {sleep_time},list is {_list}‘) 46 time.sleep(sleep_time) 47 _result_list.append(df) 48 49 50 def use(): 51 split_df() 52 53 54 if __name__ == ‘__main__‘: 55 y = use() 56 print(len(_result_list), _result_list)
响应结果如下:
注意点:
脚本中的 _result_list 在项目中 要 放在 函数中,不能直接放在 路由类中,否则会造成 多次请求 数据 污染;
定义线程任务时 thread = Thread(target=work, args=(item, _list,)) 代码中的 work函数 和 参数 要分开,否则 多线程无效
注意线程数不能过多
原文:https://www.cnblogs.com/rgcLOVEyaya/p/RGC_LOVE_YAYA_1103_3days.html