# 示例15 - TC集群化的Quartz

说明

此示例演示Quartz如何在Terracotta集群环境中运行以支持故障转移,使用Terracotta服务器作为共享调度器存储。

# 简析

本示例旨在演示Quartz与Terracotta集群环境的集成,展示如何在不使用数据库的情况下实现高可用的集群化作业调度。

该程序将执行以下操作:

  • 启动Terracotta服务器作为共享数据存储
  • 运行多个Quartz实例连接到同一Terracotta服务器
  • 演示集群环境下的故障转移和作业恢复
  • 展示负载均衡和分布式作业执行

# 运行该示例

此示例可以从examples/example15目录中执行。需要按以下步骤操作:

# 准备工作:

  1. 安装Terracotta:确保已安装Terracotta服务器
  2. 配置环境:设置JAVA_HOME和TC_HOME环境变量

# 运行步骤:

  1. 启动Terracotta服务器

    • start-sample-server.sh——UNIX/Linux shell脚本
    • start-sample-server.bat——Windows批处理文件
  2. 启动第一个集群实例

    • instance1.sh——UNIX/Linux shell脚本
    • instance1.bat——Windows批处理文件
    • 可选参数:clearJobs(清除现有作业)
  3. 启动第二个集群实例

    • instance2.sh——UNIX/Linux shell脚本
    • instance2.bat——Windows批处理文件
    • 自动使用dontScheduleJobs参数

# 注意事项:

  • 最好在不同计算机上运行客户端和服务器
  • 确保集群中所有机器的时钟同步(使用NTP等)
  • 可以在不同平台上运行实例进行测试

# 代码

此示例的代码位于org.quartz.examples.example15包中。

本示例中的代码由以下类组成:

类名 描述
ClusterExample 主程序,演示TC集群功能
SimpleRecoveryJob 支持故障恢复的简单作业
SimpleRecoveryStatefulJob 支持故障恢复的有状态作业

# SimpleRecoveryJob

SimpleRecoveryJob是一个简单的作业,支持故障恢复功能。当调度器崩溃时,正在执行的作业会被其他集群实例重新执行。

# SimpleRecoveryStatefulJob

这是一个有状态的作业类,继承自SimpleRecoveryJob,使用了@PersistJobDataAfterExecution@DisallowConcurrentExecution注解。

# ClusterExample

程序首先创建调度器实例:

SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();

如果需要清除现有作业:

if (inClearJobs) {
  _log.warn("***** Deleting existing jobs/triggers *****");
  sched.clear();
}

然后创建多个作业,每个都启用故障恢复:

JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
    .requestRecovery() // 请求调度器在崩溃时重新执行此作业
    .build();

SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
    .startAt(futureDate(1, IntervalUnit.SECOND))
    .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();

sched.scheduleJob(job, trigger);

# 配置文件

# instance1.properties

#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_one

#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.terracotta.quartz.TerracottaJobStore
org.quartz.jobStore.tcConfigUrl: localhost:9510

# instance2.properties

#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_two

#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.terracotta.quartz.TerracottaJobStore
org.quartz.jobStore.tcConfigUrl: localhost:9510

# 关键特性

# 集群配置要点

  • 唯一实例ID:每个集群实例必须有不同的instanceId
  • 相同配置:除实例ID外,其他配置应保持一致
  • TerracottaJobStore:使用Terracotta作为作业存储而非数据库

# 故障转移机制

  • 作业恢复:使用.requestRecovery()标记作业支持故障恢复
  • 自动检测:默认设置下故障检测需要约15秒
  • 无缝切换:失败实例的作业会被其他实例接管

# 负载均衡

  • 分布式执行:作业在集群中自动分配执行
  • 共享数据:所有实例访问相同的调度数据
  • 动态协调:Terracotta负责集群协调和数据同步

# 关键要点

  • 高可用性:提供企业级的高可用作业调度解决方案
  • 无数据库依赖:使用Terracotta内存数据网格替代传统数据库
  • 时间同步:集群中所有机器必须使用时间同步服务
  • 故障恢复:自动处理节点故障和作业恢复
  • 水平扩展:可以轻松添加更多实例来扩展处理能力
  • 生产就绪:适用于对可靠性要求较高的生产环境

# SimpleRecoveryJob.java源码

package org.quartz.examples.example15;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;

/**
 * Job的一个缺陷实现,用于单元测试
 * 作者:James House
 */
public class SimpleRecoveryJob implements Job {

  private static Logger _log = LoggerFactory.getLogger(SimpleRecoveryJob.class);

  private static final String COUNT = "count";

  /**
   * Quartz需要一个公共的空参构造函数,以便调度器可以在需要时实例化类
   */
  public SimpleRecoveryJob() {
  }

  /**
   * 当与此作业相关联的触发器触发时,由调度器调用此方法
   * Throws:作业执行异常(JobExecutionException)-当执行作业时产生异常
   */
  public void execute(JobExecutionContext context) throws JobExecutionException {

    JobKey jobKey = context.getJobDetail().getKey();

    //如果作业正在恢复,则打印消息
    if (context.isRecovering()) {
      _log.info("SimpleRecoveryJob: " + jobKey + " RECOVERING at " + new Date());
    } else {
      _log.info("SimpleRecoveryJob: " + jobKey + " starting at " + new Date());
    }

    //延迟10秒
    long delay = 10L * 1000L;
    try {
      Thread.sleep(delay);
    } catch (Exception e) {
      //
    }

    JobDataMap data = context.getJobDetail().getJobDataMap();
    int count;
    if (data.containsKey(COUNT)) {
      count = data.getInt(COUNT);
    } else {
      count = 0;
    }
    count++;
    data.put(COUNT, count);

    _log.info("SimpleRecoveryJob: " + jobKey + " done at " + new Date() + "\n Execution #" + count);

  }

}

# SimpleRecoveryStatefulJob.java源码

package org.quartz.examples.example15;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;

/**
 * 此作业具有与SimpleRecoveryJob相同的功能,只是此作业实现的是"有状态"的,因为它将在每次执行后自动重新持久化其数据
 *(JobDataMap),并且一次只能执行JobDetail的一个实例
 * 作者:Bill Kratzer
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleRecoveryStatefulJob extends SimpleRecoveryJob {

  public SimpleRecoveryStatefulJob() {
    super();
  }
}

# ClusterExample.java源码

package org.quartz.examples.example15;

import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import org.quartz.DateBuilder.IntervalUnit;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于测试/显示Terracotta作业存储(TerracottaJobStore)的集群功能
 * 所有实例都必须使用不同的属性文件,因为它们的实例ID必须不同,但所有其他属性都应该相同
 * 如果您希望它清除现有的作业和触发器,请传递一个名为"clearJobs"的命令行参数
 * 您可能应该从一组"新的"表开始(假设您可能有来自其他测试的数据遗留在其中),因为将非集群设置的数据与集群设置的混合可能会很糟糕
 * 尝试在运行时杀死其中一个集群实例,并查看其余实例是否恢复正在进行的作业。注意,在默认设置下,检测故障可能需要15秒左右的时间
 * 也可以尝试在调度器中注册/不注册关机挂钩插件的情况下运行它。(org.quartz.plugins.management.ShutdownHookPlugin)
 * 注意:不要在单独的机器上运行集群,除非它们的时钟使用某种形式的时间同步服务(如NTP守护程序)进行同步
 *
 * 参考:SimpleRecoveryJob
 * 参考:SimpleRecoveryStatefulJob
 * 作者:James House
 */
public class ClusterExample {

  private static Logger _log = LoggerFactory.getLogger(ClusterExample.class);

  public void run(boolean inClearJobs, boolean inScheduleJobs) throws Exception {

    //首先,我们必须获得对调度器的引用
    SchedulerFactory sf = new StdSchedulerFactory();
    Scheduler sched = sf.getScheduler();

    if (inClearJobs) {
      _log.warn("***** Deleting existing jobs/triggers *****");
      sched.clear();
    }

    _log.info("------- Initialization Complete -----------");

    if (inScheduleJobs) {

      _log.info("------- Scheduling Jobs ------------------");

      String schedId = sched.getSchedulerInstanceId();

      int count = 1;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      JobDetail job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery() // 请求调度器在崩溃时重新执行此作业
          .build();

      SimpleTrigger trigger = newTrigger().withIdentity("triger_" + count, schedId)
          .startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery() // 请求调度器在崩溃时重新执行此作业
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(2, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(5)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryStatefulJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery() // 请求调度器在崩溃时重新执行此作业
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(3)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " and repeat: "
                + trigger.getRepeatCount() + " times, every " + trigger.getRepeatInterval() / 1000 + " seconds");
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery() // 请求调度器在崩溃时重新执行此作业
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInSeconds(4)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
                + "/" + trigger.getRepeatInterval());
      sched.scheduleJob(job, trigger);

      count++;

      //将触发器放在以集群节点实例命名的组中,以便(在日志记录中)区分调度的内容和请求调度器
      //在调度程序停止时重新执行此作业的位置
      job = newJob(SimpleRecoveryJob.class).withIdentity("job_" + count, schedId)
          .requestRecovery() // 请求调度器在崩溃时重新执行此作业
          .build();

      trigger = newTrigger().withIdentity("triger_" + count, schedId).startAt(futureDate(1, IntervalUnit.SECOND))
          .withSchedule(simpleSchedule().withRepeatCount(20).withIntervalInMilliseconds(4500L)).build();

      _log.info(job.getKey() + " will run at: " + trigger.getNextFireTime() + " & repeat: " + trigger.getRepeatCount()
                + "/" + trigger.getRepeatInterval());
      sched.scheduleJob(job, trigger);
    }

    //在调用start()之前,作业不会启动
    _log.info("------- Starting Scheduler ---------------");
    sched.start();
    _log.info("------- Started Scheduler ----------------");

    _log.info("------- Waiting for one hour... ----------");
    try {
      Thread.sleep(3600L * 1000L);
    } catch (Exception e) {
      //
    }

    _log.info("------- Shutting Down --------------------");
    sched.shutdown();
    _log.info("------- Shutdown Complete ----------------");
  }

  public static void main(String[] args) throws Exception {
    boolean clearJobs = false;
    boolean scheduleJobs = true;

    for (String arg : args) {
      if (arg.equalsIgnoreCase("clearJobs")) {
        clearJobs = true;
      } else if (arg.equalsIgnoreCase("dontScheduleJobs")) {
        scheduleJobs = false;
      }
    }

    ClusterExample example = new ClusterExample();
    example.run(clearJobs, scheduleJobs);
  }
}

微信公众号

QQ交流群
原创网站开发,偏差难以避免。

如若发现错误,诚心感谢反馈。

愿你倾心相念,愿你学有所成。

愿你朝华相顾,愿你前程似锦。