开发同学有个数据同步的需求,正好我这边有闲,就接过来做了,
主要是将 mysql 的几张表,同步至 elasticsearch,全量同步
测试环境:4600 条
生产环境:3000 万条
首先,es 集群环境搭建
然后,编写 java 代码,完成同步功能
因为当前几张表都是基于 accountId 的关联的,而 accountId 是自增的
主要分以下几步:
这里借鉴了搜索和其他业务线的开发同学搭建的方式,完成搭建,可另写帖子完成
@Override
public void memberAccountInfoSync() {
int pageSize = 1000;
long channelMinId = queryChannelMinId();
long channelMaxId = queryChannelMaxId();
int totalCount = queryTotalCount(channelMinId, channelMaxId);
PageTaskUtil.handlePageTaskForBizInvoke(new PageTaskUtil.PageTaskForBizInvoke<ESMemberAccountInfo>() {
@Override
public List queryPageData(int pageNo, int pageSize) {
long pageStartId = getPageStartId(channelMinId, pageSize, pageNo);
long pageEndId = getPageEndId(pageStartId, pageSize);
List<CpsBaseAccountInfo> cpsBaseAccountInfoList = cpsBaseAccountService.listAccountIdsWithInitialValue((byte) CURR_CHANNEL.getCode(), pageStartId, pageEndId);
List<Long> accountIdList = cpsBaseAccountInfoList.stream().map(CpsBaseAccountInfo::getAccountId).collect(Collectors.toList());
// 性能优化,如果accountIdList为空,则进入下一次循环
if (CollectionUtils.isEmpty(accountIdList)) {
return new ArrayList(0);
}
// 捞出其他表数据
List<ESMemberAccountInfo> esMemberAccountInfoList = new ArrayList<>();
cpsBaseAccountInfoList.forEach(one -> {
long accountId = one.getAccountId();
ESMemberAccountInfo e = new ESMemberAccountInfo();
// 合并数据到es对象
esMemberAccountInfoList.add(e);
});
return esMemberAccountInfoList;
}
@Override
public void pageTask(List<ESMemberAccountInfo> pageList, Object res) {
// 如果集合为空,直接返回
if (CollectionUtils.isEmpty(pageList)) {
return;
}
CountDownLatch countDownLatch = new CountDownLatch(pageList.size());
for (ESMemberAccountInfo esMemberAccountInfo : pageList) {
final Long accountId = esMemberAccountInfo.getAccountId();
esSyncThreadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 查询es中是否存在该用户
if (!isExistAccountInES(esMemberAccountInfo)) {
insertAccountInfoToES(esMemberAccountInfo);
}
} catch (Exception e) {
log.error("es accountInfo sync error, accountId: {}", accountId);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await();
} catch (Exception e) {
log.error("es sync error" + e.getMessage());
}
}
}, totalCount, pageSize, 10, "esSync", null);
}
第一版,因为不知道 es 有批量插入的功能,是一条一条的往 es 中插入数据的,然后
在插入操作之前还查了一下 es 中是否存在该数据,
不存在则插入,整个功能完成后,测试环境用时近 60 秒
private void insertAccountInfoToESBatch(List<ESMemberAccountInfo> esMemberAccountInfoList) throws Exception {
BulkRequest bulkRequest = new BulkRequest();
for (ESMemberAccountInfo esMemberAccountInfo: esMemberAccountInfoList) {
UpdateRequest updateRequest = new UpdateRequest(ESIndexEnum.ES_INDEX_BASE_ACCOUNT_INFO.getIndex(), ESIndexEnum.ES_INDEX_BASE_ACCOUNT_INFO.getType(),
esMemberAccountInfo.getAccountId().toString() + esMemberAccountInfo.getBizChannel().toString());
updateRequest.doc(JSON.parseObject(JSON.toJSONString(esMemberAccountInfo)));
updateRequest.docAsUpsert(true);
bulkRequest.add(updateRequest);
}
client.getRhlClient().bulk(bulkRequest, RequestOptions.DEFAULT);
}
第二版,改为批量插入 es,并且移除查询请求,测试环境用时近 30 秒
@Override
public void memberAccountInfoSync() {
int pageSize = 1000;
long channelMinId = queryChannelMinId();
long channelMaxId = queryChannelMaxId();
log.info("es全量同步任务,channelMinId is: {}, channelMaxId is: {}", channelMinId, channelMaxId);
int totalCount = queryTotalCount(channelMinId, channelMaxId);
int pageCount = totalCount % pageSize == 0 ? totalCount / pageSize : totalCount / pageSize + 1;
log.info("es全量同步任务,pageCount is: {}", pageCount);
// druid数据库连接池的大小
final int dbConnection = 10;
final int repeatTimes = pageCount % dbConnection == 0 ? pageCount / dbConnection : pageCount / dbConnection + 1;
log.info("es全量同步任务,repeatTimes is: {}", repeatTimes);
for (int i=1; i<repeatTimes+1; i++) {
CountDownLatch countDownLatch = new CountDownLatch(dbConnection);
for (int j=1; j<dbConnection+1; j++) {
log.info("es全量同步任务,channelMinId is: {}", channelMinId);
Runnable r = syncTask(channelMinId, pageSize, j + (i-1)*dbConnection, countDownLatch);
esSyncThreadPoolTaskExecutor.execute(r);
}
try {
countDownLatch.await();
} catch (Exception e) {
log.error("es全量同步任务 countDownLatch.await() error" + e.getMessage());
}
}
}
在第二版的基础上,对查询数据的动作也进行异步批量处理,所以整个不使用开发提供的帮助类了,自己写,但保留了批量查询处理的思想,测试环境用时近 6 秒
@Override
public void memberAccountInfoSync() {
int pageSize = 2000;
long channelMinId = queryChannelMinId();
long channelMaxId = queryChannelMaxId();
int totalCount = queryTotalCount(channelMinId, channelMaxId);
int pageCount = totalCount % pageSize == 0 ? totalCount / pageSize : totalCount / pageSize + 1;
log.info("es全量同步任务,pageCount: {}", pageCount);
CountDownLatch c = new CountDownLatch(pageCount);
// druid数据库连接池的大小,不要超过最大值40
esSyncThreadPoolTaskExecutor.setCorePoolSize(30);
esSyncThreadPoolTaskExecutor.setMaxPoolSize(30);
for (int i=0; i<pageCount; i++) {
try {
esSyncThreadPoolTaskExecutor.execute(syncTask(CURR_CHANNEL, channelMinId, channelMaxId, pageSize, i, c));
} catch (Exception e) {
e.printStackTrace();
}
}
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
因为第三版仍然使用了批量处理的思想,在同一批任务中,是以最后一个任务跑完了时间为准的,所以会导致一些性能浪费,
在这里直接通过线程池本身提供的队列功能,通过参数控制线程池大小,避免将数据库连接池耗尽
这里需要注意的是,任务数最好要小于线程池的队列大小,避免生产者过快,将线程池队列塞满导致异常
同时,每页数量不能过大,这可能导致在写 es 时,操作时间过长,导致超过 es 客户端默认的最大连接时间
实际在上到预发环境的时候,测试了下,写 3000 万的数据,因为网络的关系,导致需要 15 个小时才能跑完;
优化方法是将应用与 es 布在同一个局域网里面,结果是 17 分钟完成
主要使用批量操作,线程池异步操作完成了单机优化