好久没来了。前段时间项目测试时,需要把现网的 ES 数据导出,再导入测试环境以方便测试,于是写了一个小脚本。拿出来分享一下,说不定以后有童鞋用得上。直接上干货……
# 导出脚本

import json
import os
import time
import requests


class exportEsData():
    """Dump the documents of one Elasticsearch index/type to a JSON-lines file.

    The output file is ``<index>_<type>.json`` in the current working
    directory; every line holds the ``_source`` of one search hit.
    """

    size = 10000  # number of hits requested per search page

    def __init__(self, url, index, type, encoding=None):
        """Derive the search and settings endpoints from the given names.

        Args:
            url: Base URL of the cluster, e.g. ``http://ip:port``.
            index: Name of the index to export.
            type: Mapping type inside the index.
            encoding: Text encoding of the output file. ``None`` (the
                default) keeps the platform default, as the script did
                before this parameter existed.
        """
        self.url = url + "/" + index + "/" + type + "/_search"
        self.urlput = url + "/" + index + "/_settings"
        self.index = index
        self.type = type
        self.encoding = encoding

    def _fileName(self):
        """Return the name of the dump file for this index/type."""
        return self.index + "_" + self.type + ".json"

    @staticmethod
    def _totalHits(obj):
        """Extract the hit count from a decoded search response.

        ``hits.total`` is a plain integer in older responses and an object
        with a ``value`` key in newer ones; both shapes are accepted.
        """
        total = obj["hits"]["total"]
        if isinstance(total, dict):
            # NOTE(review): in the object form the value may be a lower
            # bound unless the request tracks total hits -- confirm against
            # the cluster version in use.
            total = total["value"]
        return total

    @staticmethod
    def _pageCount(total, size):
        """Return how many pages of *size* hits are needed for *total* hits."""
        # Ceiling division: the previous ``total / size + 1`` always asked
        # for one page more than necessary.
        return (total + size - 1) // size

    def exportData(self):
        """Export every document of the index/type into the dump file.

        Raises the index's ``max_result_window`` first, then pages through
        the search results with ``from``/``size`` and appends each page to
        the dump file. Progress and elapsed time are printed to stdout.
        """
        print("export data begin...")
        puthead = {"Content-Type": "application/json"}
        # Window value comes from the original script; adjust it to the
        # real data volume when the index holds more documents.
        param = {"index.max_result_window": "1000000"}
        requests.put(url=self.urlput, data=json.dumps(param), headers=puthead)
        begin = time.time()

        # Start from an empty file: this replaces the old remove-and-ignore
        # dance and guarantees the file exists even when there are no hits.
        with open(self._fileName(), "w", encoding=self.encoding):
            pass

        msg = requests.get(self.url).text
        print(msg)
        num = self._totalHits(json.loads(msg))
        print(num)

        for page in range(self._pageCount(num, self.size)):
            offset = page * self.size
            msg = requests.get(
                self.url + "?from=" + str(offset) + "&size=" + str(self.size)
            ).text
            self.writeFile(msg)
        print("export data end!!!\n\t total consuming time:" + str(time.time() - begin) + "s")

    def writeFile(self, msg):
        """Append the ``_source`` of every hit in *msg* to the dump file.

        Args:
            msg: Raw JSON text of one search response.
        """
        hits = json.loads(msg)["hits"]["hits"]
        # ``with`` closes the file on every path and cannot hit the unbound
        # variable the old ``try/finally`` had when ``open`` itself failed.
        with open(self._fileName(), "a", encoding=self.encoding) as f:
            f.writelines(
                json.dumps(hit["_source"], ensure_ascii=False) + "\n"
                for hit in hits
            )


if __name__ == '__main__':
    # Replace ip, port, index and type with the values of the real cluster.
    exporter = exportEsData("http://ip:port", "index", "type")
    exporter.exportData()

# 导入脚本

# coding: utf-8

from elasticsearch import Elasticsearch
import json
import requests
from elasticsearch import helpers

class importEsData():
    """Load a JSON-lines dump file into an Elasticsearch index in bulk.

    The dump file is ``<index>_<type>.json`` in the current working
    directory, with one JSON document per line.
    """

    batchSize = 10000  # number of actions sent per bulk request

    def __init__(self, url, index, type, mappings=None,
                 sourceFields=("properties",), encoding='gbk'):
        """Derive the index and mapping endpoints from the given names.

        Args:
            url: Base URL of the cluster, e.g. ``http://ip:port``.
            index: Name of the target index.
            type: Mapping type inside the index.
            mappings: Optional mapping body (a JSON-serialisable object,
                e.g. taken from ``http://ip:port/index`` on the source
                cluster). When ``None`` no mapping request is sent.
            sourceFields: Keys copied from each dumped document into the
                indexed ``_source``. Defaults to the single ``properties``
                key the original script used; pass ``None`` to index each
                document unchanged.
            encoding: Text encoding of the dump file.
        """
        self.url = url
        self.urlputindex = url + "/" + index
        self.urlputmapping = url + "/" + index + "/" + type + "/_mapping"
        self.index = index
        self.type = type
        self.mappings = mappings
        self.sourceFields = sourceFields
        self.encoding = encoding

    def _buildActions(self, lines):
        """Yield one bulk action per non-blank JSON line in *lines*.

        Raises:
            KeyError: if a document lacks one of ``sourceFields``.
        """
        for line in lines:
            if not line.strip():
                # A stray blank line would otherwise crash json.loads.
                continue
            doc = json.loads(line)
            if self.sourceFields is None:
                source = doc
            else:
                source = {field: doc[field] for field in self.sourceFields}
            yield {
                "_index": self.index,
                "_type": self.type,
                "_source": source,
            }

    def importData(self):
        """Create the index (and mapping, if given) and bulk-load the dump."""
        es = Elasticsearch(self.url)
        requests.put(self.urlputindex)  # create the index
        if self.mappings is not None:
            # The original referenced an undefined ``mappings`` placeholder
            # here; the mapping is now supplied through the constructor and
            # sent with an explicit JSON content type.
            requests.put(
                self.urlputmapping,
                data=json.dumps(self.mappings),
                headers={"Content-Type": "application/json"},
            )

        actions = []  # pending bulk actions
        # ``with`` closes the dump even if parsing or a bulk call raises.
        with open(self.index + "_" + self.type + ".json", encoding=self.encoding) as f:
            for action in self._buildActions(f):
                actions.append(action)
                if len(actions) == self.batchSize:
                    helpers.bulk(es, actions)
                    actions = []
        if actions:
            # Flush the final, partially filled batch.
            helpers.bulk(es, actions)

if __name__ == '__main__':
    # Replace ip, port, index and type with the values of the real cluster.
    importer = importEsData("http://ip:port", "index", "type")
    importer.importData()


↙↙↙阅读原文可查看相关链接,并与作者交流