Springboot整合Quartz实现动态定时任务

Java框架

浏览数:216

2018-12-11

简介

Quartz是一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度。本文使用Springboot+Mybatis+Quartz实现对定时任务的增、删、改、查、启用、停用等功能。并把定时任务持久化到数据库以及支持集群。对于如何创建Springboot项目和与Mybatis整合可以参考上篇文章:[spring-boot整合Mybatis如何部署在weblogic上
][1]。

Quartz的3个基本要素

  1. Scheduler:调度器。所有的调度都是由它控制。
  2. Trigger: 触发器。决定什么时候来执行任务。
  3. JobDetail & Job: JobDetail定义的是任务数据,而真正的执行逻辑是在Job中。使用JobDetail + Job而不是Job,这是因为任务是有可能并发执行,如果Scheduler直接使用Job,就会存在对同一个Job实例并发访问的问题。而JobDetail & Job 方式,sheduler每次执行,都会根据JobDetail创建一个新的Job实例,这样就可以规避并发访问的问题。

如何使用Quartz

  1. 添加依赖
<dependency>  
    <groupId>org.quartz-scheduler</groupId>  
    <artifactId>quartz</artifactId>  
    <version>2.2.3</version>  
</dependency> 
<dependency>  
    <groupId>org.quartz-scheduler</groupId>  
    <artifactId>quartz-jobs</artifactId>  
    <version>2.2.3</version>  
</dependency>

2.创建配置文件
在maven项目的resource目录下创建quartz.properties

org.quartz.scheduler.instanceName = MyScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

#线程池配置
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

#持久化配置
org.quartz.jobStore.misfireThreshold = 50000
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#支持集群
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.useProperties:true
org.quartz.jobStore.clusterCheckinInterval = 15000
#使用weblogic连接Oracle驱动
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = qzDS
#数据源连接信息,quartz默认使用c3p0数据源可以被自定义数据源覆盖
org.quartz.dataSource.qzDS.driver = oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.qzDS.URL = jdbc:oracle:thin:@localhost:1521/XE
org.quartz.dataSource.qzDS.user = root
org.quartz.dataSource.qzDS.password = 123456
org.quartz.dataSource.qzDS.maxConnections = 10

说明:在使用quartz做持久化的时候需要用到quartz的11张表,可以去quartz官网下载对应版本的quartz,解压打开docs/dbTables里面有对应数据库的建表语句。关于quartz.properties配置的详细解释可以查看quartz官网。另外新建一张表TB_APP_QUARTZ用于存放定时任务基本信息和描述等信息,定时任务的增、删、改、执行等功能与此表没有任何关系。
quartz的11张表:
图片描述

//TB_APP_QUARTZ表的实体类
public class AppQuartz {
    private Integer quartzId;  //id  主键
    private String jobName;  //任务名称
    private String jobGroup;  //任务分组
    private String startTime;  //任务开始时间
    private String cronExpression;  //corn表达式
    private String invokeParam;//需要传递的参数
    ...省略set get
}

3.Quartz配置

/**
 * 创建job 实例工厂,解决spring注入问题,如果使用默认会导致spring的@Autowired 无法注入问题
 * @author LLQ
 *
 */
@Component
public class JobFactory extends AdaptableJobFactory{
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;
    
     @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            //调用父类的方法
            Object jobInstance = super.createJobInstance(bundle);
            //进行注入
            capableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }

}
@Configuration
public class SchedulerConfig implements ApplicationListener<ContextRefreshedEvent>{    
    @Autowired
    private JobFactory jobFactory;
    @Autowired
    @Qualifier("dataSource")
    private DataSource primaryDataSource;
    
    @Override
     public void onApplicationEvent(ContextRefreshedEvent event) {
        System.out.println("任务已经启动..."+event.getSource());
    }
    
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {        
       //获取配置属性
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        //创建SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(propertiesFactoryBean.getObject());
        //使用数据源,自定义数据源
        factory.setDataSource(this.primaryDataSource);
        factory.setJobFactory(jobFactory);
        factory.setWaitForJobsToCompleteOnShutdown(true);//这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
        factory.setOverwriteExistingJobs(false); 
        factory.setStartupDelay(1);  
        return factory;
    }
    
    
    /*
     * 通过SchedulerFactoryBean获取Scheduler的实例
     */
    @Bean(name="scheduler")
    public Scheduler scheduler() throws IOException {
        return schedulerFactoryBean().getScheduler();
    }
    
    
    @Bean
    public QuartzInitializerListener executorListener() {
       return new QuartzInitializerListener();
    }
}

4.创建定时任务服务

@Service
public class JobUtil {
     @Autowired
     @Qualifier("scheduler")
     private Scheduler scheduler;
          
     
     /**
      * 新建一个任务
      * 
      */     
     public String addJob(AppQuartz appQuartz) throws Exception  {
         
             SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
             Date date=df.parse(appQuartz.getStartTime());
         
             if (!CronExpression.isValidExpression(appQuartz.getCronExpression())) {           
                return "Illegal cron expression";   //表达式格式不正确
            }                            
            JobDetail jobDetail=null;
            //构建job信息
            if("JobOne".equals(appQuartz.getJobGroup())) {
                 jobDetail = JobBuilder.newJob(JobOne.class).withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).build();
            }
            if("JobTwo".equals(appQuartz.getJobGroup())) {
                 jobDetail = JobBuilder.newJob(JobTwo.class).withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).build();
            }
                    
            //表达式调度构建器(即任务执行的时间,不立即执行)
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(appQuartz.getCronExpression()).withMisfireHandlingInstructionDoNothing();

            //按新的cronExpression表达式构建一个新的trigger
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(appQuartz.getJobName(), appQuartz.getJobGroup()).startAt(date)
                    .withSchedule(scheduleBuilder).build();
                                 
            //传递参数
            if(appQuartz.getInvokeParam()!=null && !"".equals(appQuartz.getInvokeParam())) {
                trigger.getJobDataMap().put("invokeParam",appQuartz.getInvokeParam());    
            }                                
            scheduler.scheduleJob(jobDetail, trigger);
           // pauseJob(appQuartz.getJobName(),appQuartz.getJobGroup());
            return "success";
        } 
         /**
          * 获取Job状态
          * @param jobName
          * @param jobGroup
          * @return
          * @throws SchedulerException
          */
         public String getJobState(String jobName, String jobGroup) throws SchedulerException {             
             TriggerKey triggerKey = new TriggerKey(jobName, jobGroup);    
             return scheduler.getTriggerState(triggerKey).name();
           }
         
         //暂停所有任务
         public void pauseAllJob() throws SchedulerException {            
             scheduler.pauseAll();
          }
        
        //暂停任务
        public String pauseJob(String jobName, String jobGroup) throws SchedulerException {            
            JobKey jobKey = new JobKey(jobName, jobGroup);
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            if (jobDetail == null) {
                 return "fail";
            }else {
                 scheduler.pauseJob(jobKey);
                 return "success";
            }
                                         
        }
        
        //恢复所有任务
        public void resumeAllJob() throws SchedulerException {            
            scheduler.resumeAll();
        }
        
        // 恢复某个任务
        public String resumeJob(String jobName, String jobGroup) throws SchedulerException {
            
            JobKey jobKey = new JobKey(jobName, jobGroup);
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            if (jobDetail == null) {
                return "fail";
            }else {               
                scheduler.resumeJob(jobKey);
                return "success";
            }
        }
        
        //删除某个任务
        public String  deleteJob(AppQuartz appQuartz) throws SchedulerException {            
            JobKey jobKey = new JobKey(appQuartz.getJobName(), appQuartz.getJobGroup());
            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
            if (jobDetail == null ) {
                 return "jobDetail is null";
            }else if(!scheduler.checkExists(jobKey)) {
                return "jobKey is not exists";
            }else {
                 scheduler.deleteJob(jobKey);
                 return "success";
            }  
           
        }
        
        //修改任务
        public String  modifyJob(AppQuartz appQuartz) throws SchedulerException {            
            if (!CronExpression.isValidExpression(appQuartz.getCronExpression())) {
                return "Illegal cron expression";
            }
            TriggerKey triggerKey = TriggerKey.triggerKey(appQuartz.getJobName(),appQuartz.getJobGroup());            
            JobKey jobKey = new JobKey(appQuartz.getJobName(),appQuartz.getJobGroup());
            if (scheduler.checkExists(jobKey) && scheduler.checkExists(triggerKey)) {
                CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);            
                //表达式调度构建器,不立即执行
                CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(appQuartz.getCronExpression()).withMisfireHandlingInstructionDoNothing();
                //按新的cronExpression表达式重新构建trigger
                trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                    .withSchedule(scheduleBuilder).build();
                //修改参数
                if(!trigger.getJobDataMap().get("invokeParam").equals(appQuartz.getInvokeParam())) {
                    trigger.getJobDataMap().put("invokeParam",appQuartz.getInvokeParam());
                }                
                //按新的trigger重新设置job执行
                scheduler.rescheduleJob(triggerKey, trigger);                                                
                return "success";                    
            }else {
                return "job or trigger not exists";
            }    
            
        }

}
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Component
public class JonOne implements Job{
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException{
        JobDataMap data=context.getTrigger().getJobDataMap();
        String invokeParam =(String) data.get("invokeParam");
        //在这里实现业务逻辑
        }
}
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Component
public class JobTwo implements Job{
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException{
        JobDataMap data=context.getTrigger().getJobDataMap();
        String invokeParam =(String) data.get("invokeParam");
        //在这里实现业务逻辑
        }
}

说明:每个定时任务都必须有一个分组,名称和corn表达式,corn表达式也就是定时任务的触发时间,关于corn表达式格式以及含义可以参考一些网络资源。每个定时任务都有一个入口类在这里我把类名当成定时任务的分组名称,例如:只要创建定时任务的分组是JobOne的都会执行JobOne这个任务类里面的逻辑。如果定时任务需要额外的参数可以使用JobDataMap传递参数,当然也可以从数据库中获取需要的数据。@PersistJobDataAfterExecution和@DisallowConcurrentExecution注解是不让某个定时任务并发执行,只有等当前任务完成下一个任务才会去执行。

5.封装定时任务接口

@RestController
public class JobController {
    @Autowired
    private JobUtil jobUtil;    
    @Autowired
    private AppQuartzService appQuartzService;
    
    
    //添加一个job
    @RequestMapping(value="/addJob",method=RequestMethod.POST)
    public ReturnMsg addjob(@RequestBody AppQuartz appQuartz) throws Exception {    
        appQuartzService.insertAppQuartzSer(appQuartz);        
        result=jobUtil.addJob(appQuartz);                                                
    }
    
    //暂停job    
    @RequestMapping(value="/pauseJob",method=RequestMethod.POST)
    public ReturnMsg pausejob(@RequestBody Integer[]quartzIds) throws Exception {    
        AppQuartz appQuartz=null;            
        if(quartzIds.length>0){
            for(Integer quartzId:quartzIds) {
                appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0);
                jobUtil.pauseJob(appQuartz.getJobName(), appQuartz.getJobGroup());                        
            }
            return new ReturnMsg("200","success pauseJob");    
        }else {
            return new ReturnMsg("404","fail pauseJob");    
        }                                                                
    }
    
    //恢复job
    @RequestMapping(value="/resumeJob",method=RequestMethod.POST)
    public ReturnMsg resumejob(@RequestBody Integer[]quartzIds) throws Exception {    
        AppQuartz appQuartz=null;
        if(quartzIds.length>0) {
            for(Integer quartzId:quartzIds) {
                appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0);
                jobUtil.resumeJob(appQuartz.getJobName(), appQuartz.getJobGroup());                
            }
            return new ReturnMsg("200","success resumeJob");
        }else {
            return new ReturnMsg("404","fail resumeJob");
        }            
    } 
        
    
    //删除job
    @RequestMapping(value="/deletJob",method=RequestMethod.POST)
    public ReturnMsg deletjob(@RequestBody Integer[]quartzIds) throws Exception {
        AppQuartz appQuartz=null;
        for(Integer quartzId:quartzIds) {
            appQuartz=appQuartzService.selectAppQuartzByIdSer(quartzId).get(0);
            String ret=jobUtil.deleteJob(appQuartz);
            if("success".equals(ret)) {
                appQuartzService.deleteAppQuartzByIdSer(quartzId);
            }
        }
        return new ReturnMsg("200","success deleteJob");    
    }
        
    //修改
    @RequestMapping(value="/updateJob",method=RequestMethod.POST)
    public ReturnMsg  modifyJob(@RequestBody AppQuartz appQuartz) throws Exception {
        String ret= jobUtil.modifyJob(appQuartz);            
        if("success".equals(ret)) {            
            appQuartzService.updateAppQuartzSer(appQuartz);
            return new ReturnMsg("200","success updateJob",ret);
        }else {
            return new ReturnMsg("404","fail updateJob",ret);
        }                
    }
    
    //暂停所有
    @RequestMapping(value="/pauseAll",method=RequestMethod.GET)
    public ReturnMsg pauseAllJob() throws Exception {
        jobUtil.pauseAllJob();
        return new ReturnMsg("200","success pauseAll");
    }
    
    //恢复所有
    @RequestMapping(value="/repauseAll",method=RequestMethod.GET)
    public ReturnMsg repauseAllJob() throws Exception {
        jobUtil.resumeAllJob();
        return new ReturnMsg("200","success repauseAll");
    }    
    
}

原文地址:https://segmentfault.com/a/1190000016554033