动态线程池(DynamicTp),动态调整 Tomcat、Jetty、Undertow 线程池参数篇 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
yanhomlin
V2EX    程序员

动态线程池(DynamicTp),动态调整 Tomcat、Jetty、Undertow 线程池参数篇

  •  2
     
  •   yanhomlin
    yanhom1314 2022-04-26 16:28:49 +08:00 1555 次点击
    这是一个创建于 1281 天前的主题,其中的信息可能已经有所发展或是发生改变。

    大家好,这篇文章我们来介绍下动态线程池框架( DynamicTp )的 adapter 模块,上篇文章也大概介绍过了,该模块主要是用来适配一些第三方组件的线程池管理,让第三方组件内置的线程池也能享受到动态参数调整,监控告警这些增强功能。


    DynamicTp 项目地址

    目前近 1k star ,感谢你的 star ,欢迎 pr ,业务之余给开源贡献一份力量

    gitee 地址https://gitee.com/yanhom/dynamic-tp

    github 地址https://github.com/lyh200/dynamic-tp


    系列文章

    美团动态线程池实践思路,开源了

    动态线程池框架( DynamicTp ),监控及源码解析篇


    adapter 已接入组件

    adapter 模块目前已经接入了 SpringBoot 内置的三大 WebServer ( Tomcat 、Jetty 、Undertow )的线程池管理,实现层面也是和核心模块做了解耦,利用 spring 的事件机制进行通知监听处理。

    可以看出有两个监听器

    1. 当监听到配置中心配置变更时,在更新我们项目内部线程池后会发布一个 RefreshEvent 事件,DtpWebRefreshListener 监听到该事件后会去更新对应 WebServer 的线程池参数。

    2. 同样监控告警也是如此,在 DtpMonitor 中执行监控任务时会发布 CollectEvent 事件,DtpWebCollectListener 监听到该事件后会去采集相应 WebServer 的线程池指标数据。

    要想去管理第三方组件的线程池,首先肯定要对这些组件有一定的熟悉度,了解整个请求的一个处理过程,找到对应处理请求的线程池,这些线程池不一定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件自己实现的线程池,但是基本原理都差不多。

    Tomcat 、Jetty 、Undertow 这三个都是这样,他们并没有直接使用 JUC 提供的线程池实现,而是自己实现了一套,或者扩展了 JUC 的实现;翻源码找到相应的线程池后,然后看有没有暴露 public 方法供我们调用获取,如果没有就需要考虑通过反射来拿了。


    Tomcat 内部线程池的实现

    • Tomcat 内部线程池没有直接使用 JUC 下的 ThreadPoolExecutor ,而是选择继承 JUC 下的 Executor 体系类,然后重写 execute()等方法,不同版本有差异。

    1.继承 JUC 原生 ThreadPoolExecutor ( 9.0.50 版本及以下),并覆写了一些方法,主要 execute()和 afterExecute()

    2.继承 JUC 的 AbstractExecutorService ( 9.0.51 版本及以上),代码基本是拷贝 JUC 的 ThreadPoolExecutor ,也相应的微调了 execute()方法

    注意 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor ,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute()方法如下:

    public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } } 

    可以看出他是先调用父类的 execute()方法,然后捕获 RejectedExecutionException 异常,再去判断如果任务队列类型是 TaskQueue ,则尝试将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行完成的任务数(主要在下面要提到的 TaskQueue 队列的 offer()方法用),为什么要这样设计呢?继续往下看!

    • Tomcat 定义了阻塞队列 TaskQueue 继承自 LinkedBlockingQueue ,该队列主要重写了 offer()方法。
     @Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); } 

    可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

    1.如果 parent 为 null ,直接调用父类 offer 方法入队

    2.如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队

    3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer()入队后就马上有线程去执行它

    4.如果当前线程数小于最大线程数量,则直接返回 false ,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

    5.其他情况都直接入队

    • 因为 Tomcat 线程池主要是来做 IO 任务的,做这一切的目的主要也是为了以最小代价的改动更好的支持 IO 密集型的场景,JUC 自带的线程池主要是适合于 CPU 密集型的场景,可以回想一下 JUC 原生线程池 ThreadPoolExecutor#execute()方法的执行流程

    1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

    2.如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行

    3.如果当前当前线程池数大于核心线程池,小于最大线程数,且任务队列已满,则创建新的线程执行提交的任务

    4.如果当前线程数等于最大线程数,且队列已满,则拒绝该任务

    可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

    如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer()方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

    1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

    2.如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务

    3.如果当前线程数等于最大线程数,则将任务放入任务队列等待执行

    4.如果队列已满,则执行拒绝策略

    • Tomcat 核心线程池有对应的获取方法,获取方式如下
     public Executor doGetTp(WebServer webServer) { TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer; return tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor(); } 
    • 想要动态调整 Tomcat 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数相同,配置文件完整示例看项目 readme 介绍
    spring: dynamic: tp: // 其他配置项 tomcatTp: # tomcat web server 线程池配置 minSpare: 100 # 核心线程数 max: 400 # 最大线程数 

    Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行任务的流程都很清楚了吧。


    Jetty 内部线程池的实现

    • Jetty 内部线程池,定义了一个继承自 Executor 的 ThreadPool 顶级接口,实现类有以下几个

    • 内部主要使用 QueuedThreadPool 这个实现类,该线程池执行流程就不在详细解读了,感兴趣的可以自己去看源码,核心思想都差不多,围绕核心线程数、最大线程数、任务队列三个参数入手,跟 Tocmat 比对着来看,其实也挺简单的。
    public void execute(Runnable job) { // Determine if we need to start a thread, use and idle thread or just queue this job int startThread; while (true) { // Get the atomic counts long counts = _counts.get(); // Get the number of threads started (might not yet be running) int threads = AtomicBiInteger.getHi(counts); if (threads == Integer.MIN_VALUE) throw new RejectedExecutionException(job.toString()); // Get the number of truly idle threads. This count is reduced by the // job queue size so that any threads that are idle but areabout to take // a job from the queue are not counted. int idle = AtomicBiInteger.getLo(counts); // Start a thread if we have insufficient idle threads to meet demand // and we are not at max threads. startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0; // The job will be run by an idle thread when available if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1)) continue; break; } if (!_jobs.offer(job)) { // reverse our changes to _counts. if (addCounts(-startThread, 1 - startThread)) LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } if (LOG.isDebugEnabled()) LOG.debug("queue {} startThread={}", job, startThread); // Start a thread if one was needed while (startThread-- > 0) startThread(); } 
    • Jetty 线程池有提供 public 的获取方法,获取方式如下
     public Executor doGetTp(WebServer webServer) { JettyWebServer jettyWebServer = (JettyWebServer) webServer; return jettyWebServer.getServer().getThreadPool(); } 
    • 想要动态调整 Jetty 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数相同,配置文件完整示例看项目 readme 介绍
    spring: dynamic: tp: // 其他配置项 jettyTp: # jetty web server 线程池配置 min: 100 # 核心线程数 max: 400 # 最大线程数 

    Undertow 内部线程池的实现

    • Undertow 因为其性能彪悍,轻量,现在用的还是挺多的,wildfly (前身 Jboss )从 8 开始内部默认的 WebServer 用 Undertow 了,之前是 Tomcat 吧。了解 Undertow 的小伙伴应该知道,他底层是基于 XNIO 框架( 3.X 之前)来做的,这也是 Jboss 开发的一款基于 java nio 的优秀网络框架。但 Undertow 宣布从 3.0 开始底层网络框架要切换成 Netty 了,官方给的原因是说起网络编程,Netty 已经是事实上标准,用 Netty 的好处远大于 XNIO 能提供的,所以让我们期待 3.0 的发布吧,只可惜三年前就宣布了,至今也没动静,不知道是夭折了还是咋的,说实话,改动也挺大的,看啥时候发布吧,以下的介绍是基于 Undertow 2.x 版本来的

    • Undertow 内部是定义了一个叫 TaskPool 的线程池顶级接口,该接口有如图所示的几个实现。其实这几个实现类都是采用组合的方式,内部都维护一个 JUC 的 Executor 体系类或者维护 Jboss 提供的 EnhancedQueueExecutor 类(也继承 JUC ExecutorService 类),执行流程可以自己去分析

    • 具体的创建代码如下,根据外部是否传入,如果有传入则用外部传入的类,如果没有,根据参数设置内部创建一个,具体是用 JUC 的 ThreadPoolExecutor 还是 Jboss 的 EnhancedQueueExecutor ,根据配置参数选择

    • Undertow 线程池没有提供 public 的获取方法,所以通过反射来获取,获取方式如下
     public Executor doGetTp(WebServer webServer) { UndertowWebServer undertowWebServer = (UndertowWebServer) webServer; Field undertowField = ReflectionUtils.findField(UndertowWebServer.class, "undertow"); if (Objects.isNull(undertowField)) { return null; } ReflectionUtils.makeAccessible(undertowField); Undertow undertow = (Undertow) ReflectionUtils.getField(undertowField, undertowWebServer); if (Objects.isNull(undertow)) { return null; } return undertow.getWorker(); } 
    • 想要动态调整 Undertow 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,配置文件完整示例看项目 readme 介绍
    spring: dynamic: tp: // 其他配置项 undertowTp: # undertow web server 线程池配置 coreWorkerThreads: 100 # worker 核心线程数 maxWorkerThreads: 400 # worker 最大线程数 workerKeepAlive: 60 # 空闲线程超时时间 

    总结

    以上介绍了 Tomcat 、Jetty 、Undertow 三大 WebServer 内置线程池的一些情况,重点介绍了 Tomcat 的,篇幅有限,其他两个感兴趣可以自己分析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动态调整线程池的参数,当我们做 WebServer 性能调优时,能动态调整参数真的是非常好用的。

    再次欢迎大家使用 DynamicTp 框架,一起完善项目。

    下篇文章打算分享一个 DynamicTp 使用过程中因为 Tomcat 版本不一致导致的监控线程 halt 住的奇葩问题,通过一个问题来掌握 ScheduledExecutorService 的原理,欢迎大家持续关注。


    联系我

    欢迎加我微信或者关注公众号交流,一起变强!

    公众号:CodeFox

    微信:yanhom1314

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1607 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 16:23 PVG 00:23 LAX 09:23 JFK 12:23
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86