温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析

发布时间:2021-09-08 09:18:49 来源:亿速云 阅读:397 作者:小新 栏目:开发技术

这篇文章主要介绍了SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

一、引言

需求内容如下:

PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档。

遇到不少问题,就记录一下吧。

SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析

二、代码具体实现

1、pom文件

  <dependencies>     <dependency>       <groupId>org.springframework.boot</groupId>       <artifactId>spring-boot-starter-web</artifactId>     </dependency>     <dependency>       <groupId>org.postgresql</groupId>       <artifactId>postgresql</artifactId>     </dependency>     <dependency>       <groupId>org.springframework.boot</groupId>       <artifactId>spring-boot-starter-jdbc</artifactId>     </dependency>     <dependency>       <groupId>org.springframework.boot</groupId>       <artifactId>spring-boot-starter-batch</artifactId>     </dependency>     <dependency>       <groupId>org.projectlombok</groupId>       <artifactId>lombok</artifactId>     </dependency>     <dependency>       <groupId>org.springframework.boot</groupId>       <artifactId>spring-boot-starter-batch</artifactId>     </dependency>    </dependencies>

2、application.yaml文件

spring:   datasource:     username: thinklink     password: thinklink     url: jdbc:postgresql://172.16.205.54:5432/thinklink     driver-class-name: org.postgresql.Driver   batch:     job:       enabled: false server:   port: 8073 #upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/ upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/ # 每次批量处理的数据量,默认为5000 batch-size: 5000

3、Service实现类

触发批处理任务的入口,执行一个job

@Service("batchService") public class BatchServiceImpl implements BatchService {	// 框架自动注入     @Autowired     private JobLauncher jobLauncher;     @Autowired     private Job updateDeviceJob;     /**      * 根据 taskId 创建一个Job      * @param taskId      * @throws Exception      */     @Override     public void createBatchJob(String taskId) throws Exception {         JobParameters jobParameters = new JobParametersBuilder()                 .addString("taskId", taskId)                 .addString("uuid", UUID.randomUUID().toString().replace("-",""))                 .toJobParameters();         // 传入一个Job任务和任务需要的参数         jobLauncher.run(updateDeviceJob, jobParameters);     } }

4、SpringBatch配置类

此部分最重要(☆☆☆☆☆)

@Configuration public class BatchConfiguration {     private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);     @Value("${batch-size:5000}")     private int batchSize;	// 框架自动注入     @Autowired     public JobBuilderFactory jobBuilderFactory;	// 框架自动注入     @Autowired     public StepBuilderFactory stepBuilderFactory;	// 数据过滤器,对从数据库读出来的数据,注意进行操作     @Autowired     public TaskItemProcessor taskItemProcessor;     // 接收job参数     public Map<String, JobParameter> parameters;     public Object taskId;     @Autowired     private JdbcTemplate jdbcTemplate;	// 读取数据库操作     @Bean     @StepScope     public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {         String querySql = " SELECT " +                 " e. ID AS taskId, " +                 " e.user_id AS userId, " +                 " e.timing_startup AS startTime, " +                 " u.device_id AS deviceId, " +                 " d.app_name AS appName, " +                 " d.compose_file AS composeFile, " +                 " e.failure_retry AS failureRetry, " +                 " e.tetry_times AS retryTimes, " +                 " e.device_managered AS deviceManagered " +                 " FROM " +                 " eiot_upgrade_task e " +                 " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +                 " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +                 " WHERE " +                 " ( " +                 " u.device_upgrade_status = 0 " +                 " OR u.device_upgrade_status = 2" +                 " )" +                 " AND e.tetry_times > u.retry_times " +                 " AND e. ID = ?";         return new JdbcCursorItemReaderBuilder<DispatchRequest>()                 .name("itemReader")                 .sql(querySql)                 .dataSource(dataSource)                 .queryArguments(new Object[]{parameters.get("taskId").getValue()})                 .rowMapper(new DispatchRequest.DispatchRequestRowMapper())                 .build();     }	// 将结果写回数据库     @Bean     @StepScope     public ItemWriter<ProcessResult> itemWriter() {         return new ItemWriter<ProcessResult>() {             private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {                 log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);                 Integer retryTimes = jdbcTemplate.queryForObject(                         "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",                         new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class                 );                 retryTimes += 1;                 int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +                         "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());                 if (updateCount <= 0) {                     log.warn("no task updated");                 } else {                     log.info("count of {} task updated", updateCount);                 }                 // 最后一次重试                 if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {                     log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());                     return 1;                 } else {                     return 0;                 }             }             @Override             @Transactional             public void write(List<? extends ProcessResult> list) throws Exception {                 Map taskMap = jdbcTemplate.queryForMap(                         "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",                         list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的                         );                 int deviceManagered = (int)taskMap.get("device_managered");                 Integer deviceCount = (Integer) taskMap.get("device_count");                 if (deviceCount == null) {                     log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());                 }                 int taskStatus = (int)taskMap.get("task_status");                 for (ProcessResult result: list) {                     deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());                 }                 if (deviceCount != null && deviceManagered == deviceCount) {                     taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成                 }                 jdbcTemplate.update("update eiot_upgrade_task  set device_managered = ?, task_status = ? " +                         "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());             }         };     }     /**      * 定义一个下发更新的 job      * @return      */     @Bean     public Job updateDeviceJob(Step updateDeviceStep) {         return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))                 .listener(new JobListener()) // 设置Job的监听器                 .flow(updateDeviceStep)// 执行下发更新的Step                 .end()                 .build();     }     /**      * 定义一个下发更新的 step      * @return      */     @Bean     public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {         return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))                 .<DispatchRequest, ProcessResult> chunk(batchSize)                 .reader(itemReader) //根据taskId从数据库读取更新设备信息                 .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口                 .writer(itemWriter)                 .build();     }     // job 监听器     public class JobListener implements JobExecutionListener {         @Override         public void beforeJob(JobExecution jobExecution) {             log.info(jobExecution.getJobInstance().getJobName() + " before... ");             parameters = jobExecution.getJobParameters().getParameters();             taskId = parameters.get("taskId").getValue();             log.info("job param taskId : " + parameters.get("taskId"));         }         @Override         public void afterJob(JobExecution jobExecution) {             log.info(jobExecution.getJobInstance().getJobName() + " after... ");             // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job             String sql = " SELECT " +                     " count(*) " +                     " FROM " +                     " eiot_upgrade_device d " +                     " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +                     " WHERE " +                     " u. ID = ? " +                     " AND d.retry_times < u.tetry_times " +                     " AND ( " +                     " d.device_upgrade_status = 0 " +                     " OR d.device_upgrade_status = 2 " +                     " ) ";             // 获取更新失败的设备个数             Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);             log.info("update device failure count : " + count);             // 下面是使用Quartz触发定时任务             // 获取任务时间,单位秒 //            String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);             // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒             Integer millSecond = 10;             if(count != null && count > 0){                 String jobName = "UpgradeTask_" + taskId;                 String reTaskId = taskId.toString();                 Map<String,Object> params = new HashMap<>();                 params.put("jobName",jobName);                 params.put("taskId",reTaskId);                 if (QuartzManager.checkNameNotExist(jobName))                 {                     QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);                 }             }         }     } }

5、Processor,处理每条数据

可以在此对数据进行过滤操作

@Component("taskItemProcessor") public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {     public static final int STATUS_DISPATCH_FAILED = 2;     public static final int STATUS_DISPATCH_SUCC = 1;     private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);     @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")     private String dispatchUrl;     @Autowired     JdbcTemplate jdbcTemplate;     /**      * 在这里,执行 下发更新指令 的操作      * @param dispatchRequest      * @return      * @throws Exception      */     @Override     public ProcessResult process(final DispatchRequest dispatchRequest) {         // 调用接口,下发指令         String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();         log.info("request url:" + url);         RestTemplate restTemplate = new RestTemplate();         HttpHeaders headers = new HttpHeaders();         headers.setContentType(MediaType.APPLICATION_JSON_UTF8);         MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();         JSONObject jsonOuter = new JSONObject();         JSONObject jsonInner = new JSONObject();         try {             jsonInner.put("jobId",dispatchRequest.getTaskId());             jsonInner.put("name",dispatchRequest.getName());             jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));             jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));             jsonInner.put("timestamp",dispatchRequest.getTimestamp());             jsonOuter.put("method","updateApp");             jsonOuter.put("params",jsonInner);         } catch (JSONException e) {             log.info("JSON convert Exception :" + e);         }catch (IOException e) {             log.info("Base64Util bytesToBase64Str :" + e);         }         log.info("request body json :" + jsonOuter);         HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);         int status;         try {             ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);             log.info("response :" + response);             if (response.getStatusCode() == HttpStatus.OK) {                 status = STATUS_DISPATCH_SUCC;             } else {                 status = STATUS_DISPATCH_FAILED;             }         }catch (Exception e){             status = STATUS_DISPATCH_FAILED;         }         return new ProcessResult(dispatchRequest, status);     } }

6、封装数据库返回数据的实体Bean

注意静态内部类

public class DispatchRequest {     private String taskId;     private String deviceId;     private String userId;     private String name;     private byte[] composeFile;     private String policy;     private String timestamp;     private String md5;     private int failureRetry;     private int retryTimes;     private int deviceManagered;    // 省略构造函数,setter/getter/tostring方法    //......         public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {         @Override         public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {             DispatchRequest dispatchRequest = new DispatchRequest();             dispatchRequest.setTaskId(resultSet.getString("taskId"));             dispatchRequest.setUserId(resultSet.getString("userId"));             dispatchRequest.setPolicy(resultSet.getString("startTime"));             dispatchRequest.setDeviceId(resultSet.getString("deviceId"));             dispatchRequest.setName(resultSet.getString("appName"));             dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));             dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));             dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));             dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));             dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));             return dispatchRequest;         }     } }

7、启动类上要加上注解

@SpringBootApplication @EnableBatchProcessing public class Application {     public static void main(String[] args) {         SpringApplication.run(Application.class, args);     } }

三、小结一下

其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了,此处就不在给出了,贴的代码太多了。由于公司一些原因,代码不能放到GitHub上。

spring-batch与quartz集成过程中遇到的问题

问题

启动时报Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,应用运行在weblogic上

解决

driverDelegateClass对应配置改为

org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate

感谢你能够认真阅读完这篇文章,希望小编分享的“SpringBoot+SpringBatch+Quartz整合定时批量任务方式的示例分析”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI