博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ElasticSearch:剖析query_and_fetch和query_then_fetch的区别
阅读量:2190 次
发布时间:2019-05-02

本文共 14122 字,大约阅读时间需要 47 分钟。

在ElasticSearch实际使用中遇到了一个问题,就是在数据量很大的情况下做聚合查询(aggregation)会导致内存溢出。当时看了文档,猜测修改search_type能避免内存溢出。实际测试发现,在数据量相同的情况下,search_type设置为query_and_fetch的聚合查询不会导致内存溢出,而默认的query_then_fetch则会内存溢出。本文就从源码层面分析这两种search_type的区别。

搜索请求处理过程

当一个搜索请求发送到节点之后,节点首先判断出这是一个搜索请求,然后将这个请求传递给TransportSearchAction。这个类负责处理所有的搜索请求。

TransportSearchAction负责根据搜索请求中的search_type将本次的搜索请求传递给对应的类。核心代码如下:

if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {    dfsQueryThenFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {    queryThenFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) {    dfsQueryAndFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) {    queryAndFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.SCAN) {    scanAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.COUNT) {    countAction.execute(searchRequest, listener);}

query_and_fetch执行过程

query_and_fetch搜索对应的类为TransportSearchQueryAndFetchAction。它在执行的时候会启动一个异步任务,对应的代码如下:

@Overrideprotected void doExecute(SearchRequest searchRequest, ActionListener
listener) { new AsyncAction(searchRequest, listener).start();}

AsyncAction的入口方法是start,定义在它的基类BaseAsyncAction中。

BaseAsyncAction中的start方法首先确定有哪些分片需要处理,然后给每个需要处理的分片都启动一个异步的搜索任务,对应的关键代码如下:

for (final ShardIterator shardIt : shardsIts) {    ...    performFirstPhase(shardIndex, shardIt, shard);    ...}

performFirstPhase的作用就是生成一个内部的分片搜索请求,这种请求只针对一个分片,然后调用了一个子类的方法sendExecuteFirstPhase,让子类选择的处理方式。对应的代码如下:

sendExecuteFirstPhase(node, firstRequest, new SearchServiceListener
() { @Override public void onResult(FirstResult result) { onFirstPhaseResult(shardIndex, shard, result, shardIt); } ...});

onFirstPhaseResult主要作用是调用子类的moveToSecondPhase。这个方法在executeFetchPhase之后才执行的,因此在其后面再介绍。

sendExecuteFirstPhase方法是抽象的。在搜索模式为query_and_fetch时,对分片请求的处理方式是调用sendExecuteFetch。子类对它的实现代码如下:

@Overrideprotected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener
listener) { searchService.sendExecuteFetch(node, request, listener);}

sendExecuteFetch定义在SearchServiceTransportAction中,它会将分片搜索请求转发到对应的节点上。当然,如果对应的节点是自己这个节点,就不转发了,直接执行executeFetchPhase

executeFetchPhase的执行过程

executeFetchPhase中,首先初始化一个SearchContext,然后调用queryPhase,然后调用fetchPhase,最后清理SearchContext。对应的关键代码如下:

public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {    // 初始化SearchContext    final SearchContext context = createAndPutContext(request);    ...    try {        // 执行queryPhase        ...        queryPhase.execute(context);        ...        // 执行fetchPhase        ...        fetchPhase.execute(context);        ...        if (context.scroll() == null) {           freeContext(context.id());       }       ...        // 返回结果        return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());    } catch (Throwable e) {        ...    } finally {        // 清理SearchContext        cleanContext(context);    }}

从上面的代码可以看出,queryPhasefetchPhase是连续执行的,这就是query_and_fetch的含义。当这两个阶段执行完成之后,如果不是滚动查询,就直接调用freeContextSearchContextactiveContexts中删除。

freeContextclearContext有什么区别呢?

freeContext的主要作用是从activeContexts中删掉指定的SearchContext,这样JVM就能将这部分的内存进行回收了。相关的代码如下:

public boolean freeContext(long id) {    final SearchContext context = activeContexts.remove(id);    ...    context.close();    ...}

cleanContext的主要作用是释放部分内存空间,然后将指定的context从ThreadLocal中删掉。相关的代码如下:

private void cleanContext(SearchContext context) {    context.clearReleasables(Lifetime.PHASE);    SearchContext.removeCurrent();}

SearchContext.clearReleasables的主要作用是将可以释放的内存释放掉。那怎样判断哪些资源是可以释放的呢?这个需要别的类在里面进行注册了。注册的时候调用addReleasable,告知哪个阶段可以清理哪些资源。相关的代码如下:

private Multimap
clearables = null;public void addReleasable(Releasable releasable, Lifetime lifetime) { ... clearables.put(lifetime, releasable);}public void clearReleasables(Lifetime lifetime) { ... List
> releasables = new ArrayList<>(); for (Lifetime lc : Lifetime.values()) { if (lc.compareTo(lifetime) > 0) { break; } releasables.add(clearables.removeAll(lc)); } Releasables.close(Iterables.concat(releasables)); ...}

上面这段代码中的for循环是为了收集指定阶段,以及该阶段之前所有可清理的资源。因此clearReleasables的作用是清理掉指定阶段以及该阶段之前的所有可清理的资源。

SearchContext.removeCurrent代码如下。主要就是将当前的SearchContext删除掉。

private static ThreadLocal
current = new ThreadLocal<>();public static void removeCurrent() { current.remove(); QueryParseContext.removeTypes();}

分析一下executeFetchPhase返回结果。从代码中可以看出,返回的结果中包含query的结果和fetch的结果。对应的两个类分别是QuerySearchResultFetchSearchResult

QuerySearchResult

QuerySearchResult定义如下:

public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider {
private long id; private SearchShardTarget shardTarget; private int from; private int size; private TopDocs topDocs; private InternalFacets facets; private InternalAggregations aggregations; private Suggest suggest; private boolean searchTimedOut; ...}

其中最重要的应该是topDocs和aggregations。

再看看TopDocs的定义:

public class TopDocs {
public int totalHits; public ScoreDoc[] scoreDocs; private float maxScore; ...}

里面包含了搜索命中的结果数量、文档编号、文档匹配分数、最大匹配分数。再看看ScoreDoc是如何定义的。

public class ScoreDoc {
public float score; public int doc; public int shardIndex; ...}

这里的doc就是内部的文档编号,可以通过IndexSearcher#doc(int)方法获取对应的文档内容。

因此QuerySearchResult中只包含了内部的文档编号、文档的匹配分值。

FetchSearchResult

FetchSearchResult的定义如下:

public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider {
private long id; private SearchShardTarget shardTarget; private InternalSearchHits hits; private transient int counter; ...}

它包含了InternalSearchHitsInternalSearchHits的定义如下:

public class InternalSearchHits implements SearchHits {
private InternalSearchHit[] hits; public long totalHits; private float maxScore; ...}

它包含了InternalSearchHitInternalSearchHit的定义如下:

public class InternalSearchHit implements SearchHit {
... private Map
sourceAsMap; private byte[] sourceAsBytes; ...}

它包含了文档的原始内容和解析后的内容。

因此FetchSearchResult包含了文档的具体内容。

moveToSecondPhase执行过程

在上面的executeFetchPhase执行完成之后,得到query结果和fetch结果之后,就执行moveToSecondPhase了。

moveToSecondPhaseBaseAsyncAction中是一个抽象方法,它在子类TransportSearchQueryAndFetchAction中的定义如下:

@Overrideprotected void moveToSecondPhase() throws Exception {    threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {        @Override        public void run() {            ...            sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);            ...            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));        }    });}

从代码中可以看出,搜索的第二阶段是在search线程池中提交一个任务,首先是对分片结果进行整体排序,然后将搜索结果进行合并。这里面分别调用了searchPhaseController.sortDocssearchPhaseController.merge两个方法。

首先看看sortDocs做了什么。它的关键代码如下:

public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray
resultsArr) throws IOException { ... TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs); return mergedTopDocs.scoreDocs;}

从代码中可以看出,对文档进行排序的时候调用了TopDocs.merge方法。这个方法的关键代码如下:

public static TopDocs merge(Sort sort, int start, int size, TopDocs[] shardHits) throws IOException {    final PriorityQueue
queue; ... queue = new ScoreMergeSortQueue(shardHits); ... for(int shardIDX=0;shardIDX

这段代码刚开始看起来有点复杂,看不太懂。其实它是归并排序的变种。因为普通的归并排序只针对两个数组。排序的时候每次从输入的两个数组中取出最小的元素,放到结果中。而这里输入的数组会有多个,每次也要取出最小的元素,放到结果中。如何快速的从多个数组中取出最小的元素呢?这里利用了优先级队列。

再看searchPhaseController.merge做了什么呢?主要是合并facet结果、aggregation结果、hits结果、suggest结果、count结果。这里就看一下最关键的hits是怎样合并的。相关的代码如下:

List
hits = new ArrayList<>();for (ScoreDoc shardDoc : sortedDocs) { FetchSearchResult fetchResult = ...; ... int index = fetchResult.counterGetAndIncrement(); if (index < fetchResult.hits().internalHits().length) { ... InternalSearchHit searchHit = fetchResult.hits().internalHits()[index]; ... hits.add(searchHit); } ...}

hit的合并过程就是将fetchResult中所有文档的具体内容组合成一个新的列表。其他的facet、aggregations、suggest、count的合并过程也是类似。

query_and_fetch下的fromsize

举个例子,如果在搜索的时候指定search_typequery_and_fetch,再指定size为10,那么就会返回50个结果。这是为什么呢?原来,在fetch的时候就已经把fromsize参数用掉了,导致每个分片都返回了10个文档,如果有5个分片的话,那么最后合并的结果就是50个。

如果query_and_fetch情况下这两和参数的作用和query_then_fetch的行为一样。那么一次就要取出from+size个文档。如果from比较大,那么取出的文档足以撑爆内存。而如果在fetch阶段就直接使用这两个参数,每个分片最多就取出size个文档,from稍微大一些也没有关系。因此,这种行为的设计也是可以理解的。

query_then_fetch执行过程

query_then_fetch的搜索逻辑在TransportSearchQueryThenFetchAction中。它在执行的时候会启动一个异步任务。相关代码如下:

@Overrideprotected void doExecute(SearchRequest searchRequest, ActionListener
listener) { new AsyncAction(searchRequest, listener).start();}

start方法定义在它的基类BaseAsyncTask中,主要作用是给每个需要搜索的分片单独启动一个异步任务,并让子类选择处理方式。在第一步处理完成之后,会调用子类中的moveToSecondPhase继续执行第二阶段的计算任务。在query_then_fetch中,子类的处理方式是sendExecuteQuery。相关的代码如下:

@Overrideprotected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener
listener) { searchService.sendExecuteQuery(node, request, listener);}

sendExecuteQuery的作用是什么呢?它会将query请求发送到对应的节点上。如果对应的节点就是自己,那么就直接调用executeQueryPhase,执行query任务。

executeQueryPhase的主要作用是初始化SearchContext,然后执行queryPhase,最后清理SearchContext。返回的结果是QuerySearchResult,它只包含文档编号和必要的排序分值。相关的代码如下:

public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {    final SearchContext context = createAndPutContext(request);    try {       ...       queryPhase.execute(context);       ...       if (context.searchType() == SearchType.COUNT) {           freeContext(context.id());       }       ...       return context.queryResult();    } catch(...) {        ...    } finally {       ...       cleanContext(context);    }}

代码里面可以看到一个细节,就是只有在search_typecount的时候才会调用freeContext。也就是说,如果search_type不是count,那么SearchContext仍然会驻留在内存中。cleanContextfreeContext的区别在前面的文章里面讲过了。

queryPhase结束之后,就开始第二阶段了。第二阶段从moveToSecondPhase开始。它的代码定义在TransportSearchQueryThenFetchAction中。主要的作用是将第一阶段获取的文档编号进行排序。排序完成之后再根据文档编号获取文档里面实际的内容。相关的代码如下:

@Overrideprotected void moveToSecondPhase() throws Exception {    ...    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);    searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);    ...    final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());    for (AtomicArray.Entry
entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = firstResults.get(entry.index); ... executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); }}

sortDocs前面已经讲过了,作用是将文档编号根据每个文档的匹配分值进行排序。

executeFetch相关的代码如下。它的作用是调用searchService开启一个异步任务,根据文档编号获取文档的具体内容,并将结果存放到fetchResults中。根据counter判断如果所有的fetch任务都执行完了,就调用finishHim来完成本次查询结果。

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {    searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener
() { @Override public void onResult(FetchSearchResult result) { ... fetchResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { finishHim(); } } ... });}

sendExecuteFetch最后会调用executeFetchPhase。它的关键代码如下:

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {    final SearchContext context = findContext(request.id());    ...    fetchPhase.execute(context);    ...    freeContext(request.id());    ...    return context.fetchResult();}

从代码中可以看出,在fetch阶段结束之后才会释放SearchContext

finishHim相关的代码如下。它的作用是合并每个分片的查询结果,让后将合并结果通知给listener。让它完成最后的查询结果。searchPhaseController.merge在前面讲过了,主要作用是

private void finishHim() {    ...    threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {        @Override        public void run() {            ...            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);            ...            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));            ...        }    });    ...}

搜索模式对聚合查询的影响

query_and_fetchquery_then_fetch下的聚合查询有什么区别呢?首先,前面已经讲过了,query_and_fetchexecuteFetchPhase的时候就把SearchContext释放掉了。

public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException {    final SearchContext context = createAndPutContext(request);    ...    queryPhase.execute(context);    ...    fetchPhase.execute(context);    ...    freeContext(context.id());    ...    return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());}

而在query_then_fetch中,query阶段不会释放SearchContext,在fetch阶段结束之后才会释放SearchContext。两段相关的代码如下:

public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {    final SearchContext context = createAndPutContext(request);    ...    queryPhase.execute(context);    ...    return context.queryResult();}public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {    final SearchContext context = findContext(request.id());    ...    fetchPhase.execute(context);    ...    freeContext(request.id());    ...    return context.fetchResult();}

在聚类搜索的时候,内存占用比较大的是FieldData数据,这些数据储存在SearchContext中。在query_then_fetch的搜索模式下,载入FieldData是在query阶段完成的。而query阶段不会释放内存,因此内存中会存放所有分片的FieldData,从而导致内存溢出。

但是我还是有个疑问,为什么不在query阶段结束之后立即释放SearchContext中的FieldData呢?这样也就不会有内存溢出的问题了。

转载地址:http://zvyub.baihongyu.com/

你可能感兴趣的文章
【Pyton】【小甲鱼】异常处理:你不可能总是对的
查看>>
APP性能测试工具
查看>>
【Pyton】【小甲鱼】类和对象
查看>>
压力测试工具JMeter入门教程
查看>>
作为一名软件测试工程师,需要具备哪些能力
查看>>
【Pyton】【小甲鱼】类和对象:一些相关的BIF(内置函数)
查看>>
【Pyton】【小甲鱼】魔法方法
查看>>
单元测试需要具备的技能和4大阶段的学习
查看>>
【Loadrunner】【浙江移动项目手写代码】代码备份
查看>>
Python几种并发实现方案的性能比较
查看>>
[Jmeter]jmeter之脚本录制与回放,优化(windows下的jmeter)
查看>>
Jmeter之正则
查看>>
【JMeter】1.9上考试jmeter测试调试
查看>>
【虫师】【selenium】参数化
查看>>
【Python练习】文件引用用户名密码登录系统
查看>>
学习网站汇总
查看>>
【Python】用Python打开csv和xml文件
查看>>
【Loadrunner】性能测试报告实战
查看>>
【自动化测试】自动化测试需要了解的的一些事情。
查看>>
【selenium】selenium ide的安装过程
查看>>