# 示例15 - TC集群化的Quartz
说明
此示例演示Quartz如何在Terracotta集群环境中运行以支持故障转移,使用Terracotta服务器作为共享调度器存储。
# 简析
本示例旨在演示Quartz与Terracotta集群环境的集成,展示如何在不使用数据库的情况下实现高可用的集群化作业调度。
该程序将执行以下操作:
- 启动Terracotta服务器作为共享数据存储
- 运行多个Quartz实例连接到同一Terracotta服务器
- 演示集群环境下的故障转移和作业恢复
- 展示负载均衡和分布式作业执行
# 运行该示例
此示例可以从examples/example15目录中执行。需要按以下步骤操作:
# 准备工作:
- 安装Terracotta:确保已安装Terracotta服务器
- 配置环境:设置JAVA_HOME和TC_HOME环境变量
# 运行步骤:
启动Terracotta服务器:
- start-sample-server.sh——UNIX/Linux shell脚本
- start-sample-server.bat——Windows批处理文件
启动第一个集群实例:
- instance1.sh——UNIX/Linux shell脚本
- instance1.bat——Windows批处理文件
- 可选参数:
clearJobs(清除现有作业)
启动第二个集群实例:
- instance2.sh——UNIX/Linux shell脚本
- instance2.bat——Windows批处理文件
- 自动使用
dontScheduleJobs参数
# 注意事项:
- 最好在不同计算机上运行客户端和服务器
- 确保集群中所有机器的时钟同步(使用NTP等)
- 可以在不同平台上运行实例进行测试
# 代码
此示例的代码位于org.quartz.examples.example15包中。
本示例中的代码由以下类组成:
| 类名 | 描述 |
|---|---|
| ClusterExample | 主程序,演示TC集群功能 |
| SimpleRecoveryJob | 支持故障恢复的简单作业 |
| SimpleRecoveryStatefulJob | 支持故障恢复的有状态作业 |
# SimpleRecoveryJob
SimpleRecoveryJob是一个简单的作业,支持故障恢复功能。当调度器崩溃时,正在执行的作业会被其他集群实例重新执行。
# SimpleRecoveryStatefulJob
这是一个有状态的作业类,继承自SimpleRecoveryJob,使用了@PersistJobDataAfterExecution和@DisallowConcurrentExecution注解。
# ClusterExample
程序首先创建调度器实例:
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
如果需要清除现有作业:
if (inClearJobs) {
_log.warn("***** Deleting existing jobs/triggers *****");
sched.clear();
}
然后创建多个作业,每个都启用故障恢复:
JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
.startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
sched.scheduleJob(job, trigger);
# 配置文件
# instance1.properties
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_one
#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5
#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.terracotta.quartz.TerracottaJobStore
org.quartz.jobStore.tcConfigUrl: localhost:9510
# instance2.properties
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_two
#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5
#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.terracotta.quartz.TerracottaJobStore
org.quartz.jobStore.tcConfigUrl: localhost:9510
# 关键特性
# 集群配置要点
- 唯一实例ID:每个集群实例必须有不同的
instanceId - 相同配置:除实例ID外,其他配置应保持一致
- TerracottaJobStore:使用Terracotta作为作业存储而非数据库
# 故障转移机制
- 作业恢复:使用
.requestRecovery()标记作业支持故障恢复 - 自动检测:默认设置下故障检测需要约15秒
- 无缝切换:失败实例的作业会被其他实例接管
# 负载均衡
- 分布式执行:作业在集群中自动分配执行
- 共享数据:所有实例访问相同的调度数据
- 动态协调:Terracotta负责集群协调和数据同步
# 关键要点
- 高可用性:提供企业级的高可用作业调度解决方案
- 无数据库依赖:使用Terracotta内存数据网格替代传统数据库
- 时间同步:集群中所有机器必须使用时间同步服务
- 故障恢复:自动处理节点故障和作业恢复
- 水平扩展:可以轻松添加更多实例来扩展处理能力
- 生产就绪:适用于对可靠性要求较高的生产环境
# SimpleRecoveryJob.java源码
package org.quartz.examples.example15;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* Job的一个缺陷实现,用于单元测试
* 作者:James House
*/
public class SimpleRecoveryJob implements Job {
private static Logger _log = LoggerFactory.getLogger(SimpleRecoveryJob.class);
private static final String COUNT = "count";
/**
* Quartz需要一个公共的空参构造函数,以便调度器可以在需要时实例化类
*/
public SimpleRecoveryJob() {
}
/**
* 当与此作业相关联的触发器触发时,由调度器调用此方法
* Throws:作业执行异常(JobExecutionException)-当执行作业时产生异常
*/
public void execute(JobExecutionContext context) throws JobExecutionException {
JobKey jobKey = context.getJobDetail().getKey();
//如果作业正在恢复,则打印消息
if (context.isRecovering()) {
_log.info("SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
} else {
_log.info("SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
}
//延迟10秒
long delay = 10L * 1000L;
try {
Thread.sleep(delay);
} catch (Exception e) {
//
}
JobDataMap data = context.getJobDetail().getJobDataMap();
int count;
if (data.containsKey(COUNT)) {
count = data.getInt(COUNT);
} else {
count = 0;
}
count++;
data.put(COUNT, count);
_log.info("SimpleRecoveryJob: " + jobKey + " done at " + new Date() + "\n Execution #" + count);
}
}
# SimpleRecoveryStatefulJob.java源码
package org.quartz.examples.example15;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
/**
* 此作业具有与SimpleRecoveryJob相同的功能,只是此作业实现的是"有状态"的,因为它将在每次执行后自动重新持久化其数据
*(JobDataMap),并且一次只能执行JobDetail的一个实例
* 作者:Bill Kratzer
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleRecoveryStatefulJob extends SimpleRecoveryJob {
public SimpleRecoveryStatefulJob() {
super();
}
}
# ClusterExample.java源码
package org.quartz.examples.example15;
import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 用于测试/显示Terracotta作业存储(TerracottaJobStore)的集群功能
* 所有实例都必须使用不同的属性文件,因为它们的实例ID必须不同,但所有其他属性都应该相同
* 如果您希望它清除现有的作业和触发器,请传递一个名为"clearJobs"的命令行参数
* 您可能应该从一组"新的"表开始(假设您可能有来自其他测试的数据遗留在其中),因为将非集群设置的数据与集群设置的混合可能会很糟糕
* 尝试在运行时杀死其中一个集群实例,并查看其余实例是否恢复正在进行的作业。注意,在默认设置下,检测故障可能需要15秒左右的时间
* 也可以尝试在调度器中注册/不注册关机挂钩插件的情况下运行它。(org.quartz.plugins.management.ShutdownHookPlugin)
* 注意:不要在单独的机器上运行集群,除非它们的时钟使用某种形式的时间同步服务(如NTP守护程序)进行同步
*
* 参考:SimpleRecoveryJob
* 参考:SimpleRecoveryStatefulJob
* 作者:James House
*/
public class ClusterExample {
private static Logger _log = LoggerFactory.getLogger(ClusterExample.class);
public void run(boolean inClearJobs, boolean inScheduleJobs) throws Exception {
//首先,我们必须获得对调度器的引用
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
if (inClearJobs) {
_log.warn("***** Deleting existing jobs/triggers *****");
sched.clear();
}
_log.info("------- Initialization Complete -----------");
if (inScheduleJobs) {
_log.info("------- Scheduling Jobs ------------------");
String schedId = sched.getSchedulerInstanceId();
int count = 1;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
.startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(2, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryStatefulJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(3)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
+ trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(4)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
+ "/" + trigger.getRepeatInterval());
sched.scheduleJob(job, trigger);
count++;
//将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
//在调度程序停止时重新执行此作业的位置
job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
.requestRecovery() // 请求调度器在崩溃时重新执行此作业
.build();
trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
.withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInMilliseconds(4500L)).build();
_log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
+ "/" + trigger.getRepeatInterval());
sched.scheduleJob(job, trigger);
}
//在调用start()之前,作业不会启动
_log.info("------- Starting Scheduler ---------------");
sched.start();
_log.info("------- Started Scheduler ----------------");
_log.info("------- Waiting for one hour... ----------");
try {
Thread.sleep(3600L * 1000L);
} catch (Exception e) {
//
}
_log.info("------- Shutting Down --------------------");
sched.shutdown();
_log.info("------- Shutdown Complete ----------------");
}
public static void main(String[] args) throws Exception {
boolean clearJobs = false;
boolean scheduleJobs = true;
for (String arg : args) {
if (arg.equalsIgnoreCase("clearJobs")) {
clearJobs = true;
} else if (arg.equalsIgnoreCase("dontScheduleJobs")) {
scheduleJobs = false;
}
}
ClusterExample example = new ClusterExample();
example.run(clearJobs, scheduleJobs);
}
}

微信公众号

QQ交流群
原创网站开发,偏差难以避免。
如若发现错误,诚心感谢反馈。
愿你倾心相念,愿你学有所成。
愿你朝华相顾,愿你前程似锦。
如若发现错误,诚心感谢反馈。
愿你倾心相念,愿你学有所成。
愿你朝华相顾,愿你前程似锦。