FunTester Web3j 异步导致 JVM 无法退出 BUG 分享

FunTester · 2023年08月21日 · 2123 次阅读

最近在学习和使用Web3j的过程中,发现一个非常奇怪的现象,当我使用了sendAsync()方法后,JVM 进程一直无法退出。

一开始怀疑自己的代码有问题(因为引入了FunTester框架的 jar 包),开始注释了自己写的学习性质的测试代码后,问题依然存在。我这才意识到问题的严重性,可能不是我的问题。然后我又重新将代码和Web3j解耦开之后发现问题就消失了。

我基本判断就是Web3j某个API调用导致的这个问题。所以我开始使用排除法,最终确定了org.web3j.protocol.core.Request#sendAsync就是罪魁祸首。

Web3j 异步回调

针对 JVM 无法退出的问题,之前也遇到过,大概率就是有线程或者线程池在运行或者没有及时关闭回收。而在使用Web3j异步时候肯定无法避免用到线程池。我们先来看看Web3j是如何实现的异步回调的。

第一层:

public CompletableFuture<T> sendAsync() {  
    return web3jService.sendAsync(this, responseType);  
}

第二层:

@Override  
public <T extends Response> CompletableFuture<T> sendAsync(  
        Request jsonRpc20Request, Class<T> responseType) {  
    return Async.run(() -> send(jsonRpc20Request, responseType));  
}

第三层:

public static <T> CompletableFuture<T> run(Callable<T> callable) {  
    CompletableFuture<T> result = new CompletableFuture<>();  
    CompletableFuture.runAsync(  
            () -> {  
                // we need to explicitly catch any exceptions,  
                // otherwise they will be silently discarded                try {  
                    result.complete(callable.call());  
                } catch (Throwable e) {  
                    result.completeExceptionally(e);  
                }  
            },  
            executor);  
    return result;  
}

最终代码锁定在第三层,位置是org.web3j.utils.Async

这个类的首先定义了一个private static final ExecutorService executor = Executors.newCachedThreadPool();,源代码如下:

public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}

这是个最大长度为java.lang.Integer#MAX_VALUE的缓存线程池,活跃时间keepAliveTime=60s,这个时间请记住,坑就在这里。

一般使用线程池也都不建议直接使用这个 API,都是通过原生的构造方法java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>)创建符合业务的线程池,不可能把最大值设置到java.lang.Integer#MAX_VALUE

看了一下,并没有提供外部调用的关闭org.web3j.utils.Async#executor的 API,Web3j通过注册一个ShutdownHook实现的。如下:

static {  
    Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(executor)));  
}

这样当 JVM 要结束时,就可以直接调用org.web3j.utils.Async#shutdown来关闭线程池。这个结束大概就是其他无关线程都结束(我了解到的GC线程daemon线程),精确的解释可以参考之前发过关于ShutdownHook的文章里面官方注释。

理论上这个方案是可以实现关闭线程池需求的,咋一看并不能看到什么问题。

BUG 排查

通常遇到这种问题,首先观察线程状态,本地话可以使用jvisualvm或者jconsole,我选择了前者,因为提示比较靠前。

我对比了同步和异步线程状态,并没有看出来问题。只要选择 dump 线程了。最终发现了一个处于TIMED_WAITING状态的线程,堆栈如下:


"pool-4-thread-1" #16 prio=5 os_prio=31 tid=0x00007fdedf121000 nid=0x5803 waiting on condition [0x000070000ade4000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000076ebe82f0> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

这里我们可以看到,这是一个线程池的线程,而且经过判断就是org.web3j.utils.Async#executor这个线程池的线程。如果有一个线程一直没有结束的话,那么就不会执行ShutdownHook注册的钩子了。

元凶就是它。

ThreadPoolExecutor 源码分析

根据堆栈信息找到了java.util.concurrent.ThreadPoolExecutor#getTask方法,这是一个从任务等待队列中获取任务的方法,内容如下:

private Runnable getTask() {  
    boolean timedOut = false; // Did the last poll() time out?  

    for (;;) {  
        int c = ctl.get();  
        int rs = runStateOf(c);  

        // Check if queue empty only if necessary.  
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
            decrementWorkerCount();  
            return null;  
        }  

        int wc = workerCountOf(c);  

        // Are workers subject to culling?  
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  

        if ((wc > maximumPoolSize || (timed && timedOut))  
            && (wc > 1 || workQueue.isEmpty())) {  
            if (compareAndDecrementWorkerCount(c))  
                return null;  
            continue;  
        }  

        try {  
            Runnable r = timed ?  
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                workQueue.take();  
            if (r != null)  
                return r;  
            timedOut = true;  
        } catch (InterruptedException retry) {  
            timedOut = false;  
        }  
    }  
}

其中workQueue定义类型java.util.concurrent.ThreadPoolExecutor#workQueue。在最后一个try-catch代码块中,java.util.concurrent.BlockingQueue#poll方法有个超时设置,刚好就是keepAliveTime=60s,真相大白,就是这个代码让 JVM 等待 60s 之后才会选择关闭。

复现

上面分享的代码有点多,下面是我根据结论写的一个复现 Case:

import com.funtester.frame.SourceCode  

import java.util.concurrent.CompletableFuture  
import java.util.concurrent.ExecutorService  
import java.util.concurrent.Executors  

class Tas extends SourceCode {  

    static void main(String[] args) {  
        ExecutorService executor = Executors.newCachedThreadPool();  
        addShutdownHook {  
            output("over")  
            executor.shutdown()  
        }  
        def future = new CompletableFuture<Integer>()  
        executor.execute({  
            future.complete(324)  
        })  
        future.get()  
    }  
}

解决办法

主要思路 2 个:

  1. 避免使用Web3j的异步功能,采取自实现异步。
  2. 使用反射直接关闭java.util.concurrent.ThreadPoolExecutor#execute

下面就不分享解决方案的代码,本文含码量太大。有兴趣的可以私信讨论。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册