线程池如何观测?这个方案让你对线程池的运行情况了如指掌!

标签: 开源软件 java 线程池 监控 | 发表时间:2022-01-17 16:04 | 作者:铂赛东
出处:https://segmentfault.com/blogs

今天我们来聊一个比较实用的话题,动态可监控可观测的线程池实践。

这是个全新的开源项目,作者提供了一种非常好的思路解决了线程池的可观测问题。

这个开源项目叫: DynamicTp

地址在文章末尾。


写在前面

稍微有些Java编程经验的小伙伴都知道,Java的精髓在juc包,这是大名鼎鼎的Doug Lea老爷子的杰作,评价一个程序员Java水平怎么样,一定程度上看他对juc包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一。

juc包主要包括:

1.原子类(AtomicXXX)

2.锁类(XXXLock)

3.线程同步类(AQS、CountDownLatch、CyclicBarrier、Semaphore、Exchanger)

4.任务执行器类(Executor体系类,包括今天的主角ThreadPoolExecutor)

5.并发集合类(ConcurrentXXX、CopyOnWriteXXX)相关集合类

6.阻塞队列类(BlockingQueue继承体系类)

7.Future相关类

8.其他一些辅助工具类

多线程编程场景下,这些类都是必备技能,会这些可以帮助我们写出高质量、高性能、少bug的代码,同时这些也是Java中比较难啃的一些技术,需要持之以恒,学以致用,在使用中感受他们带来的奥妙。

上边简单罗列了下juc包下功能分类,这篇文章我们主要来介绍动态可监控线程池的,所以具体内容也就不展开讲了,以后有时间单独来聊吧。看这篇文章前,希望读者最好有一定的线程池ThreadPoolExecutor使用经验,不然看起来会有点懵。

如果你对ThreadPoolExecutor不是很熟悉,推荐阅读下面两篇文章

javadoop: https://www.javadoop.com/post/java-thread-pool

美团技术博客: https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html


背景

使用ThreadPoolExecutor过程中你是否有以下痛点呢?

1.代码中创建了一个ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适

2.凭经验设置参数值,上线后发现需要调整,改代码重启服务,非常麻烦

3.线程池相对开发人员来说是个黑盒,运行情况不能感知到,直到出现问题

如果你有以上痛点,这篇文章要介绍的动态可监控线程池(DynamicTp)或许能帮助到你。

如果看过ThreadPoolExecutor的源码,大概可以知道其实它有提供一些set方法,可以在运行时动态去修改相应的值,这些方法有:

  public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);

现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置,实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的,使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件的难度和工作量。

综上,我们总结出以下的背景

  • 广泛性:在Java开发中,想要提高系统性能,线程池已经是一个90%以上的人都会选择使用的基础工具
  • 不确定性:项目中可能会创建很多线程池,既有IO密集型的,也有CPU密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数
  • 无感知性,线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理
  • 高可用性,配置变更需要及时推送到客户端;需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以大幅度减少开发量及接入难度

简介

我们基于配置中心对线程池ThreadPoolExecutor做一些扩展,实现对运行中线程池参数的动态修改,实时生效;以及实时监控线程池的运行状态,触发设置的报警策略时报警,报警信息会推送办公平台(钉钉、企微等)。报警维度包括(队列容量、线程池活性、拒绝触发等);同时也会定时采集线程池指标数据供监控平台可视化使用。使我们能时刻感知到线程池的负载,根据情况及时调整,避免出现问题影响线上业务。

      |  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 

特性

  • 参考美团线程池实践 ,对线程池参数动态化管理,增加监控、报警功能
  • 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入starter即可食用
  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,默认支持Nacos、Apollo,同时也提供SPI接口可自定义扩展实现
  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警),默认支持企业微信、钉钉报警,同时提供SPI接口可自定义扩展实现
  • 内置线程池指标采集功能,支持通过MicroMeter、JsonLog日志输出、Endpoint三种方式,可通过SPI接口自定义扩展实现

架构设计

主要分四大模块

  • 配置变更监听模块:

    1.监听特定配置中心的指定配置文件(默认实现Nacos、Apollo),可通过内部提供的SPI接口扩展其他实现

    2.解析配置文件内容,内置实现yml、properties配置文件的解析,可通过内部提供的SPI接口扩展其他实现

    3.通知线程池管理模块实现刷新

  • 线程池管理模块:

    1.服务启动时从配置中心拉取配置信息,生成线程池实例注册到内部线程池注册中心中

    2.监听模块监听到配置变更时,将变更信息传递给管理模块,实现线程池参数的刷新

    3.代码中通过getExecutor()方法根据线程池名称来获取线程池对象实例

  • 监控模块:

    实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的SPI接口扩展其他实现

    1.默认实现Json log输出到磁盘

    2.MicroMeter采集,引入MicroMeter相关依赖

    3.暴雷Endpoint端点,可通过http方式访问

  • 通知告警模块:

    对接办公平台,实现通告告警功能,默认实现钉钉、企微,可通过内部提供的SPI接口扩展其他实现,通知告警类型如下

    1.线程池参数变更通知

    2.阻塞队列容量达到设置阈值告警

    3.线程池活性达到设置阈值告警

    4.触发拒绝策略告警


使用

  • maven依赖

        <dependency>
         <groupId>io.github.lyh200</groupId>
         <artifactId>dynamic-tp-spring-cloud-starter</artifactId>
         <version>1.0.2-RELEASE</version>
    </dependency>
  • 线程池配置

        spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # 是否开启banner打印,默认true
          enabledCollect: false      # 是否开启监控指标采集,默认false
          collectorType: logging     # 监控数据采集器类型(JsonLog | MicroMeter),默认logging
          logPath: /home/logs        # 监控日志数据路径,默认${user.home}/logs
          monitorInterval: 5         # 监控时间间隔(报警判断、指标采集),默认5s
          nacos:                     # nacos配置,不配置有默认值(规则name-dev.yml这样)
            dataId: dynamic-tp-demo-dev.yml
            group: DEFAULT_GROUP
          apollo:                    # apollo配置,不配置默认拿apollo配置第一个namespace
            namespace: dynamic-tp-demo-dev.yml
          configType: yml            # 配置文件类型
          platforms:                 # 通知报警平台配置
            - platform: wechat
              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # 替换
              receivers: test1,test2                   # 接受人企微名称
            - platform: ding
              urlKey: f80dad441fcd655438f4a08dcd6a     # 替换
              secret: SECb5441fa6f375d5b9d21           # 替换,非sign模式可以没有此值
              receivers: 15810119805                   # 钉钉账号手机号          
          executors:                                   # 动态线程池配置
            - threadPoolName: dynamic-tp-test-1
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # 任务队列,查看源码QueueTypeEnum枚举类
              rejectedHandlerType: CallerRunsPolicy    # 拒绝策略,查看RejectedTypeEnum枚举类
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test           # 线程名前缀
              notifyItems:                     # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
                - type: capacity               # 报警项类型,查看源码 NotifyTypeEnum枚举类
                  enabled: true
                  threshold: 80                # 报警阈值
                  platforms: [ding,wechat]     # 可选配置,不配置默认拿上层platforms配置的所以平台
                  interval: 120                # 报警间隔(单位:s)
                - type: change
                  enabled: true
                - type: liveness
                  enabled: true
                  threshold: 80
                - type: reject
                  enabled: true
                  threshold: 1
  • 代码方式生成,服务启动会自动注册

        @Configuration
    public class DtpConfig {
    
       @Bean
       public DtpExecutor demo1Executor() {
           return DtpCreator.createDynamicFast("demo1-executor");
      }
    
       @Bean
       public ThreadPoolExecutor demo2Executor() {
           return ThreadPoolBuilder.newBuilder()
                  .threadPoolName("demo2-executor")
                  .corePoolSize(8)
                  .maximumPoolSize(16)
                  .keepAliveTime(50)
                  .allowCoreThreadTimeOut(true)
                  .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                  .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                  .buildDynamic();
      }
    }
  • 代码调用,根据线程池名称获取

        public static void main(String[] args) {
           DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
           dtpExecutor.execute(() -> System.out.println("test"));
    }

注意事项

  1. 配置文件配置的参数会覆盖通过代码生成方式配置的参数
  2. 阻塞队列只有VariableLinkedBlockingQueue类型可以修改capacity,该类型功能和LinkedBlockingQueue相似,只是capacity不是final类型,可以修改,
    VariableLinkedBlockingQueue参考RabbitMq的实现
  3. 启动看到如下日志输出证明接入成功

        
    |  __ \                            (_) |__   __|   
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ | |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ __, |_| |_|__,_|_| |_| |_|_|___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 
    
    DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  4. 配置变更会推送通知消息,且会高亮变更的字段

        
    DynamicTp [dynamic-tp-test-1] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]

通知报警

触发报警阈值会推送相应报警消息(活性、容量、拒绝),且会高亮显示相应字段

配置变更会推送通知消息,且会高亮变更的字段


监控日志

通过collectType属性配置监控指标采集类型,默认 logging

  • MicroMeter:通过引入相关MicroMeter依赖采集到相应的平台
    (如Prometheus,InfluxDb...)
  • Logging:定时采集指标数据以Json日志格式输出磁盘,地址${logPath}/dy
    namictp/${appName}.monitor.log

        2022-01-11 00:25:20.599 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-11 00:25:25.603 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-11 00:25:30.609 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-11 00:25:35.613 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
    2022-01-11 00:25:40.616 INFO [dtp-monitor-thread-1:d.m.log] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"RejectedCountableCallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":6,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"remoting-call","maximumPoolSize":8}
  • 暴露EndPoint端点(dynamic-tp),可以通过http方式请求

        [
        {
            "dtp_name": "remoting-call",
            "core_pool_size": 6,
            "maximum_pool_size": 12,
            "queue_type": "SynchronousQueue",
            "queue_capacity": 0,
            "queue_size": 0,
            "fair": false,
            "queue_remaining_capacity": 0,
            "active_count": 0,
            "task_count": 21760,
            "completed_task_count": 21760,
            "largest_pool_size": 12,
            "pool_size": 6,
            "wait_task_count": 0,
            "reject_count": 124662,
            "reject_handler_name": "CallerRunsPolicy"
        },
        {
            "max_memory": "228 MB",
            "total_memory": "147 MB",
            "free_memory": "44.07 MB",
            "usable_memory": "125.07 MB"
        }
    ]

项目地址

gitee地址: https://gitee.com/yanhom/dynamic-tp-spring-cloud-starter

github地址https://github.com/lyh200/dynamic-tp-spring-cloud-starter


联系作者

对项目有什么想法或者建议,可以在上述地址中加到作者微信进行交流,或者创建issues,一起完善项目!

最后,支持的话还望大家去点个star哦。

相关 [线程池 何观 线程池] 推荐:

Java线程池

- - 企业架构 - ITeye博客
线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的. 在jdk1.5之后这一情况有了很大的改观. Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用. 为我们在开发中处理线程的问题提供了非常大的帮助.

Java 线程池

- - 编程语言 - ITeye博客
在项目中,系统启动一个新线程的成本是比较高的,因为它涉及与操作系统交互. 在这种情形下,使用线程池可以很好地提高性能,尤其是当程序中需要创建大量生存周期很短的线程时,更应该考虑使用线程池. 使用线程池可以有效地控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过此数.

线程池如何观测?这个方案让你对线程池的运行情况了如指掌!

- - SegmentFault 最新的文章
今天我们来聊一个比较实用的话题,动态可监控可观测的线程池实践. 这是个全新的开源项目,作者提供了一种非常好的思路解决了线程池的可观测问题. 这个开源项目叫: DynamicTp. 稍微有些Java编程经验的小伙伴都知道,Java的精髓在juc包,这是大名鼎鼎的Doug Lea老爷子的杰作,评价一个程序员Java水平怎么样,一定程度上看他对juc包下的一些技术掌握的怎么样,这也是面试中的基本上必问的一些技术点之一.

java线程池分析

- - BlogJava-首页技术区
    在Java 5.0之前启动一个任务是通过调用Thread类的start()方法来实现的,任务的提于交和执行是同时进行的,如果你想对任务的执行进行调度或是控制 同时执行的线程数量就需要额外编写代码来完成. 5.0里提供了一个新的任务执行架构使你可以轻松地调度和控制任务的执行,并且可以建立一个类似数据库连接 池的线程池来执行任务.

Java线程池应用

- - CSDN博客架构设计推荐文章
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务. 2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机). Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具.

Java线程池总结

- - Java - 编程语言 - ITeye博客
  假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间. 当T1 + T3 远大于 T2时,采用多线程技术可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力.     线程池就是一个线程的容器,每次只执行额定数量的线程, 线程池作用就是限制系统中执行线程的数量.

java 线程池原理及几种线程池详解

- - CSDN博客综合推荐文章
服务器经常出现处理大量单个任务处理的时间很短而请求的数目却是巨大的请求. 构建服务器应用程序的一个过于简单的模型应该是:每当一个请求到达就创建一个新线程,然后在新线程中为请求服务. 实际上,对于原型开发这种方法工作得很好,但如果试图部署以这种方式运行的服务器应用程序,那么这种方法的严重不足就很明显.

Spring提供的线程池支持

- - 博客园_首页
核心提示:一旦企业应用越来越复杂时(比如,基于流程服务器的EIS),它们对相关技术也提出了更高的要求. 在使用 EJB 3.0组件技术开发企业应用过程中,它们能够享受到EJB容器提供的线程池、任务调度(@Timeout)服务. 现如今,运行于Web容器的Web应用、单独的桌面应用. 一旦企业应用越来越复杂时(比如,基于流程服务器的EIS),它们对相关技术也提出了更高的要求.

用线程池启动定时器

- - BlogJava-首页技术区
(1)调用ScheduledExecutorService的schedule方法,返回的ScheduleFuture对象可以取消任务. (2)支持间隔重复任务的定时方式,不直接支持绝对定时方式,需要转换成相对时间方式.             System.out.println("响");           .

Web容器线程池机制小议

- - ITeye博客
从刚开始学习java,我们就被告知Java是一种支持多线程的语言,每条程序指令都会在一个线程中执行,而启动主线程的入口,是可执行类中的main方法. 我们可以在main方法或其调用的方法中创建新的线程以实现多线程、并发处理的效果. Java入门资料上介绍线程时往往会说明一点,创建线程不是免费的,是有成本的--对内存的消耗、对CPU切换调度的消耗都是成本,所以像数据库连接池这类“创建昂贵型”资源一样,创建好的线程优先被复用而不是每次都创建新的,这就是线程池出现的原因.