Java自定义动态线程池


1.为什么需要动态线程池:

在项目中线程池是我们提高并发能力和性能的重要手段;然而不恰当地使用线程池可能会起到反作用,如:

  1. 多个业务共用线程池可能会导致某个业务假死状态(线程池中的线程都被另一些业务占用,导致该业务长时间没有线程处理);
  2. 线程池的线程数设置太小则不能快速的处理业务;线程池线程设置太大则可能导致线程浪费。

因此我们在使用线程池的时候不同的业务不要共用线程池;并且设置合理的线程池参数。

在项目运行的过程中,流量会不断变化,固定的线程池参数下系统性能也会随之变化,因此动态地监控和调整线程池参数就变得重要起来;一般我们可以使用开源的动态线程池框架,如:DynamicTp、Hippo4j 等。当然我们也可以自定义一个动态线程池来使用。

2.自定义动态线程池:

博主自定义的线程池是基于 MySQL 5.7 + Spring Boot 2.7.6 + MyBatis-Plus 动态配置实现的;

1)数据库设计:

-- Table holding thread pool records: the configured sizing parameters plus the
-- runtime metric columns written by the application code in this article.
CREATE TABLE `thread_pool` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id', -- surrogate primary key
  `application_name` varchar(255) DEFAULT NULL COMMENT '应用名称', -- application name
  `thread_pool_name` varchar(255) DEFAULT NULL COMMENT '线程池名称', -- thread pool name
  `core_size` int(11) DEFAULT NULL COMMENT '核心大小', -- core pool size
  `max_size` int(11) DEFAULT NULL COMMENT '最大大小', -- maximum pool size
  `queue_size` int(11) DEFAULT NULL COMMENT '队列大小', -- work queue capacity
  `keep_alive_time` bigint(20) DEFAULT NULL COMMENT '存活时间', -- keep-alive time; the Java code reads and writes it in milliseconds
  `queue_elements_count` int(11) DEFAULT NULL COMMENT '队列元素数量', -- number of elements currently in the queue
  `queue_remaining_capacity` int(11) DEFAULT NULL COMMENT '队列剩余容量', -- remaining queue capacity
  `active_count` int(11) DEFAULT NULL COMMENT '存活的线程数', -- filled from ThreadPoolExecutor.getActiveCount()
  `task_count` bigint(20) DEFAULT NULL COMMENT '任务数', -- filled from ThreadPoolExecutor.getTaskCount()
  `completed_task_count` bigint(20) DEFAULT NULL COMMENT '完成的任务数', -- filled from ThreadPoolExecutor.getCompletedTaskCount()
  `largest_pool_size` int(11) DEFAULT NULL COMMENT '巅峰线程数', -- filled from ThreadPoolExecutor.getLargestPoolSize()
  `load_pressure` decimal(3,2) DEFAULT NULL COMMENT '负载百分比', -- active count / maximum pool size, rounded to two decimals
  `create_dt` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', -- row creation time
  `update_dt` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', -- refreshed automatically on every update
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;

2)entity类:

/**
 * Entity mapped to the {@code thread_pool} table. It carries the configured
 * sizing parameters of a pool together with its runtime metric columns.
 *
 * <p>Field order is significant: it defines the parameter order of the
 * Lombok-generated all-args constructor.
 */
@TableName("thread_pool")
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ThreadPool {

    /** Primary key, generated by the database (auto-increment). */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /** Application name. */
    @TableField("application_name")
    private String applicationName;

    /** Thread pool name. */
    @TableField("thread_pool_name")
    private String threadPoolName;

    /** Core pool size. */
    @TableField("core_size")
    private Integer coreSize;

    /** Maximum pool size. */
    @TableField("max_size")
    private Integer maxSize;

    /** Work queue capacity. */
    @TableField("queue_size")
    private Integer queueSize;

    /** Keep-alive time. */
    @TableField("keep_alive_time")
    private Long keepAliveTime;

    /** Number of elements currently held in the queue. */
    @TableField("queue_elements_count")
    private Integer queueElementsCount;

    /** Remaining capacity of the queue. */
    @TableField("queue_remaining_capacity")
    private Integer queueRemainingCapacity;

    /** Active thread count. */
    @TableField("active_count")
    private Integer activeCount;

    /** Task count. */
    @TableField("task_count")
    private Long taskCount;

    /** Completed task count. */
    @TableField("completed_task_count")
    private Long completedTaskCount;

    /** Largest pool size reached. */
    @TableField("largest_pool_size")
    private Integer largestPoolSize;

    /** Load percentage. */
    @TableField("load_pressure")
    private BigDecimal loadPressure;

    /** Creation time. */
    @TableField("create_dt")
    private Date createDt;

    /** Last modification time. */
    @TableField("update_dt")
    private Date updateDt;
}

3)DAO类:

/**
 * MyBatis-Plus mapper for {@link ThreadPool}. It declares no methods of its own;
 * everything it offers comes from {@link BaseMapper}.
 */
public interface ThreadPoolMapper extends BaseMapper<ThreadPool> {}

4)service类:

/**
 * Service contract for {@link ThreadPool} records, on top of the generic
 * operations inherited from {@link IService}.
 */
public interface ThreadPoolService extends IService<ThreadPool> {

    /**
     * Registers a thread pool record.
     *
     * @param threadPool the record to register
     */
    void register(ThreadPool threadPool);
    
    /**
     * Updates a thread pool.
     *
     * @param threadPool the thread pool record carrying the new values
     * @return whether the update was applied
     */
    Boolean updateThreadPool(ThreadPool threadPool);
}
/**
 * Default {@link ThreadPoolService}: persists thread pool records and pushes
 * configuration changes into the matching live {@link DynamicThreadPool} bean.
 */
@Service("threadPoolService")
public class ThreadPoolServiceImpl extends ServiceImpl<ThreadPoolMapper, ThreadPool> implements ThreadPoolService {

    @Resource
    private ApplicationContext applicationContext;

    /**
     * Inserts the given record.
     *
     * @param threadPool the record to save
     */
    @Override
    public void register(ThreadPool threadPool) {
        save(threadPool);
    }

    /**
     * Persists the record by id and, if a row was updated, applies the supplied
     * settings to the live pool bean. Fields left {@code null} are not applied to
     * the live pool, so a partial update no longer fails while unboxing.
     *
     * @param threadPool the record carrying the new values; its id selects the row
     * @return {@code true} if the row was updated and the live pool adjusted,
     *         {@code false} if no row was updated
     * @throws IllegalArgumentException if the supplied sizes are inconsistent
     */
    @Override
    public Boolean updateThreadPool(ThreadPool threadPool) {
        Integer coreSize = threadPool.getCoreSize();
        Integer maxSize = threadPool.getMaxSize();
        // Reject obviously invalid sizes before anything is written, so the table
        // does not end up holding values the executor would refuse afterwards.
        validateSizes(coreSize, maxSize);

        if (!updateById(threadPool)) {
            return false;
        }

        DynamicThreadPool pool = applicationContext.getBean(resolvePoolName(threadPool), DynamicThreadPool.class);
        resize(pool, coreSize, maxSize);
        if (threadPool.getKeepAliveTime() != null) {
            pool.setKeepAliveTime(threadPool.getKeepAliveTime(), TimeUnit.MILLISECONDS);
        }
        if (threadPool.getQueueSize() != null) {
            // Only records the value on the pool object; the work queue created in
            // the DynamicThreadPool constructor keeps its original capacity.
            pool.setQueueSize(threadPool.getQueueSize());
        }
        return true;
    }

    /** Fails fast on sizes that ThreadPoolExecutor would reject. */
    private static void validateSizes(Integer coreSize, Integer maxSize) {
        if (coreSize != null && coreSize < 0) {
            throw new IllegalArgumentException("coreSize must not be negative: " + coreSize);
        }
        if (maxSize != null && maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        if (coreSize != null && maxSize != null && coreSize > maxSize) {
            throw new IllegalArgumentException(
                    "coreSize (" + coreSize + ") must not exceed maxSize (" + maxSize + ")");
        }
    }

    /** Returns the pool name from the request, falling back to the stored row when it is absent. */
    private String resolvePoolName(ThreadPool threadPool) {
        if (threadPool.getThreadPoolName() != null) {
            return threadPool.getThreadPoolName();
        }
        ThreadPool stored = getById(threadPool.getId());
        return stored == null ? null : stored.getThreadPoolName();
    }

    /**
     * Applies the core and maximum sizes in an order the executor accepts: each
     * setter rejects a value that would leave core above maximum, so when the
     * maximum grows it must be raised first, and when it shrinks the core must be
     * lowered first.
     */
    private static void resize(DynamicThreadPool pool, Integer coreSize, Integer maxSize) {
        int targetCore = coreSize != null ? coreSize : pool.getCorePoolSize();
        int targetMax = maxSize != null ? maxSize : pool.getMaximumPoolSize();
        if (targetMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(targetMax);
            pool.setCorePoolSize(targetCore);
        } else {
            pool.setCorePoolSize(targetCore);
            pool.setMaximumPoolSize(targetMax);
        }
    }
}

5)controller类:

/**
 * REST endpoints under {@code threadPool} for querying and updating thread pool
 * records, plus a small endpoint that submits demo tasks to the user pool.
 */
@Slf4j
@RestController
@RequestMapping("threadPool")
public class ThreadPoolController {

    /** Service used for all record queries and updates. */
    @Resource
    private ThreadPoolService threadPoolService;

    /** The pool exercised by {@link #test()}. */
    @Resource
    private DynamicThreadPool userThreadPoll;

    /**
     * Submits ten tasks to the user pool; each task prints one line to standard output.
     *
     * @return the literal string {@code "test"}
     */
    @GetMapping("test")
    public String test() {
        for (int i = 0; i < 10; i++) {
            userThreadPoll.execute(() -> {
                System.out.println("执行线程");
            });
        }
        return "test";
    }

    /**
     * Returns all thread pool records.
     *
     * <p>The payload is the list returned by the service, so the result is typed
     * as a list rather than as a single record. The type is written fully
     * qualified because this class does not otherwise reference it.
     *
     * @return the records wrapped in a {@link Result}
     */
    @GetMapping("queryAll")
    public Result<java.util.List<ThreadPool>> queryAll() {
        return Result.succeed(threadPoolService.list());
    }

    /**
     * Looks up one record by id.
     *
     * @param id the record id
     * @return the service's lookup result wrapped in a {@link Result}
     */
    @GetMapping("queryById")
    public Result<ThreadPool> queryById(Integer id) {
        return Result.succeed(threadPoolService.getById(id));
    }

    /**
     * Updates a thread pool through {@link ThreadPoolService#updateThreadPool(ThreadPool)}.
     *
     * @param threadPool the record carrying the new values
     * @return the service's outcome flag wrapped in a {@link Result}
     */
    @PostMapping("update")
    public Result<Boolean> update(ThreadPool threadPool) {
        return Result.succeed(threadPoolService.updateThreadPool(threadPool));
    }

}

6)自定义线程池:

/**
 * A {@link ThreadPoolExecutor} that additionally carries an id, an application
 * name, a pool name and the queue size it was created with. Getters and setters
 * for these fields are generated by Lombok.
 *
 * <p>The work queue is a {@link LinkedBlockingQueue} whose capacity is fixed in
 * the constructor; the generated {@code setQueueSize} only updates the field.
 */
@Setter
@Getter
public class DynamicThreadPool extends ThreadPoolExecutor {

    /** Record id; not assigned by the constructor. */
    private Integer id;

    /** Application name. */
    private String applicationName;

    /** Thread pool name. */
    private String threadPoolName;

    /** Queue size. */
    private Integer queueSize;

    /**
     * Creates the executor with a bounded linked work queue.
     *
     * @param applicationName application name to remember
     * @param threadPoolName  pool name to remember
     * @param corePoolSize    passed to {@link ThreadPoolExecutor}
     * @param maximumPoolSize passed to {@link ThreadPoolExecutor}
     * @param keepAliveTime   passed to {@link ThreadPoolExecutor}
     * @param unit            unit of {@code keepAliveTime}
     * @param queueSize       capacity of the work queue
     * @param threadFactory   passed to {@link ThreadPoolExecutor}
     * @param handler         passed to {@link ThreadPoolExecutor}
     */
    public DynamicThreadPool(
            String applicationName,
            String threadPoolName,
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            int queueSize,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, boundedQueue(queueSize), threadFactory, handler);
        this.queueSize = queueSize;
        this.threadPoolName = threadPoolName;
        this.applicationName = applicationName;
    }

    /** Builds the work queue with the given capacity. */
    private static LinkedBlockingQueue<Runnable> boundedQueue(int capacity) {
        return new LinkedBlockingQueue<>(capacity);
    }

}

7)线程池配置类:

/**
 * Spring configuration that declares the {@code userThreadPoll} pool with its
 * initial sizing parameters.
 */
@Configuration
public class BaseConfig {

    /**
     * Name given to the user pool. It is kept identical to the name of the
     * {@code @Bean} method below, because the update service looks the live pool
     * up in the application context by its pool name.
     */
    private static final String USER_POOL_NAME = "userThreadPoll";

    /** Initial settings for the user pool, built in {@link #init()}. */
    private ThreadPool threadPool;

    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * Builds the initial settings of the user pool.
     *
     * <p>The builder is obtained from the {@code ThreadPool} class itself. The
     * previous code called the static factory through the still-null instance
     * field, which only worked because static calls ignore the receiver.
     */
    @PostConstruct
    public void init() {
        threadPool = ThreadPool.builder()
                .threadPoolName(USER_POOL_NAME)
                .coreSize(4)
                .maxSize(8)
                .keepAliveTime(5000L)
                .queueSize(6)
                .build();
    }

    /**
     * Creates the user pool from the settings built in {@link #init()}, with the
     * default thread factory and an abort rejection policy.
     *
     * @return the pool bean
     */
    @Bean
    public DynamicThreadPool userThreadPoll() {
        return new DynamicThreadPool(
                applicationName,
                USER_POOL_NAME,
                threadPool.getCoreSize(),
                threadPool.getMaxSize(),
                threadPool.getKeepAliveTime(),
                TimeUnit.MILLISECONDS,
                threadPool.getQueueSize(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }
}

8)线程池监控类:

/**
 * Scheduled monitor that writes the runtime metrics of every
 * {@link DynamicThreadPool} bean to its persisted record.
 */
@Component
public class ThreadPoolTask implements ApplicationContextAware {

    @Resource
    private ThreadPoolService threadPoolService;

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Samples every pool once per second and stores its metrics by record id.
     */
    @Scheduled(fixedRate = 1000)
    public void monitorThreadPool() {
        applicationContext.getBeansOfType(DynamicThreadPool.class).values().forEach(pool -> {
            // A pool has no id until it has been registered; the schedule may fire
            // before that, and an update without an id has no row to target.
            if (pool.getId() == null) {
                return;
            }
            threadPoolService.updateById(metricsOf(pool));
        });
    }

    /**
     * Builds a record holding only the id and the metric fields of the pool.
     * Name and sizing fields are deliberately left unset so this update does not
     * carry configuration values (assumes the update skips null fields, as
     * MyBatis-Plus does by default — confirm against the project's field strategy).
     */
    private static ThreadPool metricsOf(DynamicThreadPool pool) {
        // Read once so the stored active count and the derived load agree.
        int activeCount = pool.getActiveCount();
        // RoundingMode is fully qualified because this class does not otherwise reference it.
        BigDecimal loadPressure = BigDecimal
                .valueOf((double) activeCount / pool.getMaximumPoolSize())
                .setScale(2, java.math.RoundingMode.HALF_UP);
        return ThreadPool.builder()
                .id(pool.getId())
                .queueElementsCount(pool.getQueue().size())
                .queueRemainingCapacity(pool.getQueue().remainingCapacity())
                .activeCount(activeCount)
                .taskCount(pool.getTaskCount())
                .completedTaskCount(pool.getCompletedTaskCount())
                .largestPoolSize(pool.getLargestPoolSize())
                .loadPressure(loadPressure)
                .build();
    }
}

9)项目启动监听器:

项目启动时创建线程池。

/**
 * Thread pool listener: once the application has started, records the
 * parameters of every {@link DynamicThreadPool} bean through the service and
 * stores the resulting record id on the pool.
 *
 * <p>Package: xlw.test.dynamicthreadpoll_demo.config. Created 2024/3/16 22:11.
 *
 * @author xlw
 */
@Component
@Slf4j
public class ThreadPoolListener implements ApplicationListener<ApplicationStartedEvent> {

    @Resource
    private ThreadPoolService threadPoolService;

    /**
     * Registers each pool bean under its bean name.
     *
     * @param event the application-started event supplying the context
     */
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        log.info("注册线程池");
        event.getApplicationContext()
                .getBeansOfType(DynamicThreadPool.class)
                .forEach((beanName, pool) -> {
                    ThreadPool registered = describe(beanName, pool);
                    threadPoolService.register(registered);
                    // Relies on register() filling in the generated id on the entity.
                    pool.setId(registered.getId());
                });
    }

    /** Captures the pool's current configuration and metrics as a new record. */
    private static ThreadPool describe(String beanName, DynamicThreadPool pool) {
        // Read once so the stored active count and the derived load agree.
        int activeCount = pool.getActiveCount();
        // RoundingMode is fully qualified because this class does not otherwise reference it.
        BigDecimal loadPressure = BigDecimal
                .valueOf((double) activeCount / pool.getMaximumPoolSize())
                .setScale(2, java.math.RoundingMode.HALF_UP);
        return ThreadPool.builder()
                .applicationName(pool.getApplicationName())
                .threadPoolName(beanName)
                .coreSize(pool.getCorePoolSize())
                .maxSize(pool.getMaximumPoolSize())
                .keepAliveTime(pool.getKeepAliveTime(TimeUnit.MILLISECONDS))
                .queueSize(pool.getQueueSize())
                .queueElementsCount(pool.getQueue().size())
                .queueRemainingCapacity(pool.getQueue().remainingCapacity())
                .activeCount(activeCount)
                .taskCount(pool.getTaskCount())
                .completedTaskCount(pool.getCompletedTaskCount())
                .largestPoolSize(pool.getLargestPoolSize())
                .loadPressure(loadPressure)
                .build();
    }
}

3.结语:

动态线程池的基本原理是动态的修改spring容器内的线程池对象参数;以上代码仅提供思路,功能尚未完善,例如动态修改线程池队列大小还未实现,如需引用可自行修改增强动态线程池功能。


文章作者: 威@猫
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 威@猫 !
评论
  目录