现在我们如果有大量的文档(例如10000000万条文档)需要写入es
的某条索引中,该怎么办呢?
import time
from elasticsearch import Elasticsearch
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print(‘共耗时约 {:.2f} 秒‘.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 写入数据 """
for line in range(100):
es.index(index=‘s2‘, doc_type=‘doc‘, body={‘title‘: line})
if __name__ == ‘__main__‘:
create_data() # 执行结果大约耗时 7.79 秒
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print(‘共耗时约 {:.2f} 秒‘.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 写入数据 """
for line in range(100):
es.index(index=‘s2‘, doc_type=‘doc‘, body={‘title‘: line})
@timer
def batch_data():
""" 批量写入数据 """
action = [{
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(10000000)]
helpers.bulk(es, action)
if __name__ == ‘__main__‘:
# create_data()
batch_data() # MemoryError
我们通过elasticsearch模块导入helper
,通过helper.bulk
来批量处理大量的数据。首先我们将所有的数据定义成字典形式,各字段含义如下:
_index
对应索引名称,并且该索引必须存在。_type
对应类型名称。_source
对应的字典内,每一篇文档的字段和值,可有有多个字段。首先将每一篇文档(组成的字典)都整理成一个大的列表,然后,通过helper.bulk(es, action)
将这个列表写入到es对象中。
然后,这个程序要执行的话——你就要考虑,这个一千万个元素的列表,是否会把你的内存撑爆(MemoryError
)!很可能还没到没到写入es那一步,却因为列表过大导致内存错误而使写入程序崩溃!很不幸,我的程序报错了。下图是我在生成列表的时候,观察任务管理器的进程信息,可以发现此时Python消耗了大量的系统资源,而运行es实例的Java虚拟机却没什么变动。
解决办法是什么呢?我们可以分批写入,比如我们一次生成长度为一万的列表,再循环着去把一千万的任务完成。这样, Python和Java虚拟机达到负载均衡。
下面的示例测试10万条数据分批写入的速度
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print(‘共耗时约 {:.2f} 秒‘.format(time.time() - start))
return res
return wrapper
@timer
def batch_data():
""" 批量写入数据 """
# 分批写
# for i in range(1, 10000001, 10000):
# action = [{
# "_index": "s2",
# "_type": "doc",
# "_source": {
# "title": k
# }
# } for k in range(i, i + 10000)]
# helpers.bulk(es, action)
# 使用生成器
for i in range(1, 100001, 1000):
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": k
}
} for k in range(i, i + 1000))
helpers.bulk(es, action)
if __name__ == ‘__main__‘:
# create_data()
batch_data() # 耗时 93.53 s
采用 Python
生成器
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print(‘共耗时约 {:.2f} 秒‘.format(time.time() - start))
return res
return wrapper
@timer
def gen():
""" 使用生成器批量写入数据 """
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(100000))
helpers.bulk(es, action)
if __name__ == ‘__main__‘:
# create_data()
# batch_data()
gen() # 约90s
参考文章:
https://www.cnblogs.com/Neeo/articles/10788573.html
原文:https://www.cnblogs.com/midworld/p/13670001.html