node.executorManager.getScheduledExecutorService().scheduleAtFixedRate(new Runnable() { publicvoid run() { for (CMap cMap : maps.values()) { cMap.startCleanup(false); } } }, 1, 1, TimeUnit.SECONDS);
第二种是通过配置文件来触发startCleanup的执行,配置 PutOperationhandlerif overcapacity policy,我们系统的配置文件没有配置这方面的policy,所有这种方式在我们系统中没有使用。
第三种是自己直接写代码去调用startCleanup函数(public方法,线程安全的). 这个没有实现在我们的系统中。
所以我的调查方向放在了第一种调用的情况,hazelcast里面的ScheduledExecutorService是通过java.util.ScheduledThreadPoolExecutor 来实现的.
esScheduled = new ScheduledThreadPoolExecutor(5, new ExecutorThreadFactory(node.threadGroup, node.getThreadPoolNamePrefix("scheduled"), classLoader), new RejectionHandler()) { protected void beforeExecute(Thread t, Runnable r) { threadPoolBeforeExecute(t, r); } }查看ScheduledThreadPoolExecutor的实现,它把线程实现分成了3个部分: runnable tasks可执行任务, workers to execute the tasks执行任务的具体线程 以及 ScheduledThreadPoolExecutor 调度workers按照要求执行runnable tasks。我们通过scheduleAtFixdRate提交了task,scheduleAtFixedRate先把它打包成重复执行的ScheduleFutureTask
<pre name="code" class="java"> public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new <strong>ScheduledFutureTas</strong>k<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; }
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); <strong> reExecutePeriodic(outerTask);</strong> } }delayedExecute里面如果当前worker的数目小于初始化定义的CorePool的数目,就创建新的worker线程,然后把task放到queue里面
private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); <strong> super.getQueue().add(command);</strong> } public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } return t != null; } private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); boolean workerStarted = false; if (t != null) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; try { t.start(); workerStarted = true; } finally { if (!workerStarted) workers.remove(w); } } return t; }所有启动的worker就做一件事情,从queue中取task执行
try { hasRun = true; Runnable task = firstTask; firstTask = null; while (task != null || (task = <strong>getTask</strong>()) != null) { <strong>runTask(task);</strong> task = null; } } finally { workerDone(this); } } } Runnable getTask() { <strong> for (;;) {</strong> try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else <strong> r = workQueue.take();</strong> if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun) thread.interrupt(); boolean ran = false; beforeExecute(thread, task); <strong> try {; ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; }</strong> } finally { runLock.unlock(); } }了解了java threadpool的工作原理之后,我们可以知道,startCleanup是代码pass给ScheduledThreadPoolExecutor的runnable task,它不被执行,可能的原因有:
1. ScheduledThreadPoolExecutor初始化时候出错,task完全没有提交成功。由于lastCleanup并不是系统应用的启动时间,已经过了几个月了,所以,很明显在系统初始化的时候,esScheduled(ScheduledThreadPoolExecutor)还是正常工作的,只是突然在2月4号停止了工作,所以这种可能性可以排除。
2. Worker 没有正常工作,不在从ScheduledThreadPoolExecutor的queue里面取数据,这个很快就被我排除了:
首先heap dump中有5个pending workers in esScheduled (0/2/3/5/9):
其次从thread dump中可以看出,这五个线程都是在等着从queue里面取数据:
…… <strong> at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos([optimiz</strong>ed] at java/util/concurrent/DelayQueue.take([optimized] at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take([inlined] at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take([optimized] at java/util/concurrent/ThreadPoolExecutor.getTask([optimized] at java/util/concurrent/ThreadPoolExecutor$ at java/lang/ at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method) -- end of trace" id=51 idx=0xd8 tid=32639 prio=5 alive, parked, native_blocked" id=52 idx=0xdc tid=32640 prio=5 alive, parked, native_blocked" id=53 idx=0xe0 tid=32641 prio=5 alive, parked, native_blocked" id=75590 idx=0x3cc tid=3308 prio=5 alive, parked, native_blocked所以worker不正常也被排除了。
3. 我们提交给系统的runner task自动从queue里面消失了,从memory dump中确实发现queue没有tasks了
而没有task的原因很明显是因为当前task执行完之后没有重新reschedule,至于原因,由于scheduledFutrueTask已经不存在,无法从memory dump和thread dump中分析出结果,成为了一个谜。。。。。。
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); <strong> reExecutePeriodic(outerTask);</strong> } }
实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2,布布扣,
实战Java内存泄漏问题分析 -- hazelcast2.0.3使用时内存泄漏 -- 2