好久没来了。前段时间因项目测试需要,要把现网的 ES 数据导出,再导入到测试环境以方便测试,于是写了两个小脚本(导出和导入)。拿出来分享一下,说不定以后有同学用得上。下面直接上干货。
# 导出脚本
import json
import os
import time
import requests
class exportEsData():
    """Export the documents of one Elasticsearch index/type to a JSON-lines file.

    The output file is ``<index>_<type>.json`` (relative to the current
    working directory); every line is the JSON-encoded ``_source`` of one hit.

    Class attributes:
        size: number of hits requested per page.
        encoding: encoding used for the output file.  ``None`` keeps the
            platform default, which is what ``open`` uses when no encoding
            is given.
    """

    size = 10000
    encoding = None

    def __init__(self, url, index, type):
        """Remember the target index and pre-build the request URLs.

        Args:
            url: base URL of the cluster, e.g. ``http://ip:port``.
            index: name of the index to export.
            type: mapping type inside the index (the name shadows the
                builtin but is kept for backward compatibility).
        """
        self.url = url + "/" + index + "/" + type + "/_search"
        self.urlput = url + "/" + index + "/_settings"
        self.index = index
        self.type = type

    def _filename(self):
        """Return the name of the output file for this index/type."""
        return self.index + "_" + self.type + ".json"

    @staticmethod
    def _total_hits(total):
        """Normalise ``hits.total`` to an int.

        Accepts either a plain number or an object of the form
        ``{"value": N, ...}``.
        """
        if isinstance(total, dict):
            return int(total["value"])
        return int(total)

    @staticmethod
    def _page_count(total, size):
        """Return how many pages of ``size`` hits cover ``total`` hits.

        Uses integer ceiling division.  At least one page is always
        requested so that an (empty) output file is still created for an
        empty index.
        """
        return max(1, -(-total // size))

    def exportData(self):
        """Run the export: page through the search results and write them out.

        Side effects: sets ``index.max_result_window`` on the index, deletes
        a previous output file if one exists, prints progress to stdout and
        writes the output file.
        """
        print("export data begin...")
        puthead = {"Content-Type": "application/json"}
        # Raise max_result_window so deep from/size paging is accepted.
        # Adjust the value if the index holds more than 1,000,000 documents.
        param = {"index.max_result_window": "1000000"}
        requests.put(url=self.urlput, data=json.dumps(param), headers=puthead)
        begin = time.time()
        # writeFile appends, so a leftover file from an earlier run must go.
        # Only "file does not exist" is ignored: any other failure would make
        # the export silently append to stale data.
        try:
            os.remove(self._filename())
        except FileNotFoundError:
            pass
        # The first request is only used to learn the total number of hits.
        msg = requests.get(self.url).text
        print(msg)
        obj = json.loads(msg)
        num = self._total_hits(obj["hits"]["total"])
        print(num)
        for page in range(self._page_count(num, self.size)):
            query = {"from": page * self.size, "size": self.size}
            msg = requests.get(self.url, params=query).text
            self.writeFile(msg)
        print("export data end!!!\n\t total consuming time:"+str(time.time()-begin)+"s")

    def writeFile(self, msg):
        """Append the ``_source`` of every hit in a search response to the file.

        Args:
            msg: raw JSON text of a ``_search`` response.

        Raises:
            KeyError: if the response has no ``hits.hits`` (for example an
                error response), or a hit has no ``_source``.
        """
        hits = json.loads(msg)["hits"]["hits"]
        # The context manager closes the file even when a hit is malformed,
        # and does not mask the real error when open() itself fails.
        with open(self._filename(), "a", encoding=self.encoding) as f:
            f.writelines(
                json.dumps(hit["_source"], ensure_ascii=False) + "\n"
                for hit in hits
            )
if __name__ == "__main__":
    # Replace ip, port, index and type with the real values before running.
    exporter = exportEsData("http://ip:port", "index", "type")
    exporter.exportData()
# 导入脚本
# coding: utf-8
from elasticsearch import Elasticsearch
import json
import requests
from elasticsearch import helpers
class importEsData():
    """Bulk-load a JSON-lines file into one Elasticsearch index/type.

    The input file is ``<index>_<type>.json`` (relative to the current
    working directory) and must contain one JSON document per line.  Each
    document is indexed as-is as the ``_source`` of a new document.

    Class attributes:
        batch_size: number of documents sent per ``helpers.bulk`` call.
    """

    batch_size = 10000

    def __init__(self, url, index, type, mappings=None, encoding='gbk'):
        """Remember the target index and pre-build the request URLs.

        Args:
            url: base URL of the cluster, e.g. ``http://ip:port``.
            index: name of the index to create and fill.
            type: mapping type inside the index (the name shadows the
                builtin but is kept for backward compatibility).
            mappings: optional JSON-serialisable mapping body to PUT to the
                ``_mapping`` endpoint before loading (it can be copied from
                ``http://ip:port/index`` on the source cluster).  When
                ``None`` no mapping request is sent.
            encoding: encoding of the input file.  It must match the
                encoding the file was written with.
        """
        self.url = url
        self.urlputindex = url + "/" + index
        self.urlputmapping = url + "/" + index + "/" + type + "/_mapping"
        self.index = index
        self.type = type
        self.mappings = mappings
        self.encoding = encoding

    def _iter_actions(self, lines):
        """Yield one bulk action per non-blank JSON line in ``lines``."""
        for line in lines:
            # Blank lines (e.g. a trailing newline) carry no document.
            if not line.strip():
                continue
            yield {
                "_index": self.index,
                "_type": self.type,
                # Index the whole document so no field is lost on the way.
                "_source": json.loads(line),
            }

    def importData(self):
        """Create the index (and mapping, if given) and bulk-load the file.

        Raises:
            json.JSONDecodeError: if a non-blank line is not valid JSON.
        """
        es = Elasticsearch(self.url)
        # Create the index.  The response is not checked, so an already
        # existing index does not abort the import.
        requests.put(self.urlputindex)
        if self.mappings is not None:
            # Create the mapping; the body is JSON, so say so explicitly.
            requests.put(
                self.urlputmapping,
                data=json.dumps(self.mappings),
                headers={"Content-Type": "application/json"},
            )
        actions = []  # pending bulk actions for the current batch
        with open(self.index + "_" + self.type + ".json", encoding=self.encoding) as f:
            for action in self._iter_actions(f):
                actions.append(action)
                if len(actions) >= self.batch_size:
                    helpers.bulk(es, actions)
                    actions = []
        # Flush the final, partially filled batch.
        if actions:
            helpers.bulk(es, actions)
if __name__ == "__main__":
    # Replace ip, port, index and type with the real values before running.
    importer = importEsData("http://ip:port", "index", "type")
    importer.importData()