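0. Preface
When CPU usage on an Elasticsearch machine gets too high, the cluster can start rejecting requests with EsRejectedExecutionException, and in severe cases the cluster may go down. A typical log entry looks like this: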
[2018-11-11 08:38:34,867][DEBUG][action.search.type] [127.0.0.1] [12439276468] Failed to execute fetch phase
org.elasticsearch.transport.RemoteTransportException: [127.0.0.1][inet[/127.0.0.1:9300]][indices:data/read/search[phase/fetch/id]]
Caused by: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.transport.netty.MessageChannelHandler$RequestHandler@7cf77b77
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:224)
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.elasticsearch.common.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.elasticsearch.common.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
In this situation, Elasticsearch's hot_threads API can be used to show the stacks of the threads that are currently consuming the most time. An example request:
curl -XGET 127.0.0.1:9200/_cluster/nodes/hotthreads
Sample response:
::: [127.0.0.1][pGtNLemNQNq2P1TYQPUqag][localhost][inet[/127.0.0.1:9300]]{master=true}
Hot threads at 2018-12-28T12:12:48.046Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:
1.2% (6.2ms out of 500ms) cpu usage by thread 'elasticsearch[127.0.0.1][search][T#18]'
9/10 snapshots sharing following 2 elements
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
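The percentage on each thread line is the thread's measured time divided by the sampling interval, which is how the percent value is computed in the HotThreads code quoted in section 3. The following is a small worked sketch of that arithmetic using the 6.2ms and 500ms values from the sample above. The class and method names are illustrative and are not part of Elasticsearch.

```java
import java.util.Locale;

// Illustrative helper, not an Elasticsearch class.
final class HotThreadPercent {
    // Share of the sampling interval that the thread accounted for, in percent.
    static double percent(long timeNanos, long intervalNanos) {
        return ((double) timeNanos / intervalNanos) * 100;
    }

    // Uses the same "%4.1f%%" pattern as the header line in the response.
    static String format(double percent) {
        return String.format(Locale.ROOT, "%4.1f%%", percent);
    }

    public static void main(String[] args) {
        long time = 6_200_000L;       // 6.2ms expressed in nanoseconds
        long interval = 500_000_000L; // 500ms expressed in nanoseconds
        System.out.println(format(percent(time, interval)));
    }
}
```

With these inputs the ratio is 1.24, which the one-decimal format shows as 1.2%, matching the sample line.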
1. Request mapping
When Elasticsearch starts, it adds the Rest module, and the Guice injection framework calls the module's configure() method to set it up.
public class RestModule extends AbstractModule {
@Override
protected void configure() {
bind(RestController.class).asEagerSingleton();
new RestActionModule(restPluginsActions).configure(binder());
}
}
Inside the Rest module, RestNodesHotThreadsAction is bound:
public class RestActionModule extends AbstractModule {
@Override
protected void configure() {
bind(RestNodesHotThreadsAction.class).asEagerSingleton();
}
}
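In its constructor, RestNodesHotThreadsAction registers itself with the RestController as the handler for the URLs below. The nodeId parameter is a node id, such as pGtNLemNQNq2P1TYQPUqag in the response above. If the URL contains a nodeId parameter, only the thread stacks of the specified node are analyzed.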
public class RestNodesHotThreadsAction extends BaseRestHandler {
@Inject
public RestNodesHotThreadsAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_cluster/nodes/{nodeId}/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/hot_threads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hotthreads", this);
controller.registerHandler(RestRequest.Method.GET, "/_nodes/{nodeId}/hot_threads", this);
}
}
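To make the role of the {nodeId} placeholder concrete, here is a minimal sketch of matching a request path against one of the templates registered above and extracting the node id. It is a hypothetical, regex-based simplification written for this article. RestController has its own lookup logic, which is not shown in this excerpt.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified route matcher; not how RestController is implemented.
final class RouteTemplateSketch {
    private final Pattern pattern;

    RouteTemplateSketch(String template) {
        // Turn each "{name}" into a named group that matches exactly one path segment.
        String regex = template.replaceAll("\\{(\\w+)\\}", "(?<$1>[^/]+)");
        this.pattern = Pattern.compile(regex);
    }

    // True when the whole path matches the template.
    boolean matches(String path) {
        return pattern.matcher(path).matches();
    }

    // Returns the captured placeholder value, or null when the path does not match.
    String param(String path, String name) {
        Matcher m = pattern.matcher(path);
        return m.matches() ? m.group(name) : null;
    }
}
```

For example, `new RouteTemplateSketch("/_nodes/{nodeId}/hot_threads").param("/_nodes/pGtNLemNQNq2P1TYQPUqag/hot_threads", "nodeId")` yields the node id, while the same template does not match `/_nodes/hot_threads`.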
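RestNodesHotThreadsAction overrides handleRequest() of its parent class BaseRestHandler to handle requests that match these URLs.
RestRequest supports the following parameters:
nodeId
: id of the node whose thread stacks are analyzed; if empty, the thread stacks of all nodes in the cluster are analyzed
threads
: number of threads to analyze, default 3
ignore_idle_threads
: whether to ignore idle threads, default true
type
: the kind of thread time to examine, default cpu; block and wait are also supported
interval
: time between the two samples, default 500ms
snapshots
: number of stack trace snapshots to take
Finally, nodesHotThreads() of AbstractClusterAdminClient is called to analyze the hot threads.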
public class RestNodesHotThreadsAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
NodesHotThreadsRequest nodesHotThreadsRequest = new NodesHotThreadsRequest(nodesIds);
nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads()));
nodesHotThreadsRequest.ignoreIdleThreads(request.paramAsBoolean("ignore_idle_threads", nodesHotThreadsRequest.ignoreIdleThreads()));
nodesHotThreadsRequest.type(request.param("type", nodesHotThreadsRequest.type()));
nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval()));
nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots()));
client.admin().cluster().nodesHotThreads(nodesHotThreadsRequest, new RestResponseListener<NodesHotThreadsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesHotThreadsResponse response) throws Exception {
StringBuilder sb = new StringBuilder();
for (NodeHotThreads node : response) {
sb.append("::: ").append(node.getNode().toString()).append("\n");
Strings.spaceify(3, node.getHotThreads(), sb);
sb.append('\n');
}
return new BytesRestResponse(RestStatus.OK, sb.toString());
}
});
}
}
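The parameters read in handleRequest() arrive as ordinary query-string keys. As a rough illustration, the sketch below assembles such a request URL. The helper class is hypothetical, the host and values are placeholders, and values are assumed to be URL-safe, so no encoding is applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper for building a hot_threads request URL; not an Elasticsearch class.
final class HotThreadsUrl {
    // nodeId may be null or empty, in which case the request covers all nodes.
    static String build(String host, String nodeId, Map<String, String> params) {
        StringBuilder sb = new StringBuilder("http://").append(host).append("/_nodes");
        if (nodeId != null && !nodeId.isEmpty()) {
            sb.append('/').append(nodeId);
        }
        sb.append("/hot_threads");
        char sep = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            // Assumes keys and values need no URL encoding.
            sb.append(sep).append(e.getKey()).append('=').append(e.getValue());
            sep = '&';
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("threads", "5");
        params.put("type", "wait");
        params.put("interval", "1s");
        System.out.println(build("127.0.0.1:9200", "_local", params));
    }
}
```

The resulting URL can be passed to curl in the same way as the request shown in the preface.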
2. Action subclass and implementation
nodesHotThreads() specifies NodesHotThreadsAction.INSTANCE as the action for this operation, and Elasticsearch binds this action to the TransportNodesHotThreadsAction class.
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
}
}
As a result, when TransportAction.execute() runs, it invokes doExecute() of TransportNodesOperationAction, the parent class of TransportNodesHotThreadsAction.
public abstract class TransportNodesOperationAction<Request extends NodesOperationRequest, Response extends NodesOperationResponse, NodeRequest extends NodeOperationRequest, NodeResponse extends NodeOperationResponse> extends TransportAction<Request, Response> {
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncAction(request, listener).start();
}
private class AsyncAction {
private AsyncAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
clusterState = clusterService.state();
String[] nodesIds = resolveNodes(request, clusterState);
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}
private void start() {
if (nodesIds.length == 0) { // no nodes were resolved for this request
// nothing to notify
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
listener.onResponse(newResponse(request, responses));
}
});
return;
}
TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
if (request.timeout() != null) {
transportRequestOptions.withTimeout(request.timeout());
}
// compression is taken from transportCompress() (not enabled here)
transportRequestOptions.withCompress(transportCompress());
for (int i = 0; i < nodesIds.length; i++) { // iterate over the resolved nodes
final String nodeId = nodesIds[i];
final int idx = i;
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
try {
// the target is the local node
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try { // nodeOperation() -> TransportNodesHotThreadsAction.nodeOperation()
onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
} catch (Throwable e) {
onFailure(idx, clusterState.nodes().localNodeId(), e);
}
}
});
} else if (nodeId.equals("_master")) {
// the target is the master node
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
} catch (Throwable e) {
onFailure(idx, clusterState.nodes().masterNodeId(), e);
}
}
});
} else {
// the node id was not found in the cluster state
if (node == null) {
onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else if (!clusterService.localNode().shouldConnectTo(node)) {
// the local node should not connect to the target node
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
} else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
// send the nodeRequest to the remote node
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@Override
public void handleResponse(NodeResponse response) {
onOperation(idx, response);
}
});
}
}
} catch (Throwable t) {
onFailure(idx, nodeId, t);
}
}
}
}
}
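resolveNodes() parses the node ids passed in and returns the set of matching node ids. The following forms are accepted:
(1) An empty node id (null or "") or "_all": all nodes in the cluster
(2) "_local": the current node
(3) "_master": the master node
(4) The id of a node that exists in the cluster: that nodeId
(5) A pattern that matches node names through the * wildcard: the nodeIds of the matched nodes
(6) A value that matches the IP or hostname of a node: the nodeIds of the matched nodes
(7) A value containing ":": "data:true" adds the cluster's data nodes to the result and "data:false" removes them; "master:true" adds the cluster's master nodes and "master:false" removes them; otherwise the key and value are matched against the nodes' attributes and the matching nodeIds are added
Next, all nodes obtained in the previous step are iterated. If the target node is "_local" (or its id equals the local node id) or "_master", nodeOperation() is executed directly; otherwise TransportService sends the request to the target node for execution.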
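The first five resolution rules can be sketched as follows. This is a simplified illustration written for this article, not the Elasticsearch implementation: the Node class, the method names, and the regex-based wildcard matching are all assumptions, and rules (6) and (7) are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Simplified illustration of rules (1) to (5); not the Elasticsearch code.
final class NodeResolverSketch {
    static final class Node {
        final String id;
        final String name;
        Node(String id, String name) { this.id = id; this.name = name; }
    }

    static List<String> resolve(List<Node> nodes, String localId, String masterId, String... specs) {
        Set<String> resolved = new LinkedHashSet<>();
        // Rule (1): nothing given, an empty id, or "_all" selects every node.
        boolean all = specs == null || specs.length == 0
                || (specs.length == 1 && (specs[0] == null || specs[0].isEmpty() || "_all".equals(specs[0])));
        if (all) {
            for (Node n : nodes) resolved.add(n.id);
            return new ArrayList<>(resolved);
        }
        for (String spec : specs) {
            if (spec == null) continue;
            if ("_local".equals(spec)) {            // rule (2)
                resolved.add(localId);
            } else if ("_master".equals(spec)) {    // rule (3)
                resolved.add(masterId);
            } else {
                for (Node n : nodes) {
                    // Rule (4): exact node id. Rule (5): wildcard match on the node name.
                    if (n.id.equals(spec) || wildcardMatch(spec, n.name)) resolved.add(n.id);
                }
            }
        }
        return new ArrayList<>(resolved);
    }

    // Treats '*' as "any sequence of characters" and everything else literally.
    static boolean wildcardMatch(String pattern, String value) {
        StringBuilder regex = new StringBuilder();
        String[] parts = pattern.split("\\*", -1);
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) regex.append(".*");
            regex.append(Pattern.quote(parts[i]));
        }
        return value.matches(regex.toString());
    }
}
```

A LinkedHashSet is used so that a node selected by several specs appears only once while the order of first selection is kept.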
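3. Action operation details
nodeOperation() in TransportNodesOperationAction is an abstract method, so the nodeOperation() method of the subclass TransportNodesHotThreadsAction is the one that runs.
It mainly builds a HotThreads object and then calls its detect() method.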
public class TransportNodesHotThreadsAction extends TransportNodesOperationAction<NodesHotThreadsRequest, NodesHotThreadsResponse, TransportNodesHotThreadsAction.NodeRequest, NodeHotThreads> {
@Override
protected NodeHotThreads nodeOperation(NodeRequest request) throws ElasticsearchException {
HotThreads hotThreads = new HotThreads()
.busiestThreads(request.request.threads)
.type(request.request.type)
.interval(request.request.interval)
.threadElementsSnapshotCount(request.request.snapshots)
.ignoreIdleThreads(request.request.ignoreIdleThreads);
try {
return new NodeHotThreads(clusterService.localNode(), hotThreads.detect());
} catch (Exception e) {
throw new ElasticsearchException("failed to detect hot threads", e);
}
}
}
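The actual logic for detecting expensive threads lives in innerDetect(). The main steps are:
(1) Sample all threads once, then sample them again after interval (default 500ms)
(2) Sort the threads by the time accumulated between the two samples for the requested type (cpu, wait, or block)
(3) Take the time for the requested type and express it as a percentage of interval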
public class HotThreads {
public String detect() throws Exception {
synchronized (mutex) {
return innerDetect();
}
}
private String innerDetect() throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("Hot threads at ");
sb.append(DATE_TIME_FORMATTER.printer().print(System.currentTimeMillis()));
sb.append(", interval=");
sb.append(interval);
sb.append(", busiestThreads=");
sb.append(busiestThreads);
sb.append(", ignoreIdleThreads=");
sb.append(ignoreIdleThreads);
sb.append(":\n");
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
boolean enabledCpu = false;
try {
if (threadBean.isThreadCpuTimeSupported()) {
if (!threadBean.isThreadCpuTimeEnabled()) {
enabledCpu = true;
threadBean.setThreadCpuTimeEnabled(true);
}
} else {
throw new IllegalStateException("MBean doesn't support thread CPU Time");
}
Map<Long, MyThreadInfo> threadInfos = new HashMap<>();
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
if (cpu == -1) {
continue;
}
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
if (info == null) {
continue;
}
threadInfos.put(threadId, new MyThreadInfo(cpu, info));
}
Thread.sleep(interval.millis());
for (long threadId : threadBean.getAllThreadIds()) {
// ignore our own thread...
if (Thread.currentThread().getId() == threadId) {
continue;
}
long cpu = threadBean.getThreadCpuTime(threadId);
if (cpu == -1) {
threadInfos.remove(threadId);
continue;
}
ThreadInfo info = threadBean.getThreadInfo(threadId, 0);
if (info == null) {
threadInfos.remove(threadId);
continue;
}
MyThreadInfo data = threadInfos.get(threadId);
if (data != null) {
data.setDelta(cpu, info);
} else {
threadInfos.remove(threadId);
}
}
// sort by delta CPU time on thread.
List<MyThreadInfo> hotties = new ArrayList<>(threadInfos.values());
final int busiestThreads = Math.min(this.busiestThreads, hotties.size());
// skip that for now
CollectionUtil.introSort(hotties, new Comparator<MyThreadInfo>() {
public int compare(MyThreadInfo o1, MyThreadInfo o2) {
if ("cpu".equals(type)) {
return (int) (o2.cpuTime - o1.cpuTime);
} else if ("wait".equals(type)) {
return (int) (o2.waitedTime - o1.waitedTime);
} else if ("block".equals(type)) {
return (int) (o2.blockedTime - o1.blockedTime);
}
throw new IllegalArgumentException();
}
});
// analyse N stack traces for M busiest threads
long[] ids = new long[busiestThreads];
for (int i = 0; i < busiestThreads; i++) {
MyThreadInfo info = hotties.get(i);
ids[i] = info.info.getThreadId();
}
ThreadInfo[][] allInfos = new ThreadInfo[threadElementsSnapshotCount][];
for (int j = 0; j < threadElementsSnapshotCount; j++) {
// NOTE, javadoc of getThreadInfo says: If a thread of the given ID is not alive or does not exist,
// null will be set in the corresponding element in the returned array. A thread is alive if it has
// been started and has not yet died.
allInfos[j] = threadBean.getThreadInfo(ids, Integer.MAX_VALUE);
Thread.sleep(threadElementsSnapshotDelay.millis());
}
for (int t = 0; t < busiestThreads; t++) {
long time = 0;
if ("cpu".equals(type)) {
time = hotties.get(t).cpuTime;
} else if ("wait".equals(type)) {
time = hotties.get(t).waitedTime;
} else if ("block".equals(type)) {
time = hotties.get(t).blockedTime;
}
String threadName = null;
for (ThreadInfo[] info : allInfos) {
if (info != null && info[t] != null) {
if (ignoreIdleThreads && isIdleThread(info[t])) {
info[t] = null;
continue;
}
threadName = info[t].getThreadName();
break;
}
}
if (threadName == null) {
continue; // thread is not alive yet or died before the first snapshot - ignore it!
}
double percent = (((double) time) / interval.nanos()) * 100;
sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", percent, TimeValue.timeValueNanos(time), interval, type, threadName));
// for each snapshot (2nd array index) find later snapshot for same thread with max number of
// identical StackTraceElements (starting from end of each)
boolean[] done = new boolean[threadElementsSnapshotCount];
for (int i = 0; i < threadElementsSnapshotCount; i++) {
if (done[i]) continue;
int maxSim = 1;
boolean[] similars = new boolean[threadElementsSnapshotCount];
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (done[j]) continue;
int similarity = similarity(allInfos[i][t], allInfos[j][t]);
if (similarity > maxSim) {
maxSim = similarity;
similars = new boolean[threadElementsSnapshotCount];
}
if (similarity == maxSim) similars[j] = true;
}
// print out trace maxSim levels of i, and mark similar ones as done
int count = 1;
for (int j = i + 1; j < threadElementsSnapshotCount; j++) {
if (similars[j]) {
done[j] = true;
count++;
}
}
if (allInfos[i][t] != null) {
final StackTraceElement[] show = allInfos[i][t].getStackTrace();
if (count == 1) {
sb.append(String.format(Locale.ROOT, " unique snapshot%n"));
for (int l = 0; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
}
} else {
sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", count, threadElementsSnapshotCount, maxSim));
for (int l = show.length - maxSim; l < show.length; l++) {
sb.append(String.format(Locale.ROOT, " %s%n", show[l]));
}
}
}
}
}
return sb.toString();
} finally {
if (enabledCpu) {
threadBean.setThreadCpuTimeEnabled(false);
}
}
}
}
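The "N/M snapshots sharing following K elements" lines depend on the similarity() helper, which is not included in the excerpt above. Going by the comment in the code ("identical StackTraceElements (starting from end of each)"), it presumably counts how many frames two stack traces share at the bottom of the stack. A minimal sketch under that assumption:

```java
// Assumed behavior of similarity(): count identical frames from the end of both traces.
// Written for illustration; the real helper is not shown in the excerpt.
final class StackSimilaritySketch {
    static int similarity(StackTraceElement[] a, StackTraceElement[] b) {
        int i = a.length - 1;
        int j = b.length - 1;
        int shared = 0;
        // Walk both traces from the outermost frame (last index) toward the top.
        while (i >= 0 && j >= 0 && a[i].equals(b[j])) {
            shared++;
            i--;
            j--;
        }
        return shared;
    }

    public static void main(String[] args) {
        StackTraceElement worker = new StackTraceElement(
                "java.util.concurrent.ThreadPoolExecutor$Worker", "run", "ThreadPoolExecutor.java", 615);
        StackTraceElement thread = new StackTraceElement("java.lang.Thread", "run", "Thread.java", 745);
        // The two "top" frames below are made-up placeholders that differ between snapshots.
        StackTraceElement topA = new StackTraceElement("example.TaskA", "work", "TaskA.java", 10);
        StackTraceElement topB = new StackTraceElement("example.TaskB", "work", "TaskB.java", 20);
        System.out.println(similarity(
                new StackTraceElement[] {topA, worker, thread},
                new StackTraceElement[] {topB, worker, thread}));
    }
}
```

In this example the two traces differ only in their top frame, so they share the two trailing frames that also appear in the sample response in the preface.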
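This approach carries over to other Java services: any program that includes Elasticsearch as a dependency can create a HotThreads object and call its detect() method (innerDetect() is private) to display the hot threads.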
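For services that do not want the Elasticsearch dependency, the same two-sample idea can be reproduced with the JDK's ThreadMXBean alone. The sketch below is an assumption-laden simplification rather than a port of HotThreads: it covers only the cpu type, takes no stack snapshots, does not exclude the calling thread, and all class and method names are made up for this example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal CPU-delta sampler built on the JDK only; illustrative, not Elasticsearch code.
final class CpuDeltaSampler {
    static final class Sample {
        final long threadId;
        final String name;
        final long cpuDeltaNanos;
        Sample(long threadId, String name, long cpuDeltaNanos) {
            this.threadId = threadId;
            this.name = name;
            this.cpuDeltaNanos = cpuDeltaNanos;
        }
    }

    // Returns up to `top` threads (top >= 0) ordered by CPU time used during the interval.
    static List<Sample> busiest(long intervalMillis, int top) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        if (!bean.isThreadCpuTimeSupported()) {
            throw new IllegalStateException("thread CPU time is not supported on this JVM");
        }
        boolean enabledHere = false;
        if (!bean.isThreadCpuTimeEnabled()) {
            bean.setThreadCpuTimeEnabled(true);
            enabledHere = true;
        }
        try {
            // First sample: CPU time per live thread.
            Map<Long, Long> before = new HashMap<>();
            for (long id : bean.getAllThreadIds()) {
                long cpu = bean.getThreadCpuTime(id);
                if (cpu != -1) before.put(id, cpu);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt flag and continue with a shorter interval.
                Thread.currentThread().interrupt();
            }
            // Second sample: keep threads seen in both samples and record the delta.
            List<Sample> samples = new ArrayList<>();
            for (long id : bean.getAllThreadIds()) {
                Long first = before.get(id);
                long cpu = bean.getThreadCpuTime(id);
                ThreadInfo info = bean.getThreadInfo(id);
                if (first == null || cpu == -1 || info == null) continue;
                samples.add(new Sample(id, info.getThreadName(), cpu - first));
            }
            // Long.compare avoids the overflow that casting a long difference to int can cause.
            samples.sort((a, b) -> Long.compare(b.cpuDeltaNanos, a.cpuDeltaNanos));
            return new ArrayList<>(samples.subList(0, Math.min(top, samples.size())));
        } finally {
            if (enabledHere) bean.setThreadCpuTimeEnabled(false);
        }
    }
}
```

Calling `CpuDeltaSampler.busiest(500, 3)` mirrors the defaults described earlier (500ms interval, 3 threads). Dividing each cpuDeltaNanos by the interval in nanoseconds gives the same kind of percentage that the hot_threads response reports.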