如果要获取数据并分析,例如用for循环,那只能按顺序读取,这样就会造成效率低下:
循环读取多文件过慢,本文分别使用多线程、多进程方法对文件进行读取
由于处理完文件往往需要获取返回值,可以使用以下两种方法:
import queue
q = queue.Queue()
def read_file(file):
with open(os.path.join(path,file),‘r‘) as f:
q.put()
1 自定义get_result()方法,取返回值
import threading
class ReadThread(threading.Thread):
def __init__(self,file):
threading.Thread.__init__(self) #super(ReadThread, self).__init__()
self.file = file
def run(self):
self.res = read_file(self.file)
def get_result(self):
#注:此方法特别慢
return self.res
threads = []
for file in os.listdir(path):
t = ReadThread(file)
threads.append(t)
[t.start() for t in threads]
[t.join() for t in threads]
for t in threads:
t.get_result()
2 使用队列
#改用多线程读取
import threading
start = time.time()
df = pd.DataFrame()
threads = []
for file in os.listdir(path):
t = threading.Thread(target=read_file,args=(file,))
threads.append(t)
[t.start() for t in threads]
[t.join() for t in threads]
while not q.empty():
q.get()
q.task_done()
print("read time:",time.time()-start)
关于task_done
如果线程里每从队列里取一次,但没有执行task_done(),则join无法判断队列到底有没有结束,在最后执行个join()是等不到结果的,会一直挂起。
理解Queue队列中join()与task_done()的关系 - andyzhang- - 博客园 (cnblogs.com)
python伪多线程,适合IO密集型任务
如果是一个计算为主的程序(专业一点称为CPU密集型程序),这一点确实是比较吃亏的,每个线程运行一遍,就相当于单线程再跑,甚至比单线程还要慢——CPU切换线程的上下文也是要有开销的。但是,如果是一个磁盘或网络为主的程序(IO密集型)就不同了。一个线程处在IO等待的时候,另一个线程还可以在CPU里面跑,有时候CPU闲着没事干,所有的线程都在等着IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道IO的速度比起CPU来是慢到令人发指的,python的多线程就在这时候发挥作用了。比方说多线程网络传输,多线程往不同的目录写文件,等等
python 实现多线程并返回函数返回值的三种方法_zxc的博客-CSDN博客_python多线程函数返回值
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
多线程线程数设置多少合适_东东的专栏-CSDN博客_线程数设置多少合适
本文阻塞、非阻塞主要指取结果时是否使用回调函数,回调函数可以避免阻塞
def read_file(file):
with open(os.path.join(path,file),‘r‘) as f:
data = json.load(f)
return data
#线程池,取结果时会阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任务
for future in as_completed(future_list):
result = future.result() # 获取任务结果
df = df.append(result,ignore_index=True)
print(time.time()-start_time)
#线程池,非阻塞
from concurrent.futures import ThreadPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)] # 提交任务
def get_result(future):
global df
df = df.append(future.result(),ignore_index=True)
for future in as_completed(future_list):
future.add_done_callback(get_result)
print(time.time()-start_time)
进程数一般设置为 核数-1
from concurrent.futures import ThreadPoolExecutor,as_completed
net_df = pd.DataFrame()
start_time = time.time()
# 提交任务
with ThreadPoolExecutor(20) as executor: # 创建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)]
for future in as_completed(future_list):
result = future.result() # 获取任务结果
net_df = net_df.append(result,ignore_index=True)
print(time.time()-start_time)
#进程池,非阻塞获取结果
from concurrent.futures import ProcessPoolExecutor,as_completed
df = pd.DataFrame()
start_time = time.time()
# 提交任务
with ProcessPoolExecutor(7) as executor: # 创建 ThreadPoolExecutor
future_list = [executor.submit(read_file, file) for file in os.listdir(path)]
def get_result(future):
global df
df = df.append(future.result(),ignore_index=True)
for future in as_completed(future_list):
future.add_done_callback(get_result)
print(time.time()-start_time)
Python线程池及其原理和使用(超级详细) (biancheng.net)
[第53天: Python 线程池 - 纯洁的微笑 - 博客园 (cnblogs.com)](https://www.cnblogs.com/ityouknow/p/12993166.html#:~:text=在 python 中使用线程池有两种方式,一种是基于第三方库 threadpool,,另一种是基于 python3 新引入的库 concurrent.futures.ThreadPoolExecutor 。)
Python concurrent.future线程池和进程池_Gordennizaicunzai的博客-CSDN博客
原文:https://www.cnblogs.com/gongyanzh/p/14820926.html