本文共 14122 字,大约阅读时间需要 47 分钟。
在ElasticSearch实际使用中遇到了一个问题,就是在数据量很大的情况下做聚合查询(aggregation)会导致内存溢出。当时看了文档,猜测修改search_type
能避免内存溢出。实际测试发现,在数据量相同的情况下,search_type
设置为query_and_fetch
的聚合查询不会导致内存溢出,而默认的query_then_fetch
则会内存溢出。本文就从源码层面分析这两种search_type
的区别。
当一个搜索请求发送到节点之后,节点首先判断出这是一个搜索请求,然后将这个请求传递给TransportSearchAction
。这个类负责处理所有的搜索请求。
TransportSearchAction
负责根据搜索请求中的search_type
将本次的搜索请求传递给对应的类。核心代码如下:
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) { dfsQueryThenFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) { queryThenFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.DFS_QUERY_AND_FETCH) { dfsQueryAndFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.QUERY_AND_FETCH) { queryAndFetchAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.SCAN) { scanAction.execute(searchRequest, listener);} else if (searchRequest.searchType() == SearchType.COUNT) { countAction.execute(searchRequest, listener);}
query_and_fetch
执行过程query_and_fetch
搜索对应的类为TransportSearchQueryAndFetchAction
。它在执行的时候会启动一个异步任务,对应的代码如下:
@Overrideprotected void doExecute(SearchRequest searchRequest, ActionListenerlistener) { new AsyncAction(searchRequest, listener).start();}
AsyncAction
的入口方法是start
,定义在它的基类BaseAsyncAction
中。
BaseAsyncAction
中的start
方法首先确定有哪些分片需要处理,然后给每个需要处理的分片都启动一个异步的搜索任务,对应的关键代码如下:
for (final ShardIterator shardIt : shardsIts) { ... performFirstPhase(shardIndex, shardIt, shard); ...}
performFirstPhase
的作用就是生成一个内部的分片搜索请求,这种请求只针对一个分片,然后调用了一个子类的方法sendExecuteFirstPhase
,让子类选择的处理方式。对应的代码如下:
sendExecuteFirstPhase(node, firstRequest, new SearchServiceListener() { @Override public void onResult(FirstResult result) { onFirstPhaseResult(shardIndex, shard, result, shardIt); } ...});
onFirstPhaseResult
主要作用是调用子类的moveToSecondPhase
。这个方法在executeFetchPhase
之后才执行的,因此在其后面再介绍。
sendExecuteFirstPhase
方法是抽象的。在搜索模式为query_and_fetch
时,对分片请求的处理方式是调用sendExecuteFetch
。子类对它的实现代码如下:
@Overrideprotected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListenerlistener) { searchService.sendExecuteFetch(node, request, listener);}
sendExecuteFetch
定义在SearchServiceTransportAction
中,它会将分片搜索请求转发到对应的节点上。当然,如果对应的节点是自己这个节点,就不转发了,直接执行executeFetchPhase
。
executeFetchPhase
的执行过程executeFetchPhase
中,首先初始化一个SearchContext
,然后调用queryPhase
,然后调用fetchPhase
,最后清理SearchContext
。对应的关键代码如下:
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException { // 初始化SearchContext final SearchContext context = createAndPutContext(request); ... try { // 执行queryPhase ... queryPhase.execute(context); ... // 执行fetchPhase ... fetchPhase.execute(context); ... if (context.scroll() == null) { freeContext(context.id()); } ... // 返回结果 return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } catch (Throwable e) { ... } finally { // 清理SearchContext cleanContext(context); }}
从上面的代码可以看出,queryPhase
和fetchPhase
是连续执行的,这就是query_and_fetch
的含义。当这两个阶段执行完成之后,如果不是滚动查询,就直接调用freeContext
将SearchContext
从activeContexts
中删除。
freeContext
和clearContext
有什么区别呢?
freeContext
的主要作用是从activeContexts
中删掉指定的SearchContext
,这样JVM就能将这部分的内存进行回收了。相关的代码如下:
public boolean freeContext(long id) { final SearchContext context = activeContexts.remove(id); ... context.close(); ...}
cleanContext
的主要作用是释放部分内存空间,然后将指定的context从ThreadLocal
中删掉。相关的代码如下:
private void cleanContext(SearchContext context) { context.clearReleasables(Lifetime.PHASE); SearchContext.removeCurrent();}
SearchContext.clearReleasables
的主要作用是将可以释放的内存释放掉。那怎样判断哪些资源是可以释放的呢?这个需要别的类在里面进行注册了。注册的时候调用addReleasable
,告知哪个阶段可以清理哪些资源。相关的代码如下:
private Multimapclearables = null;public void addReleasable(Releasable releasable, Lifetime lifetime) { ... clearables.put(lifetime, releasable);}public void clearReleasables(Lifetime lifetime) { ... List > releasables = new ArrayList<>(); for (Lifetime lc : Lifetime.values()) { if (lc.compareTo(lifetime) > 0) { break; } releasables.add(clearables.removeAll(lc)); } Releasables.close(Iterables.concat(releasables)); ...}
上面这段代码中的for循环是为了收集指定阶段,以及该阶段之前所有可清理的资源。因此clearReleasables
的作用是清理掉指定阶段以及该阶段之前的所有可清理的资源。
SearchContext.removeCurrent
代码如下。主要就是将当前的SearchContext
删除掉。
private static ThreadLocalcurrent = new ThreadLocal<>();public static void removeCurrent() { current.remove(); QueryParseContext.removeTypes();}
分析一下executeFetchPhase
返回结果。从代码中可以看出,返回的结果中包含query的结果和fetch的结果。对应的两个类分别是QuerySearchResult
和FetchSearchResult
。
QuerySearchResult
QuerySearchResult
定义如下:
public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider { private long id; private SearchShardTarget shardTarget; private int from; private int size; private TopDocs topDocs; private InternalFacets facets; private InternalAggregations aggregations; private Suggest suggest; private boolean searchTimedOut; ...}
其中最重要的应该是topDocs和aggregations。
再看看TopDocs
的定义:
public class TopDocs { public int totalHits; public ScoreDoc[] scoreDocs; private float maxScore; ...}
里面包含了搜索命中的结果数量、文档编号、文档匹配分数、最大匹配分数。再看看ScoreDoc
是如何定义的。
public class ScoreDoc { public float score; public int doc; public int shardIndex; ...}
这里的doc
就是内部的文档编号,可以通过IndexSearcher#doc(int)
方法获取对应的文档内容。
因此QuerySearchResult
中只包含了内部的文档编号、文档的匹配分值。
FetchSearchResult
FetchSearchResult
的定义如下:
public class FetchSearchResult extends TransportResponse implements FetchSearchResultProvider { private long id; private SearchShardTarget shardTarget; private InternalSearchHits hits; private transient int counter; ...}
它包含了InternalSearchHits
。InternalSearchHits
的定义如下:
public class InternalSearchHits implements SearchHits { private InternalSearchHit[] hits; public long totalHits; private float maxScore; ...}
它包含了InternalSearchHit
。InternalSearchHit
的定义如下:
public class InternalSearchHit implements SearchHit { ... private MapsourceAsMap; private byte[] sourceAsBytes; ...}
它包含了文档的原始内容和解析后的内容。
因此FetchSearchResult
包含了文档的具体内容。
moveToSecondPhase
执行过程在上面的executeFetchPhase
执行完成之后,得到query结果和fetch结果之后,就执行moveToSecondPhase
了。
moveToSecondPhase
在BaseAsyncAction
中是一个抽象方法,它在子类TransportSearchQueryAndFetchAction
中的定义如下:
@Overrideprotected void moveToSecondPhase() throws Exception { threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { ... sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults); ... listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } });}
从代码中可以看出,搜索的第二阶段是在search线程池中提交一个任务,首先是对分片结果进行整体排序,然后将搜索结果进行合并。这里面分别调用了searchPhaseController.sortDocs
和searchPhaseController.merge
两个方法。
首先看看sortDocs
做了什么。它的关键代码如下:
public ScoreDoc[] sortDocs(boolean scrollSort, AtomicArray resultsArr) throws IOException { ... TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs); return mergedTopDocs.scoreDocs;}
从代码中可以看出,对文档进行排序的时候调用了TopDocs.merge
方法。这个方法的关键代码如下:
public static TopDocs merge(Sort sort, int start, int size, TopDocs[] shardHits) throws IOException { final PriorityQueuequeue; ... queue = new ScoreMergeSortQueue(shardHits); ... for(int shardIDX=0;shardIDX
这段代码刚开始看起来有点复杂,看不太懂。其实它是归并排序的变种。因为普通的归并排序只针对两个数组。排序的时候每次从输入的两个数组中取出最小的元素,放到结果中。而这里输入的数组会有多个,每次也要取出最小的元素,放到结果中。如何快速的从多个数组中取出最小的元素呢?这里利用了优先级队列。
再看searchPhaseController.merge
做了什么呢?主要是合并facet结果、aggregation结果、hits结果、suggest结果、count结果。这里就看一下最关键的hits是怎样合并的。相关的代码如下:
Listhits = new ArrayList<>();for (ScoreDoc shardDoc : sortedDocs) { FetchSearchResult fetchResult = ...; ... int index = fetchResult.counterGetAndIncrement(); if (index < fetchResult.hits().internalHits().length) { ... InternalSearchHit searchHit = fetchResult.hits().internalHits()[index]; ... hits.add(searchHit); } ...}
hit的合并过程就是将fetchResult
中所有文档的具体内容组合成一个新的列表。其他的facet、aggregations、suggest、count的合并过程也是类似。
query_and_fetch
下的from
和size
举个例子,如果在搜索的时候指定search_type
为query_and_fetch
,再指定size
为10,那么就会返回50个结果。这是为什么呢?原来,在fetch的时候就已经把from
和size
参数用掉了,导致每个分片都返回了10个文档,如果有5个分片的话,那么最后合并的结果就是50个。
如果query_and_fetch
情况下这两和参数的作用和query_then_fetch
的行为一样。那么一次就要取出from
+size
个文档。如果from
比较大,那么取出的文档足以撑爆内存。而如果在fetch阶段就直接使用这两个参数,每个分片最多就取出size
个文档,from
稍微大一些也没有关系。因此,这种行为的设计也是可以理解的。
query_then_fetch
执行过程query_then_fetch
的搜索逻辑在TransportSearchQueryThenFetchAction
中。它在执行的时候会启动一个异步任务。相关代码如下:
@Overrideprotected void doExecute(SearchRequest searchRequest, ActionListenerlistener) { new AsyncAction(searchRequest, listener).start();}
start
方法定义在它的基类BaseAsyncTask
中,主要作用是给每个需要搜索的分片单独启动一个异步任务,并让子类选择处理方式。在第一步处理完成之后,会调用子类中的moveToSecondPhase
继续执行第二阶段的计算任务。在query_then_fetch
中,子类的处理方式是sendExecuteQuery
。相关的代码如下:
@Overrideprotected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListenerlistener) { searchService.sendExecuteQuery(node, request, listener);}
sendExecuteQuery
的作用是什么呢?它会将query请求发送到对应的节点上。如果对应的节点就是自己,那么就直接调用executeQueryPhase
,执行query任务。
executeQueryPhase
的主要作用是初始化SearchContext
,然后执行queryPhase
,最后清理SearchContext
。返回的结果是QuerySearchResult
,它只包含文档编号和必要的排序分值。相关的代码如下:
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { final SearchContext context = createAndPutContext(request); try { ... queryPhase.execute(context); ... if (context.searchType() == SearchType.COUNT) { freeContext(context.id()); } ... return context.queryResult(); } catch(...) { ... } finally { ... cleanContext(context); }}
代码里面可以看到一个细节,就是只有在search_type
为count
的时候才会调用freeContext
。也就是说,如果search_type
不是count
,那么SearchContext
仍然会驻留在内存中。cleanContext
和freeContext
的区别在前面的文章里面讲过了。
当queryPhase
结束之后,就开始第二阶段了。第二阶段从moveToSecondPhase
开始。它的代码定义在TransportSearchQueryThenFetchAction
中。主要的作用是将第一阶段获取的文档编号进行排序。排序完成之后再根据文档编号获取文档里面实际的内容。相关的代码如下:
@Overrideprotected void moveToSecondPhase() throws Exception { ... sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); ... final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (AtomicArray.Entryentry : docIdsToLoad.asList()) { QuerySearchResult queryResult = firstResults.get(entry.index); ... executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); }}
sortDocs
前面已经讲过了,作用是将文档编号根据每个文档的匹配分值进行排序。
executeFetch
相关的代码如下。它的作用是调用searchService
开启一个异步任务,根据文档编号获取文档的具体内容,并将结果存放到fetchResults
中。根据counter判断如果所有的fetch任务都执行完了,就调用finishHim
来完成本次查询结果。
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) { searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener() { @Override public void onResult(FetchSearchResult result) { ... fetchResults.set(shardIndex, result); if (counter.decrementAndGet() == 0) { finishHim(); } } ... });}
sendExecuteFetch
最后会调用executeFetchPhase
。它的关键代码如下:
public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException { final SearchContext context = findContext(request.id()); ... fetchPhase.execute(context); ... freeContext(request.id()); ... return context.fetchResult();}
从代码中可以看出,在fetch阶段结束之后才会释放SearchContext
。
finishHim
相关的代码如下。它的作用是合并每个分片的查询结果,让后将合并结果通知给listener。让它完成最后的查询结果。searchPhaseController.merge
在前面讲过了,主要作用是
private void finishHim() { ... threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { @Override public void run() { ... final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); ... listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); ... } }); ...}
query_and_fetch
和query_then_fetch
下的聚合查询有什么区别呢?首先,前面已经讲过了,query_and_fetch
在executeFetchPhase
的时候就把SearchContext
释放掉了。
public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws ElasticsearchException { final SearchContext context = createAndPutContext(request); ... queryPhase.execute(context); ... fetchPhase.execute(context); ... freeContext(context.id()); ... return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());}
而在query_then_fetch
中,query阶段不会释放SearchContext
,在fetch阶段结束之后才会释放SearchContext
。两段相关的代码如下:
public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { final SearchContext context = createAndPutContext(request); ... queryPhase.execute(context); ... return context.queryResult();}public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException { final SearchContext context = findContext(request.id()); ... fetchPhase.execute(context); ... freeContext(request.id()); ... return context.fetchResult();}
在聚类搜索的时候,内存占用比较大的是FieldData
数据,这些数据储存在SearchContext
中。在query_then_fetch
的搜索模式下,载入FieldData是在query阶段完成的。而query阶段不会释放内存,因此内存中会存放所有分片的FieldData,从而导致内存溢出。
但是我还是有个疑问,为什么不在query阶段结束之后立即释放SearchContext中的FieldData呢?这样也就不会有内存溢出的问题了。
转载地址:http://zvyub.baihongyu.com/