百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

Java定时调度机制 - ScheduledExecutorService

yuyutoo 2024-10-12 00:07 17 浏览 0 评论

我们知道,Java的定时调度可以通过Timer&TimerTask来实现。由于其实现的方式为单线程,因此从JDK1.3发布之后就一直存在一些问题,大致如下:

  1. 多个任务之间会相互影响
  2. 多个任务的执行是串行的,性能较低

ScheduledExecutorService在设计之初就是为了解决Timer&TimerTask的这些问题。因为天生就是基于多线程机制,所以任务之间不会相互影响(只要线程数足够。当线程数不足时,有些任务会复用同一个线程)。

除此之外,因为其内部使用的延迟队列,本身就是基于等待/唤醒机制实现的,所以CPU并不会一直繁忙。同时,多线程带来的CPU资源复用也能极大地提升性能。

如何使用

基本作用

因为ScheduledExecutorService继承于ExecutorService,所以本身支持线程池的所有功能。额外还提供了4种方法,我们来看看其作用。

/**
 * 带延迟时间的调度,只执行一次
 * 调度之后可通过Future.get()阻塞直至任务执行完毕
 */
1. public ScheduledFuture<?> schedule(Runnable command,
 long delay, TimeUnit unit);
/**
 * 带延迟时间的调度,只执行一次
 * 调度之后可通过Future.get()阻塞直至任务执行完毕,并且可以获取执行结果
 */
2. public <V> ScheduledFuture<V> schedule(Callable<V> callable,
 long delay, TimeUnit unit);
/**
 * 带延迟时间的调度,循环执行,固定频率
 */
3. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 long initialDelay,
 long period,
 TimeUnit unit);
/**
 * 带延迟时间的调度,循环执行,固定延迟
 */
4. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 long initialDelay,
 long delay,
 TimeUnit unit);

1. schedule Runnable

该方法用于带延迟时间的调度,只执行一次。调度之后可通过Future.get()阻塞直至任务执行完毕。我们来看一个例子。

@Test public void test_schedule4Runnable() throws Exception {
 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
 ScheduledFuture future = service.schedule(() -> {
 try {
 Thread.sleep(3000L);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("task finish time: " + format(System.currentTimeMillis()));
 }, 1000, TimeUnit.MILLISECONDS);
 System.out.println("schedule finish time: " + format(System.currentTimeMillis()));
 System.out.println("Runnable future's result is: " + future.get() +
 ", and time is: " + format(System.currentTimeMillis()));
}

上述代码达到的效果应该是这样的:延迟执行时间为1秒,任务执行3秒,任务只执行一次,同时通过Future.get()阻塞直至任务执行完毕。

我们运行看到的效果的确和我们猜想的一样,如下图所示。

2. schedule Callable

在schedule Runnable的基础上,我们将Runnable改为Callable来看一下。

@Test public void test_schedule4Callable() throws Exception {
 ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
 ScheduledFuture<String> future = service.schedule(() -> {
 try {
 Thread.sleep(3000L);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("task finish time: " + format(System.currentTimeMillis()));
 return "success";
 }, 1000, TimeUnit.MILLISECONDS);
 System.out.println("schedule finish time: " + format(System.currentTimeMillis()));
 System.out.println("Callable future's result is: " + future.get() +
 ", and time is: " + format(System.currentTimeMillis()));
}

运行看到的结果和Runnable基本相同,唯一的区别在于future.get()能拿到Callable返回的真实结果。

3. scheduleAtFixedRate

该方法用于固定频率地对一个任务循环执行,我们通过一个例子来看看效果。

@Test public void test_scheduleAtFixedRate() {
 ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 service.scheduleAtFixedRate(() -> {
 try {
 Thread.sleep(3000L);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("task finish time: " + format(System.currentTimeMillis()));
 }, 1000L, 1000L, TimeUnit.MILLISECONDS);
 System.out.println("schedule finish time: " + format(System.currentTimeMillis()));
 while (true) {
 }
}

在这个例子中,任务初始延迟1秒,任务执行3秒,任务执行间隔为1秒。我们来看看执行结果:

4. scheduleWithFixedDelay

该方法用于固定延迟地对一个任务循环执行,我们通过一个例子来看看效果。

@Test public void test_scheduleWithFixedDelay() {
 ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 service.scheduleWithFixedDelay(() -> {
 try {
 Thread.sleep(3000L);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("task finish time: " + format(System.currentTimeMillis()));
 }, 1000L, 1000L, TimeUnit.MILLISECONDS);
 System.out.println("schedule finish time: " + format(System.currentTimeMillis()));
 while (true) {
 }
}

在这个例子中,任务初始延迟1秒,任务执行3秒,任务执行间隔为1秒。我们来看看执行结果:

5. scheduleAtFixedRate和scheduleWithFixedDelay的区别

既然这两个方法都是对任务循环执行,那么他们又有何区别呢?通过jdk文档我们找到了答案。


直白地讲,scheduleAtFixedRate()为固定频率,scheduleWithFixedDelay()为固定延迟。固定频率是相对于任务执行的开始时间,而固定延迟是相对于任务执行的结束时间,这就是他们最根本的区别!

另外,从3和4的运行结果也能看出这些差异。

源码阅读初体验

一般源码的入口在于构造方法,我们来看看。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
 return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 new DelayedWorkQueue());
}

在构造方法中我们看到以下信息:

  1. ScheduledThreadPoolExecutor构造方法最终调用的是ThreadPoolExecutor构造方法
  2. 阻塞队列使用的是DelayedWorkQueue

上述信息的第2点至关重要,但是限于篇幅,本文将不做深入分析。

接下来我们看看scheduleWithFixedDelay()方法,主要做了3件事情:

  1. 入参校验,包括空指针、数字范围
  2. 将Runnable包装成RunnableScheduledFuture
  3. 延迟执行RunnableScheduledFuture
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 long initialDelay,
 long delay,
 TimeUnit unit) {
 // 1. 入参校验,包括空指针、数字范围
 if (command == null || unit == null)
 throw new NullPointerException();
 if (delay <= 0)
 throw new IllegalArgumentException();
 // 2. 将Runnable包装成`RunnableScheduledFuture`
 ScheduledFutureTask<Void> sft =
 new ScheduledFutureTask<Void>(command,
 null,
 triggerTime(initialDelay, unit),
 unit.toNanos(-delay));
 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 sft.outerTask = t;
 // 3. 延迟执行`RunnableScheduledFuture`
 delayedExecute(t);
 return t;
}

delayedExecute()这个方法从字面描述来看是延迟执行的意思,我们深入到这个方法里面去看看。

private void delayedExecute(RunnableScheduledFuture<?> task) {
 // 1. 线程池运行状态判断
 if (isShutdown())
 reject(task);
 else {
 // 2. 将任务添加到队列
 super.getQueue().add(task);
 // 3. 如果任务添加到队列之后,线程池状态变为非运行状态,
 // 需要将任务从队列移除,同时通过任务的`cancel()`方法来取消任务
 if (isShutdown() &&
 !canRunInCurrentRunState(task.isPeriodic()) &&
 remove(task))
 task.cancel(false);
 // 4. 如果任务添加到队列之后,线程池状态是运行状态,需要提前启动线程
 else
 ensurePrestart();
 }
}

在线程池状态正常的情况下,最终会调用ensurePrestart()方法来完成线程的创建。主要逻辑有两个:

  1. 当前线程数未达到核心线程数,则创建核心线程
  2. 当前线程数已达到核心线程数,则创建非核心线程,不会将任务放到阻塞队列中,这一点是和普通线程池是不相同的
/**
 * Same as prestartCoreThread except arranges that at least one
 * thread is started even if corePoolSize is 0.
 */
void ensurePrestart() {
 int wc = workerCountOf(ctl.get());
 // 1. 当前线程数未达到核心线程数,则创建核心线程
 if (wc < corePoolSize)
 addWorker(null, true);
 // 2. 当前线程数已达到核心线程数,则创建非核心线程,
 // 2.1 不会将任务放到阻塞队列中,这一点是和普通线程池是不相同的
 else if (wc == 0)
 addWorker(null, false);
}

至此,除了DelayedWorkQueue延迟队列的源码还未分析,其他的我们都分析完了。

总结

首先,我们了解了ScheduledExecutorService的基本作用,然后在此基础上写了一些demo来做验证,得到的结果和基本作用是完全相同的。

然后,我们对其内部的实现原理和源代码做了初步的分析,知道了其和普通线程池是不同的地方在于:阻塞队列和创建线程的方式。

相关推荐

ETCD 故障恢复(etc常见故障)

概述Kubernetes集群外部ETCD节点故障,导致kube-apiserver无法启动。...

在Ubuntu 16.04 LTS服务器上安装FreeRADIUS和Daloradius的方法

FreeRADIUS为AAARadiusLinux下开源解决方案,DaloRadius为图形化web管理工具。...

如何排查服务器被黑客入侵的迹象(黑客 抓取服务器数据)

---排查服务器是否被黑客入侵需要系统性地检查多个关键点,以下是一份详细的排查指南,包含具体命令、工具和应对策略:---###**一、快速初步检查**####1.**检查异常登录记录**...

使用 Fail Ban 日志分析 SSH 攻击行为

通过分析`fail2ban`日志可以识别和应对SSH暴力破解等攻击行为。以下是详细的操作流程和关键分析方法:---###**一、Fail2ban日志位置**Fail2ban的日志路径因系统配置...

《5 个实用技巧,提升你的服务器安全性,避免被黑客盯上!》

服务器的安全性至关重要,特别是在如今网络攻击频繁的情况下。如果你的服务器存在漏洞,黑客可能会利用这些漏洞进行攻击,甚至窃取数据。今天我们就来聊聊5个实用技巧,帮助你提升服务器的安全性,让你的系统更...

聊聊Spring AI Alibaba的YuQueDocumentReader

序本文主要研究一下SpringAIAlibaba的YuQueDocumentReaderYuQueDocumentReader...

Mac Docker环境,利用Canal实现MySQL同步ES

Canal的使用使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中,并在springboo...

RustDesk:开源远程控制工具的技术架构与全场景部署实战

一、开源远程控制领域的革新者1.1行业痛点与解决方案...

长安汽车一代CS75Plus2020款安装高德地图7.5

不用破解原车机,一代CS75Plus2020款,安装车机版高德地图7.5,有红绿灯读秒!废话不多讲,安装步骤如下:一、在拨号状态输入:在电话拨号界面,输入:*#518200#*(进入安卓设置界面,...

Zookeeper使用详解之常见操作篇(zookeeper ui)

一、Zookeeper的数据结构对于ZooKeeper而言,其存储结构类似于文件系统,也是一个树形目录服务,并通过Key-Value键值对的形式进行数据存储。其中,Key由斜线间隔的路径元素构成。对...

zk源码—4.会话的实现原理一(会话层的基本功能是什么)

大纲1.创建会话...

Zookeeper 可观测性最佳实践(zookeeper能够确保)

Zookeeper介绍ZooKeeper是一个开源的分布式协调服务,用于管理和协调分布式系统中的节点。它提供了一种高效、可靠的方式来解决分布式系统中的常见问题,如数据同步、配置管理、命名服务和集群...

服务器密码错误被锁定怎么解决(服务器密码错几次锁)

#服务器密码错误被锁定解决方案当服务器因多次密码错误导致账户被锁定时,可以按照以下步骤进行排查和解决:##一、确认锁定状态###1.检查账户锁定状态(Linux)```bash#查看账户锁定...

zk基础—4.zk实现分布式功能(分布式zk的使用)

大纲1.zk实现数据发布订阅...

《死神魂魄觉醒》卡死问题终极解决方案:从原理到实战的深度解析

在《死神魂魄觉醒》的斩魄刀交锋中,游戏卡死犹如突现的虚圈屏障,阻断玩家与尸魂界的连接。本文将从技术架构、解决方案、预防策略三个维度,深度剖析卡死问题的成因与应对之策,助力玩家突破次元壁障,畅享灵魂共鸣...

取消回复欢迎 发表评论: