百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

ElasticSearch客户端批量处理操作bulkIndexAsync引发的内存泄漏

yuyutoo 2024-10-28 20:22 3 浏览 0 评论

没错这又是一次血案,不过是在测试环境发现的,还好不是上次线上导致的CPU300%,这次及时发现,避免了线上血案,不过我还是要复盘一下的,下面就看看我的分析,看看有没有什么错误的地方

场景描述

场景很简单,就是把我MySQL中一个表的数据全部迁移到ElasticSearch的一个新索引上(因为老的索引字段匹配不上,随着业务迭代MySQL的字段类型发生了变化,ES又没有办法动态更新,所以最简单的方法就是同步一次数据啦) 然后我就想当然的写了一段同步脚本,然后满怀信心的去开发环境自测,没问题啊,10W多条数据很快啊,当时直接就同步完了,一条数据不差,心里暗想,这次真简单,然后就提测,去测试环境跑数据了,然后血案就发生了

跑着跑着,测试姐姐就说测试环境不能用了,我想着我只是同步一个表的数据,就算有错应该也只是影响那个页面的查询吧,然后我就打开了测试环境页面,好家伙,每个页面都报错了,都是can not found url,再结合我们的ShenYU网关,就想到应该是节点崩了,没有Controller注册到网关上,所以请求就没办法映射。然后我就去容器里面看报错嘛,但是错误显示的是超时:ElasticSearch连接超时,ZK连接超时等等等等。比较茫然,这个错误没有太大的信息呀,师兄提醒我是不是内存崩了,但是我并没有看到OOM的提醒呀,所以我就顺着这个思路开始了排查

思路验证

想到可能是OOM的原因,为了测验,我就把代码的异步批量处理改成了同步单条索引,然后去测试环境验证(为什么不去开发环境?我下面会说到) 发现如果是同步单条跑的话,一点问题没有就是速度有点慢,那不行,生产几千万呢,算下来的话,几天也同步不完,但是我从这就确定了应该是内存的问题:是内存溢出了?还是内存泄漏了?接下来,看看我经过排查过后得到的重点代码(罪魁祸首)

//拉取5000条数据
//这条SQL就是查询主键id > redis中已经同步过后的ID 的后5000条数据
//为了减轻MySQL压力
List<xxxDO> aDOS = xxxRepository.selectById(Integer.valueOf(redisService.get(idKey)));
//同步数据中间逻辑省略
//开始索引
xxxxEsService.indexEs(aDOS);


//再来看看这个indexEs方法
aDOS.forEach(aDO -> {
            EsABO esABO = new EsABO();
            //把aDO转换成esABO然后
            esHighLevelRestClient.bulkIndexAsync(Arrays.asList(esABO));
}
//罪魁祸手就是这个bulkIndexAsync接下来看看这个方法
 public void bulkIndexAsync(List<IDocument> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        BulkRequest request = new BulkRequest();
        dataList.forEach(data -> {
            IndexRequest indexRequest = new IndexRequest(indexName,
                    mappingTypeName, data.documentId())
                    .source(JSON.toJSONString(data), XContentType.JSON)
                    .routing(data.routing());
            request.add(indexRequest);
        });

        long startTime = System.currentTimeMillis();
        client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener() {

            @Override
            public void onResponse(Object o) {
                log.info("es index cost time:{}", System.currentTimeMillis() - startTime);
            }

            @Override
            public void onFailure(Exception e) {
                log.error("es index fail, cost time:{}, Exception:", System.currentTimeMillis() - startTime, e);
            }
        });
    }

解释下,这个方法:就是传过来一条记录,然后这个记录就会被add方法添加到这个BulkRequest里面,然后ESClient会执行这个bulkAsync就去异步的同步数据了。这样看起来没啥问题啊,我下面再放一下这个bulkAsync方法的源码

    public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener<BulkResponse> listener) {
        performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet());
    }

它里面就是对我们提交的这个bulkRequest处理的逻辑,看上去没什么问题,我再放一张图,大家看看能不能察觉到什么: 这是我本地跑Jprofiler存储到快照中的几张图 1.同步数据接口未开启的时候

2. 同步数据开启

大家发现什么问题了么?

开启同步数据后,我的堆越来越大,这些也没什么,关键是经过GC之后,堆的大小并没有明显变化,问题就在这了,也就是说我的一个大对象并没有进行GC回收,是那个对象呢?没错就是那个bulkRequest

分析

一开始我拉出来5000条数据塞给这个bulkRequest,可能刚开始他能处理过来,并且进行了GC回收直接回收掉了,看下图

这个时候我们的速度应该能跟得上,但是我们的MySQL源源不断的塞给他5000,很快这个堆就又满了,但是我们的这个bulkRequest还没有同步完,还有强引用(可能现在这个bulkRequest里面有10W数据,但是前5W已经处理完了),也就是说随着速度跟不上来,导致这个bulkRequest越来愈大

而且以前处理过的也无法销毁,所以导致内存泄漏,进而系统内存OOM崩溃,但是为什么没有OOM,这个我只能猜测可能是速度太快,根本就没来得及然后系统就僵死了,一行日志都没有了,整个系统直接崩溃了

怎么解决?

关键原因就在于及时的回收已经处理过的request,所以采用另外一种批量处理方式

public  void bulkProcessorBatchInsert(List<EsUpdateEntityBO<T>> entityBOList) {
        if (entityBOList.size()==0) {
            return;
        }
        List<IndexRequest> indexRequests=new ArrayList<>();

        //更新的数据
        entityBOList.forEach(e->{
            //获取id
            IndexRequest indexRequest = new IndexRequest();
            indexRequest.routing(e.getRouting());
            indexRequest.index(indexName);
            indexRequest.type(mappingTypeName);
            //更新的id
            indexRequest.id(e.getDocumentId());
            //更新的数据
            String jsonString = JSON.toJSONString(e.getT());
            indexRequest.source(jsonString, XContentType.JSON);
            indexRequests.add(indexRequest);
        });
        indexRequests.forEach(bulkProcessor::add);
    }

这不是和刚才的一样么?其实不一样,因为关键代码在下面 我在建立ES客户端的时候是这样的

private BulkProcessor createBulkProcessor(RestHighLevelClient restHighLevelClient,Boolean async) {
		//中间很多逻辑
        
        BulkProcessor.Builder builder;
        if (async) {
            //异步
            //这里也有很多逻辑
        }
        //到达1000条时刷新
        builder.setBulkActions(2000);
        //内存到达8M时刷新
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        //设置的刷新间隔1s
        builder.setFlushInterval(TimeValue.timeValueSeconds(1));
        //设置允许执行的并发请求数。
        builder.setConcurrentRequests(8);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }

关键点就在于,我不无限制的处理请求,每当我的bucket攒够2000条,或者我的内存达到了8M,或者已经距离上次处理间隔了1S了我就主动进行刷盘(同步数据)操作,这样的话就能避免上面那种情况了

难道ES官方不知道这个问题么?我看了官方文档,确实没找到他这个 bulkAsync中这个BulkRequest bulkRequest有什么刷盘策略,难道就是无限制增长么?这个我还没有看完他的源码,可能自己的能力也不够,如果有老哥知道,评论区留言 至此用了新的异步批量请求的操作,测试环境很快很安全的同步了数据,而且预发环境的上千万的数据也很快很安全的同步完成,我心中的大石头终于放下来了 分享就到这了,如果有什么不对的地方,请大佬指出,我也好改正学习下 好了,写代码去了

作者:会编程的王学长
链接:https://juejin.cn/post/7090355674655227918

相关推荐

Java开发中如何优雅地避免OOM(OutOfMemoryError)

Java开发中如何优雅地避免OOM(OutOfMemoryError)在这个信息化高速发展的时代,内存就像程序员手中的笔,缺了它就什么都写不出来。而OOM(OutOfMemoryError)就像是横在...

常见的JVM调优方法和步骤

1、内存调优堆内存设置:通过-Xms和-Xmx参数调整初始和最大堆内存大小-Xms:初始堆大小(如-Xms512M)-Xmx:最大堆大小(如-Xmx2048M)调整新生代和老年代的比例...

Java中9种常见的CMS GC问题分析与解决(一)

目前,互联网上Java的...

JDK21新特性:Prepare to Disallow the Dynamic Loading of Agents

PreparetoDisallowtheDynamicLoadingofAgentsJEP451:准备禁止动态加载代理摘要...

Java程序GC垃圾回收机制优化指南

Java程序GC垃圾回收机制优化指南作为一个Java开发者,我们经常会在任务管理器里看到Java进程占用内存不断增长,然后突然下降的现象。这其实就是在Java虚拟机中运行的垃圾回收(GC)机制在起作用...

Java Java命令学习系列(一)——Jps

jps位于jdk的bin目录下,其作用是显示当前系统的java进程情况,及其id号。jps相当于Solaris进程工具ps。不象”pgrepjava”或”ps-efgrepjava”,jps...

面试题专题:头条一面参考答案(003)

前两篇文章也都是介绍头条一面的内容及参考答案...

Java JVM原理与性能调优:从基础到高级应用

一、JVM基础架构与内存模型1.1JVM整体架构概览Java虚拟机(JVM)是Java程序运行的基石,它由以下几个核心子系统组成:...

死锁攻防战:阿里架构师教你用3种核武器杜绝程序僵死

从线程转储分析到银行家算法,彻底掌握大厂必考的死锁解决方案以下是为Java死锁问题设计的结构化技术解析方案,包含代码级解决方案与高频追问应对策略:...

Java 1.8 虚拟机内存分布详解

Java1.8虚拟机内存分布详解Java1.8的JVM内存布局相比早期版本有显著变化(如永久代被元空间取代)。以下是其核心内存区域的划分、作用及配置参数:一、JVM内存整体结构...

Java 多线程开发难题?这篇文章给你答案!

作为互联网大厂的后端开发人员,在Java多线程开发过程中,必然会面临诸多复杂且具有挑战性的问题。在高并发场景下,各类潜在问题对系统的稳定性与性能产生严重影响,本文将深入探讨这些问题,并提供全面且有...

软件性能调优全攻略:从瓶颈定位到工具应用

性能调优是软件测试中的重要环节,旨在提高系统的响应时间、吞吐量、并发能力、资源利用率,并降低系统崩溃或卡顿的风险。通常,性能调优涉及发现性能瓶颈、分析问题根因、优化代码和系统配置等步骤,调优之前需要先...

JVM性能优化实战技巧

JVM性能优化实战技巧在现代企业级应用开发中,JavaVirtualMachine(JVM)作为承载Java应用程序的核心引擎,其性能直接决定了系统的响应速度、吞吐量以及资源利用率。因此,掌握一些...

JVM 深度解析:运行时数据区域、分代回收与垃圾回收机制全攻略

共同学习,有错欢迎指出。JVM运行时数据区域1.程序计数器程序计数器是一块较小的内存空间,可看作当前线程所执行的字节码的行号指示器。在虚拟机概念模型里,字节码解释器通过改变这个计数器的值选取下一条...

JVM内存管理详解与调优实战

JVM内存管理详解与调优实战Java虚拟机(JVM)作为Java程序运行的核心组件,其内存管理机制直接影响着应用程序的性能表现。今天,咱们就来一场既严肃又有趣的JVM内存管理之旅,看看这个“幕后英雄”...

取消回复欢迎 发表评论: