Skip to content

Conversation

nadirvishun
Copy link

@nadirvishun nadirvishun commented Apr 11, 2022

Tomcat 不能 graceful shutdown中只提供了微信公众号的方法,这次是增加企业微信相关方法
Fixes #535

@WangDada2016
Copy link

请问shutDownExecutorService这个方法是手动调用,还是每次http关闭时会自动调用该方法?

@0katekate0
Copy link

请问下,这个shutDownExecutorService这个方法应该在哪里去调用?

@nadirvishun
Copy link
Author

我这边是整个spring项目关闭时才需要手动调用执行,而不是单个http请求就要关闭,而且是只有优雅关闭时才需要,如果强制结束进程调不调用都无所谓。处理的方式是用@PreDestroy来实现的,以提供的demo中的类为例子:

 /**  * 结束时关闭路由线程池  */ @PreDestroy public void destroy() { messageRouter.shutDownExecutorService(5); }
 /**  * 结束时关闭线程池  */ @PreDestroy public void destroy() { List<WxCpMessageRouter> messageRouterList = new ArrayList<>(routers.values()); if (messageRouterList.size() > 0) { for (WxCpMessageRouter messageRouter : messageRouterList) { messageRouter.shutDownExecutorService(5); } } }
@WangDada2016
Copy link

如果是整个spring项目关闭时才需要手动调用执行,并不能解决因为jvm的设置导致创建线程过多自动宕机的。以下是我的错误日志。
Current thread (0x00007f8c3d987800): JavaThread "WxCpTpMessageRouter-pool-1" [_thread_new, id=890402, stack(0x00007f87ccec4000,0x00007f87ccf45000)]

Stack: [0x00007f87ccec4000,0x00007f87ccf45000], sp=0x00007f87ccf437a0, free space=509k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
V [libjvm.so+0xad21aa] VMError::report_and_die()+0x2ba
V [libjvm.so+0x5022db] report_vm_out_of_memory(char const*, int, unsigned long, VMErrorType, char const*)+0x8b
V [libjvm.so+0x92fe33] os::Linux::commit_memory_impl(char*, unsigned long, bool)+0x123
V [libjvm.so+0x92fefc] os::pd_commit_memory(char*, unsigned long, bool)+0xc
V [libjvm.so+0x927f8a] os::commit_memory(char*, unsigned long, bool)+0x2a
V [libjvm.so+0x92bc4f] os::pd_create_stack_guard_pages(char*, unsigned long)+0x7f
V [libjvm.so+0xa771ce] JavaThread::create_stack_guard_pages()+0x5e
V [libjvm.so+0xa81454] JavaThread::run()+0x34
V [libjvm.so+0x930198] java_start(Thread*)+0x108
C [libpthread.so.0+0x7ea5] start_thread+0xc5

--------------- P R O C E S S ---------------

Java Threads: ( => current thread )
=>0x00007f8c3d987800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_new, id=890402, stack(0x00007f87ccec4000,0x00007f87ccf45000)]
0x00007f8c3d985800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=890401, stack(0x00007f87ccf45000,0x00007f87ccfc6000)]
0x00007f8c3d983800 JavaThread "Thread-10870" daemon [_thread_blocked, id=890400, stack(0x00007f87ccfc6000,0x00007f87cd047000)]
0x00007f8c4cd83000 JavaThread "Keep-Alive-Timer" daemon [_thread_blocked, id=890378, stack(0x00007f87e02a7000,0x00007f87e0328000)]
0x00007f8c3df83000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889734, stack(0x00007f87cd047000,0x00007f87cd0c8000)]
0x00007f8c3df80800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889733, stack(0x00007f87cd0c8000,0x00007f87cd149000)]
0x00007f8c3df7e800 JavaThread "Thread-10869" daemon [_thread_blocked, id=889732, stack(0x00007f87cd149000,0x00007f87cd1ca000)]
0x00007f8c3291e800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889731, stack(0x00007f87cd1ca000,0x00007f87cd24b000)]
0x00007f8c224fc000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889730, stack(0x00007f87cd24b000,0x00007f87cd2cc000)]
0x00007f8c2cea1000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889729, stack(0x00007f87cd2cc000,0x00007f87cd34d000)]
0x00007f8c2ce9f000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889728, stack(0x00007f87cd34d000,0x00007f87cd3ce000)]
0x00007f8c2ce9d000 JavaThread "Thread-10868" daemon [_thread_blocked, id=889727, stack(0x00007f87cd3ce000,0x00007f87cd44f000)]
0x00007f8c4cd9a800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889726, stack(0x00007f87cd44f000,0x00007f87cd4d0000)]
0x00007f8c4cd98800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889725, stack(0x00007f87cd4d0000,0x00007f87cd551000)]
0x00007f8c4cd96800 JavaThread "Thread-10867" daemon [_thread_blocked, id=889724, stack(0x00007f87cd551000,0x00007f87cd5d2000)]
0x00007f8c224fa000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889723, stack(0x00007f87cd5d2000,0x00007f87cd653000)]
0x00007f8c224f8000 JavaThread "Thread-10866" daemon [_thread_blocked, id=889722, stack(0x00007f87cd653000,0x00007f87cd6d4000)]
0x00007f8c3291c800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889721, stack(0x00007f87cd6d4000,0x00007f87cd755000)]
0x00007f8c3291a800 JavaThread "Thread-10865" daemon [_thread_blocked, id=889720, stack(0x00007f87cd755000,0x00007f87cd7d6000)]
0x00007f8c111c0000 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889719, stack(0x00007f87cd7d6000,0x00007f87cd857000)]
0x00007f8c51172800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889718, stack(0x00007f87cd857000,0x00007f87cd8d8000)]
0x00007f8c111be000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889717, stack(0x00007f87cd8d8000,0x00007f87cd959000)]
0x00007f8c111bc000 JavaThread "Thread-10864" daemon [_thread_blocked, id=889716, stack(0x00007f87cd959000,0x00007f87cd9da000)]
0x00007f8c51170800 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889715, stack(0x00007f87cd9da000,0x00007f87cda5b000)]
0x00007f8c5116e800 JavaThread "Thread-10863" daemon [_thread_blocked, id=889714, stack(0x00007f87cda5b000,0x00007f87cdadc000)]
0x00007f8c3df7c800 JavaThread "WxCpTpMessageRouter-pool-1" [_thread_blocked, id=889032, stack(0x00007f87cdadc000,0x00007f87cdb5d000)]
0x00007f8c3d1c8000 JavaThread "WxCpTpMessageRouter-pool-0" [_thread_blocked, id=889031, stack(0x00007f87cdb5d000,0x00007f87cdbde000)]

@WangDada2016
Copy link

WangDada2016 commented Apr 24, 2022

目前我将shutDownExecutorService方法加在
private WxMpXmlOutMessage route(WxMpXmlMessage message) {
try {
return this.messageRouter.route(message);
} catch (Exception e) {
log.error("路由消息时出现异常!", e);
}finally {
this.messageRouter.shutDownExecutorService();
}
return null;
}

@nadirvishun
Copy link
Author

目前我将shutDownExecutorService方法加在 private WxMpXmlOutMessage route(WxMpXmlMessage message) { try { return this.messageRouter.route(message); } catch (Exception e) { log.error("路由消息时出现异常!", e); }finally { this.messageRouter.shutDownExecutorService(); } return null; }

这样不行吧?如果放到这里关闭了,下次再来一个消息怎么办?你在哪里重新创建线程池的?
创建线程过多导致宕机个人感觉可以从这两个方面入手:

  1. 自定义线程池,不要用系统自己默认的,合理设置开启的线程数和设置有界的队列和拒绝策略,可能会减轻消耗。
  2. 参考微信公众号用threadlocal来存储相关,将路由这个弄成单例的,这样就只有一个线程池而不会创建很多个。
@WangDada2016
Copy link

每次收到一个事件推送就会创建一次呀
protected static WxCpTpMessageRouter newRouter(WxCpTpService wxCpTpService) {
final val newRouter = new WxCpTpMessageRouter(wxCpTpService);
}
public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
this.wxCpTpService = wxCpTpService;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}

@nadirvishun
Copy link
Author

demo中都是先建好存起来,用的时候直接调用,如果你那边能保证能新建,逻辑上倒是没什么问题,但是线程池的目的就是为了线程复用,避免线程创建销毁带来的性能消耗的。如果这样频繁创新关闭。。。。
实在不行可以看下你程序中真的有用到异步路由吗?如果没有用可以去掉相关的异步路由,我看代码中只有异步路由才会走线程池,所以去掉后就不会存在线程池的占用了。

@WangDada2016
Copy link

你可以debug看下,每次请求有没有新建的,我目前没有用到异步路由,也是有创建的

@nadirvishun
Copy link
Author

你可以debug看下,每次请求有没有新建的,我目前没有用到异步路由,也是有创建的

是说路由的新建吗?这个官方只是给了个demo,实际上每个人的实现可都不一样,如果是按照weixin-java-cp-demo来的话(这是企业微信自建应用的,没找到企业微信第三方的demo),每次来消息并不需要要新建啊。
然后同样是这个官方demo中有用到异步路由:

private WxCpMessageRouter newRouter(WxCpService wxCpService) { final val newRouter = new WxCpMessageRouter(wxCpService); // 记录所有事件的日志 (异步执行) newRouter.rule().handler(this.logHandler).next(); ... }
@WangDada2016
Copy link

WangDada2016 commented Apr 24, 2022

自建和第三方实现都是一样的,你可以把断点打在源码上,然后模拟请求发送,看源码的执行。
现在我已经改为自定义线程池了。
核心线程 8 最大线程 20 保活时间30s 存储队列 10 有守护线程 拒绝策略:将超负荷任务回退到调用者
默认使用核心线程(8)数执行任务,任务数量超过核心线程数就丢到队列,队列(10)满了就再开启新的线程,新的线程数最大为20,当任务执行完,新开启的线程将存活30s,若没有任务就消亡,线程池回到核心线程数量
private static final ExecutorService executor = new ThreadPoolExecutor(8, 20,
30L, TimeUnit.SECONDS, new LinkedBlockingQueue(10),
new ThreadFactoryBuilder().setNameFormat("myExecutor-WxCpTpMessageRouter-pool--%d").setDaemon(true).build(),
new ThreadPoolExecutor.CallerRunsPolicy());

@nadirvishun
Copy link
Author

我实在是没看出来哪里需要新建啊,不是在WxCpConfiguration中初始化时就按照应用来把路由创建好了,然后存到map中了吗?还有如果没有异步路由压根不会走线程池啊,不如你好好对比下和demo是否一致,或者是贴一下你写的配置类WxCpConfiguration 和入口类WxPortalController的代码看下啊。

@WangDada2016
Copy link

断点打在WxCpTpMessageRouter源码的构造方法上
1、public WxCpTpMessageRouter(WxCpTpService wxCpTpService) {
this.wxCpTpService = wxCpTpService;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("WxCpTpMessageRouter-pool-%d").build();
this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_SIZE, DEFAULT_THREAD_POOL_SIZE,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory);
this.messageDuplicateChecker = new WxMessageInMemoryDuplicateChecker();
this.sessionManager = wxCpTpService.getSessionManager();
this.exceptionHandler = new LogExceptionHandler();
}
2、 public WxCpXmlOutMessage route(final WxCpTpXmlMessage wxMessage, final Map<String, Object> context)
WxCpXmlOutMessage res = null;
final List futures = new ArrayList<>();
for (final WxCpTpMessageRouterRule rule : matchRules) {
// 返回最后一个非异步的rule的执行结果
if (rule.isAsync()) {
futures.add(
this.executorService.submit(() -> {
rule.service(wxMessage, context, WxCpTpMessageRouter.this.wxCpTpService, WxCpTpMessageRouter.this.sessionManager, WxCpTpMessageRouter.this.exceptionHandler);
})
);
其中rule.isAsync()设置的是false,在源码执行还是true,会进到线程池里面

@nadirvishun
Copy link
Author

对于第一个,如果你是按照demo来,这个只有在WxCpConfigurationinitServices方法按照应用个数来执行一次,然后存起来复用,而不是来一个消息就执行一次。
对于的二个,rule.isAsync()如果是false,为什么在源码中还是true啊,怎么可能啊?是不是有好多个rules,其中有一个是true啊,是true的那个就是异步的,如果对你的逻辑没用的话可以注释掉,这样就不走线程池了。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants