任务调度
任务调度模块提供了灵活的任务调度功能,支持 Cron 表达式、延迟执行和定时执行。帮助你轻松实现定时任务、周期性任务和延迟任务。
概述
任务调度模块包含以下核心组件:
- Scheduler - 任务调度器类,统一管理所有任务
- CronExpression / parseCron - Cron 表达式解析器,支持标准的 5 位 Cron 表达式
主要特性:
- ✅ 支持 Cron 表达式任务
- ✅ 支持延迟执行任务(执行一次)
- ✅ 支持定时执行任务(重复执行)
- ✅ 任务查询和管理
- ✅ 任务取消和停止
- ✅ 计算下次执行时间
Cron 表达式
parseCron
解析 Cron 表达式,创建可计算下一次执行时间的 CronExpression 实例。这是一个便捷函数,等同于 new CronExpression(expression)。
适用场景:
- 解析和验证 Cron 表达式
- 计算任务的下次执行时间
- 动态创建 Cron 任务
基本用法
typescript
import { parseCron } from '@cat-kit/be'
// 解析 Cron 表达式
const cron = parseCron('0 0 * * *') // 每天凌晨执行
// 计算下一次执行时间
const nextRun = cron.getNextDate()
console.log('下次执行时间:', nextRun)
// 从指定时间开始计算
const nextRun = cron.getNextDate(new Date('2024-01-01'))Cron 表达式格式
Cron 表达式由 5 个字段组成,用空格分隔:
分钟 小时 日 月 星期
0 0 * * *字段说明:
- 分钟 (0-59) - 小时中的分钟
- 小时 (0-23) - 一天中的小时
- 日 (1-31) - 月份中的日期
- 月 (1-12) - 一年中的月份
- 星期 (0-6) - 一周中的星期(0 表示星期日)
支持的语法:
*- 匹配所有值?- 不指定值(仅用于日和星期字段),- 列表值,如1,3,5表示 1、3、5-- 范围值,如1-5表示 1 到 5/- 步长值,如*/5表示每 5 个单位,1-10/2表示 1 到 10 之间每 2 个单位
常用示例:
typescript
// 每分钟执行
parseCron('* * * * *')
// 每小时的第 0 分钟执行
parseCron('0 * * * *')
// 每天凌晨执行
parseCron('0 0 * * *')
// 每周一凌晨执行
parseCron('0 0 * * 1')
// 每 5 分钟执行
parseCron('*/5 * * * *')
// 工作日上午 9 点到下午 5 点执行
parseCron('0 9-17 * * 1-5')
// 每月 1 号凌晨执行
parseCron('0 0 1 * *')CronExpression
Cron 表达式实例,可以计算下一次执行时间。
typescript
import { parseCron } from '@cat-kit/be'
const cron = parseCron('0 0 * * *')
// 获取下一次执行时间
const nextRun = cron.getNextDate()
if (nextRun) {
console.log('下次执行:', nextRun)
} else {
console.log('没有下一次执行时间')
}
// 从指定时间开始计算
const nextRun = cron.getNextDate(new Date('2024-01-01'))API 参考
typescript
class CronExpression {
constructor(expression: string)
getNextDate(from?: Date): Date | null
}方法说明:
getNextDate(from?)- 计算下一次执行时间from- 起始时间,默认当前时间- 返回下一次执行时间,如果无法计算则返回
null
任务调度器
Scheduler
任务调度器类,支持 Cron 任务、延迟任务和定时任务。所有任务都需要先添加到调度器,然后调用 start() 启动调度器。
适用场景:
- 定时任务执行
- 数据清理
- 健康检查
- 报表生成
- 延迟任务处理
基本用法
typescript
import { Scheduler } from '@cat-kit/be'
// 创建调度器
const scheduler = new Scheduler()
// Cron 任务(每天凌晨执行)
scheduler.schedule('daily-cleanup', '0 0 * * *', async () => {
console.log('执行每日清理任务')
})
// 延迟执行(5秒后执行一次)
scheduler.once('delayed-task', 5000, () => {
console.log('延迟任务执行')
})
// 定时执行(每30秒执行一次)
scheduler.interval('heartbeat', 30000, () => {
console.log('心跳检测')
})
// 启动调度器
scheduler.start()
// 获取任务信息
const task = scheduler.getTask('daily-cleanup')
console.log('下次执行时间:', task?.nextRun)
// 获取所有任务
const tasks = scheduler.getTasks()
// 取消任务
scheduler.cancel('heartbeat')
// 停止调度器
scheduler.stop()API 参考
typescript
class Scheduler {
// 添加 Cron 任务
schedule(id: string, cron: string | CronExpression, task: TaskFunction): void
// 添加延迟任务(执行一次)
once(id: string, delay: number, task: TaskFunction): void
// 添加定时任务(重复执行)
interval(id: string, interval: number, task: TaskFunction): void
// 取消任务
cancel(id: string): boolean
// 启动调度器
start(): void
// 停止调度器
stop(): void
// 获取任务信息
getTask(id: string): TaskInfo | undefined
// 获取所有任务信息
getTasks(): TaskInfo[]
}方法说明:
schedule(id, cron, task)- 添加 Cron 任务id- 任务唯一标识cron- Cron 表达式字符串或CronExpression实例task- 要执行的任务函数(可以是异步函数)
once(id, delay, task)- 添加延迟任务(执行一次)id- 任务唯一标识delay- 延迟时间(毫秒)task- 要执行的任务函数
interval(id, interval, task)- 添加定时任务(重复执行)id- 任务唯一标识interval- 执行间隔(毫秒)task- 要执行的任务函数
cancel(id)- 取消任务id- 任务唯一标识- 返回是否成功取消
start()- 启动调度器,开始执行所有任务stop()- 停止调度器,停止所有任务getTask(id)- 获取指定任务的信息getTasks()- 获取所有任务的信息
任务信息:
typescript
interface TaskInfo {
id: string // 任务 ID
type: 'cron' | 'timeout' | 'interval' // 任务类型
nextRun?: Date // 下次执行时间
running: boolean // 是否正在运行
}使用示例
定时清理任务
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
// 每天凌晨清理临时文件
scheduler.schedule('cleanup-temp', '0 0 * * *', async () => {
console.log('开始清理临时文件...')
await cleanupTempFiles()
console.log('清理完成')
})
// 每小时清理缓存
scheduler.schedule('cleanup-cache', '0 * * * *', async () => {
await cleanupCache()
})
scheduler.start()健康检查
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
// 每 30 秒执行一次健康检查
scheduler.interval('health-check', 30000, async () => {
const health = await checkHealth()
if (!health.ok) {
await sendAlert(health)
}
})
scheduler.start()延迟任务
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
// 5 秒后发送通知
scheduler.once('send-notification', 5000, () => {
sendNotification('任务已完成')
})
scheduler.start()任务管理
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
// 添加任务
scheduler.schedule('task1', '0 * * * *', async () => {
// 任务逻辑
})
scheduler.start()
// 查看任务状态
const task = scheduler.getTask('task1')
if (task) {
console.log(`任务 ${task.id} 状态:`)
console.log(` 类型: ${task.type}`)
console.log(` 下次执行: ${task.nextRun}`)
console.log(` 正在运行: ${task.running}`)
}
// 列出所有任务
const tasks = scheduler.getTasks()
console.log(`共有 ${tasks.length} 个任务`)
// 取消任务
scheduler.cancel('task1')
// 停止所有任务
scheduler.stop()优雅关闭
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
scheduler.schedule('periodic-task', '*/5 * * * *', async () => {
// 定期任务
})
scheduler.start()
// 优雅关闭
process.on('SIGTERM', () => {
console.log('正在停止调度器...')
scheduler.stop()
console.log('调度器已停止')
process.exit(0)
})错误处理
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
scheduler.schedule('risky-task', '0 * * * *', async () => {
try {
await riskyOperation()
} catch (error) {
console.error('任务执行失败:', error)
// 发送告警
await sendAlert(error)
}
})
scheduler.start()动态任务管理
typescript
import { Scheduler, parseCron } from '@cat-kit/be'
const scheduler = new Scheduler()
// 动态添加任务
function addDynamicTask(id: string, cronExpression: string) {
const cron = parseCron(cronExpression)
scheduler.schedule(id, cron, async () => {
console.log(`执行任务: ${id}`)
})
if (!scheduler.getTask(id)) {
scheduler.start()
}
}
// 添加多个任务
addDynamicTask('task1', '0 0 * * *')
addDynamicTask('task2', '0 12 * * *')
addDynamicTask('task3', '*/30 * * * *')任务重试
typescript
import { Scheduler } from '@cat-kit/be'
const scheduler = new Scheduler()
async function executeWithRetry(task: () => Promise<void>, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
await task()
return
} catch (error) {
if (i === maxRetries - 1) {
throw error
}
console.log(`任务失败,${1000 * (i + 1)}ms 后重试...`)
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)))
}
}
}
scheduler.schedule('retry-task', '0 * * * *', async () => {
await executeWithRetry(async () => {
await riskyOperation()
})
})
scheduler.start()最佳实践
- 任务 ID 唯一性:确保每个任务都有唯一的 ID,避免冲突
- 错误处理:在任务函数中处理错误,避免任务失败影响调度器
- 优雅关闭:在应用关闭时调用
stop(),确保任务正常结束 - 任务监控:定期检查任务状态,确保任务正常执行
- 避免长时间任务:如果任务执行时间较长,考虑使用队列或异步处理
- 资源清理:在任务中及时释放资源,避免内存泄漏
- 日志记录:记录任务的执行情况,便于问题排查
