Spring Batch 批处理(1) – 简介及使用场景

Java基础

浏览数:37

2020-6-18

什么是 Spring Batch

<br/>

介绍

Spring Batch 作为 Spring 的子项目,是一款基于 Spring 的企业批处理框架。通过它可以构建出健壮的企业批处理应用。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

Spring Batch把批处理简化为Job和Job step两部分,在Job step中,把数据处理分为读数据(Reader)、处理数据(Processor)、写数据(Writer)三个步骤,异常处理机制分为跳过、重试、重启三种,作业方式分为多线程、并行、远程、分区四种。开发者在开发过程中,大部分工作是根据业务要求编写Reader、Processor和Writer即可,提高了批处理开发的效率。同时Spring Batch本身也提供了很多默认的Reader和Writer,开箱即用。

官网详细介绍:https://spring.io/projects/sp…

<br/>

<br/>

架构组件分类

<br/>

  • Application(应用层):包含开发者应用Spring-batch编写的所有批处理作业和自定义代码;
  • Batch Core(核心层):包含加载和控制批处理作业所必需的核心类,它包含了Job,Step,JobLauncher的实现;
  • Infrastructure(基础架构层):基础架构层包含了Reader(ItemReader),Writer(ItemWriter),Services可以被应用层和核心层使用;

<br/>

<br/>

<br/>

优势

<br/>

  • 丰富的开箱即用组件 开箱即用组件包括各种资源的读、写。读/写:支持文本文件读/写、XML文件读/写、数据库读/写、JMS队列读/写等。还提供作业仓库,作业调度器等基础设施,大大简化开发复杂度。
  • 面向chunk处理 支持多次读、一次写、避免多次对资源的写入,大幅提升批处理效率。
  • 事务管理能力 默认采用Spring提供的声明式事务管理模型,面向Chunk的操作支持事务管理,同时支持为每个tasklet操作设置细粒度的事务配置:隔离级别、传播行为、超时设置等。
  • 元数据管理 自动记录Job和Step的执行情况、包括成功、失败、失败的异常信息、执行次数、重试次数、跳过次数、执行时间等,方便后期的维护和查看。
  • 易监控的批处理应用 提供了灵活的监控模式,包括直接查看数据库、通过Spring Batch提供的API查看、JMX控制台查看等。其中还说到Spring Batch Admin,不过这个项目已不维护,改为用Spring Cloud Data Flow了。
  • 丰富的流程定义 支持顺序任务、条件分支任务、基于这两种任务可以组织复杂的任务流程。
  • 健壮的批处理应用 支持作业的跳过、重试、重启能力、避免因错误导致批处理作业的异常中断。
  • 易扩展的批处理应用 扩展机制包括多线程执行一个Step(Multithreaded step)、多线程并行执行多个Step(Parallelizing step)、远程执行作业(Remote chunking)、分区执行(partitioning step)。
  • 复用企业现有IT资产 提供多种Adapter能力,使得企业现有的服务可以方便集成到批处理应用中。避免重新开发、达到复用企业遗留的服务资产。

<br/>

<br/>

使用场景

<br/>

  • 定期提交批处理任务
  • 并行批处理
  • 企业消息驱动处理
  • 大规模并行批处理
  • 失败后手动或定时重启
  • 按顺序处理依赖的任务(可扩展为工作流驱动的批处理)
  • 部分处理:跳过记录(例如,回滚时)
  • 批处理事务

<br/>

<br/>

原则与建议

<br/>

当我们构建一个批处理的过程时,必须注意以下原则:

1、通常情况下,批处理的过程对系统和架构的设计要够要求比较高,因此尽可能的使用通用架构来处理批量数据处理,降低问题发生的可能性。Spring Batch是一个是一个轻量级的框架,适用于处理一些灵活并没有到海量的数据。

2、批处理应该尽可能的简单,尽量避免在单个批处理中去执行过于复杂的任务。我们可以将任务分成多个批处理或者多个步骤去实现。

3、保证数据处理和物理数据紧密相连。笼统的说就是我们在处理数据的过程中有很多步骤,在某些步骤执行完时应该就写入数据,而不是等所有都处理完。

4、尽可能减少系统资源的使用、尤其是耗费大量资源的IO以及跨服务器引用,尽量分配好数据处理的批次。

5、定期分析系统的IO使用情况、SQL语句的执行情况等,尽可能的减少不必要的IO操作。优化的原则有:

  • 尽量在一次事物中对同一数据进行读取或写缓存。
  • 一次事物中,尽可能在开始就读取所有需要使用的数据。
  • 优化索引,观察SQL的执行情况,尽量使用主键索引,尽量避免全表扫描或过多的索引扫描。
  • SQL中的where尽可能通过主键查询。

6、不要在批处理中对相同的数据执行2次相同的操作。

7、对于批处理程序而言应该在批处理启动之前就分配足够的内存,以免处理的过程中去重新申请新的内存页。

8、对数据的完整性应该从最差的角度来考虑,每一步的处理都应该建立完备的数据校验。

9、对于数据的总量我们应该有一个和数据记录在数据结构的某个字段 上。

10、所有的批处理系统都需要进行压力测试。

11、如果整个批处理的过程是基于文件系统,在处理的过程中请切记完成文件的备份以及文件内容的校验。

<br/><br/>

通用策略

<br/>

和软件开发的设计模式一样,批处理也有各种各样的现成模式可供参考。当一个开发(设计)人员开始执行批处理任务时,应该将业务逻辑拆分为一下的步骤或者板块分批执行:

  • 数据转换:某个(某些)批处理的外部数据可能来自不同的外部系统或者外部提供者,这些数据的结构千差万别。在统一进行批量数据处理之前需要对这些数据进行转换,合并为一个统一的结构。因此在数据开始真正的执行业务处理之前,先要使用其他的方法或者一些批处理任务将这些数据转换为统一的格式。
  • 数据校验:批处理是对大量数据进行处理,并且数据的来源千差万别,所以批处理的输入数据需要对数据的完整性性进行校验(比如校验字段数据是否缺失)。另外批处理输出的数据也需要进行合适的校验(例如处理了100条数据,校验100条数据是否校验成功)
  • 提取数据:批处理的工作是逐条从数据库或目标文件读取记录(records),提取时可以通过一些规则从数据源中进行数据筛选。
  • 数据实时更新处理:根据业务要求,对实时数据进行处理。某些时候一行数据记录的处理需要绑定在一个事物之下。
  • 输出记录到标准的文档格式:数据处理完成之后需要根据格式写入到对应的外部数据系统中。

以上五个步骤是一个标准的数据批处理过程,Spring batch框架为业务实现提供了以上几个功能入口。

<br/><br/>

数据额外处理

<br/>
某些情况需要实现对数据进行额外处理,在进入批处理之前通过其他方式将数据进行处理。主要内容有:

排序:由于批处理是以独立的行数据(record)进行处理的,在处理的时候并不知道记录前后关系。因此如果需要对整体数据进行排序,最好事先使用其他方式完成。

分割:数据拆分也建议使用独立的任务来完成。理由类似排序,因为批处理的过程都是以行记录为基本处理单位的,无法再对分割之后的数据进行扩展处理。

合并:理由如上。

<br/>

<br/>

Spring Batch核心概念

<br/>

<br/>

Spring Batch在基础架构层,把任务抽象为Job和Step,一个Job由多个Step来完成,step就是每个job要执行的单个步骤。

1、Job:是一个接口,接口中定义了一个作业是怎么样执行的

2、JobInstance:是job的一次执行,一个JobInstance可重复执行,如果上一次执行失败下次执行的时候还会重新执行上次失败的job,每一次执行就是一个JobExceution

3、JobParameters:作为参数可以用来启动Job,并且可以用来标识不同的Job,运行时提供给JobInstance,jonExceution根据状态和参数决定下一次是否继续执行

4、JobExceution:每一次尝试执行一个Job的时候,我们就可以将其称为一个JobExceution,这个执行的结果可以为成功,也可以为失败,例如一个JobInstance执行失败了,下一次执行他传入的参数是上次执行的时间,他将会继续执行,这样始终执行的是一个JobInstance,而产生了两个JobExceution

5、Step:主要分为两块

(1)Tasklet:是一个任务单元,它是属于可以重复利用的东西。接口其中包含了一个唯一的方法execute();

(2)Chunk-based:chunk就是数据块,你需要定义多大的数据量是一个chunk。Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。

  • ·itemReader:数据输入input:对于一个Step而言,每次读取一个条目;
  • ·itemProcessor:数据处理processing
  • ·ItemWriter:数据输出output:对于一个Step而言,每次根据设定输出批量一个条目;

6、StepExecution:一个Step的每一次尝试执行,都会创建一个StepExection,在一个Step实际开始执行的时候创建

7、ExecutionContext:执行上下文,代表的是一个key-value键值对的集合,可以被Spring框架进行在持久化管理,能够是开发人员存储持久化状态,每一个JobExecution以及每一个StepExecution的执行都会对应一个执行上下文(ExecutionContext);对于StepExecution在每一次提交点时就会保存一下执行上下文,而对于Job是在每一个StepExecution执行之间进行保存,例如,我们从Step1换到Step2是就会保存;

8、JobLauncher:接口,用于启动和加载Job,根据传入的参数进行启动,返回Job一次执行的情况

9、JobRepository:Job及Job的运行结果和状态、Step的运行结果和状态,都会保存在JobRepository中。

<br/>

<br/>

<br/>

概念说明可见下表:

|领域对象|描述|
|:–:|:–|
|JobRepository|作业仓库,保存Job、Step执行过程中的状态及结果|
|JobLauncher|作业执行器,是执行Job的入口|
|Job|一个批处理任务,由一个或多个Step组成|
|Step|一个任务的具体的执行逻辑单位|
|Item|一条数据记录|
|ItemReader|从数据源读数据|
|ItemProcessor|对数据进行处理,如数据清洗、转换、过滤、校验等|
|ItemWriter|写入数据到指定目标|
|Chunk|给定数量的Item集合,如读取到chunk数量后,才进行写操作|
|Tasklet|Step中具体执行逻辑,可重复执行|

<br/>

<br/>

Spring Batch数据表

batch_job_instance:这张表能看到每次运行的job名字。

batch_job_execution:这张表能看到每次运行job的开始时间,结束时间,状态,以及失败后的错误消息是什么。

batch_step_execution:这张表你能看到更多关于step的详细信息。比如step的开始时间,结束时间,提交次数,读写次数,状态,以及失败后的错误信息等。
图片描述

<br/>

<br/>

Job

<br/>
简单的说Job是封装一个批处理过程的实体,与其他的Spring项目类似,Job可以通过XML或Java类配置,称为“Job Configuration”。如下图Job是单个批处理的最顶层。

<br/>
为了便于理解,可以简单的将Job理解为是每一步(Step)实例的容器。他结合了多个Step,为它们提供统一的服务同时也为Step提供个性化的服务,比如步骤重启。通常情况下Job的配置包含以下内容:

  • Job的名称
  • 定义和排序Step执行实例。
  • 标记每个Step是否可以重启。

<br/>

Spring Batch为Job接口提供了默认的实现——SimpleJob,其中实现了一些标准的批处理方法。下面的代码展示了如可注入一个Job。

@Bean
public Job testJob() {
    return this.jobBuilderFactory.get("testJob") //get中命名了Job的名称
                     .start(stepOne())  
                     .next(stepTwo())
                     .next(stepThree())
                     .end()
                     .build();
}

<br/>

JobInstance

<br/>
JobInstance是指批处理作业运行的实例。

例如一个批处理必须在每天执行一次,系统在2019年5月1日执行了一次我们称之为2019-05-01的实例,类似的还会有2019-05-02、2019-05-03实例。

通常情况下,一个JobInstance对应一个JobParameters,对应多个JobExecution。(JobParameters、JobExecution见后文)。同一个JobInstance具有相同的上下文(ExecutionContext内容见后文)。

<br/>

JobParameters

<br/>
前面讨论了JobInstance与Job的区别,但是具体的区别内容都是通过JobParameters体现的。一个JobParameters对象中包含了一系列Job运行相关的参数,这些参数可以用于参考或者用于实际的业务使用。对应的关系如下图:

<br/>
当我们执行2个不同的JobInstance时JobParameters中的属性都会有差异。可以简单的认为一个JobInstance的标识就是Job+JobParameters。

<br/>

JobExecution

<br/>
JobExecution可以理解为单次运行Job的容器。一次JobInstance执行的结果可能是成功、也可能是失败。但是对于Spring Batch框架而言,只有返回运行成功才会视为完成一次批处理。

例如2019-05-01执行了一次JobInstance,但是执行的过程失败,因此第二次还会有一个“相同的”的JobInstance被执行。

Job用于定义批处理如何执行,JobInstance纯粹的就是一个处理对象,把所有的运行内容和信息组织在一起,主要是为了当面临问题时定义正确的重启参数。而JobExecution是运行时的“容器”,记录动态运行时的各种属性和上线文。

<br/>
他包括的信息有:

|属性| 说明|
|:–:|:–:|
|status| 状态类名为BatchStatus,它指示了执行的状态。在执行的过程中状态为BatchStatus#STARTED,失败:BatchStatus#FAILED,完成:BatchStatus#COMPLETED|
|startTime| java.util.Date对象,标记批处理任务启动的系统时间,批处理任务未启动数据为空|
|endTime| java.util.Date对象,结束时间无论是否成功都包含该数据,如未处理完为空|
|exitStatus| ExitStatus类,记录运行结果。|
|createTime| java.util.Date,JobExecution的创建时间,某些使用execution已经创建但是并未开始运行。|
|lastUpdate| java.util.Date,最后一次更新时间|
|executionContext| 批处理任务执行的所有用户数据|
|failureExceptions| 记录在执行Job时的异常,对于排查问题非常有用|

<br/>

以上这些内容Spring Batch都会通过JobRepository进行持久化(这些信息官方文成称之为MetaData),因此在对应的数据源中可以看到下列信息:

BATCH_JOB_INSTANCE:

|JOB_INST_ID| JOB_NAME|
|:–:|:–:|
|1| EndOfDayJob|

<br/>

BATCH_JOB_EXECUTION_PARAMS:

|JOB_EXECUTION_ID| TYPE_CD| KEY_NAME| DATE_VAL| IDENTIFYING|
|:–:|:–:|
|1| DATE| schedule.Date| 2019-01-01| TRUE|

<br/>

BATCH_JOB_EXECUTION:

|JOB_EXEC_ID| JOB_INST_ID| START_TIME| END_TIME| STATUS|
|:–:|:–:|
|1| 1| 2019-01-01 21:00| 2017-01-01 21:30| FAILED|

<br/>

当某个Job批处理任务失败之后会在对应的数据库表中路对应的状态。假设1月1号执行的任务失败,技术团队花费了大量的时间解决这个问题到了第二天才继续执行这个任务。

BATCH_JOB_INSTANCE:

|JOB_INST_ID| JOB_NAME|
|:–:|:–:|
|1| EndOfDayJob|
|2| EndOfDayJob|

<br/>

BATCH_JOB_EXECUTION_PARAMS:

|JOB_EXECUTION_ID| TYPE_CD| KEY_NAME| DATE_VAL| IDENTIFYING|
|:–:|:–:|
|1| DATE| schedule.Date| 2019-01-01| TRUE|
|2| DATE| schedule.Date| 2019-01-01| TRUE|
|3| DATE| schedule.Date| 2019-01-02| TRUE|

<br/>

BATCH_JOB_EXECUTION:

|JOB_EXEC_ID| JOB_INST_ID| START_TIME| END_TIME| STATUS|
|:–:|:–:|
|1| 1| 2019-01-01 21:00| 2017-01-01 21:30| FAILED|
|2| 1| 2019-01-02 21:00| 2017-01-02 21:30| COMPLETED|
|3| 2| 2019-01-02 21:31| 2017-01-02 22:29| COMPLETED|

<br/>

从数据上看好似JobInstance是一个接一个顺序执行的,但是对于Spring Batch并没有进行任何控制。不同的JobInstance很有可能是同时在运行(相同的JobInstance同时运行会抛出JobExecutionAlreadyRunningException异常)。

<br/>

<br/>

Step

<br/>
Step是批处理重复运行的最小单元,它按照顺序定义了一次执行的必要过程。

因此每个Job可以视作由一个或多个多个Step组成。一个Step包含了所有所有进行批处理的必要信息,这些信息的内容是由开发人员决定的并没有统一的标准。一个Step可以很简单,也可以很复杂。他可以是复杂业务的组合,也有可能仅仅用于迁移数据。

与JobExecution的概念类似,Step也有特定的StepExecution,关系结构如下:

<br/><br/>

StepExecution

<br/>
StepExecution表示单次执行Step的容器,每次Step执行时都会有一个新的StepExecution被创建。与JobExecution不同的是,当某个Step执行失败后默认并不会重新执行。StepExecution包含以下属性:

|属性| 说明|
|:–:|:–:|
|status| 状态类名为BatchStatus,它指示了执行的状态。在执行的过程中状态为BatchStatus#STARTED,失败:BatchStatus#FAILED,完成:BatchStatus#COMPLETED|
|startTime| java.util.Date对象,标记StepExecution启动的系统时间,未启动数据为空|
|endTime| java.util.Date对象,结束时间,无论是否成功都包含该数据,如未处理完为空|
|exitStatus| ExitStatus类,记录运行结果。|
|createTime| java.util.Date,JobExecution的创建时间,某些使用execution已经创建但是并未开始运行。|
|lastUpdate| java.util.Date,最后一次更新时间|
|executionContext| 批处理任务执行的所有用户数据|
|readCount| 成功读取数据的次数|
|wirteCount| 成功写入数据的次数|
|commitCount| 成功提交数据的次数|
|rollbackCount| 回归数据的次数,有业务代码触发|
|readSkipCount| 当读数据发生错误时跳过处理的次数|
|processSkipCount| 当处理过程发生错误,跳过处理的次数|
|filterCount| 被过滤规则拦截未处理的次数|
|writeSkipCount| 写数据失败,跳过处理的次数|

<br/><br/>

ExecutionContext

<br/>
前文已经多次提到ExecutionContext。可以简单的认为ExecutionContext提供了一个Key/Value机制,在StepExecution和JobExecution对象的任何位置都可以获取到ExecutionContext中的任何数据。最有价值的作用是记录数据的执行位置,以便发生重启时候从对应的位置继续执行:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())

<br/>
比如在任务中有一个名为“loadData”的Step,他的作用是从文件中读取数据写入到数据库,当第一次执行失败后,数据库中有如下数据:

BATCH_JOB_INSTANCE:

| JOB_INST_ID| JOB_NAME|
|:–:|:–:|
| 1| EndOfDayJob|

<br/>

BATCH_JOB_EXECUTION_PARAMS:

| JOB_INST_ID| TYPE_CD| KEY_NAME| DATE_VAL|
|:–:|:–:|
| 1| DATE| schedule.Date| 2019-01-01|

<br/>

BATCH_JOB_EXECUTION:

| JOB_EXEC_ID| JOB_INST_ID| START_TIME| END_TIME| STATUS|
|:–:|:–:|
| 1| 1 | 2017-01-01 21:00| 2017-01-01 21:30| FAILED|

<br/>

BATCH_STEP_EXECUTION:

|STEP_EXEC_ID| JOB_EXEC_ID| STEP_NAME| START_TIME| END_TIME| STATUS|
|:–:|:–:|
|1| 1| loadData| 2017-01-01 21:00| 2017-01-01 21:30| FAILED|

<br/>

BATCH_STEP_EXECUTION_CONTEXT:

STEP_EXEC_ID SHORT_CONTEXT
1 {piece.count=40321}

<br/>

在上面的例子中,Step运行30分钟处理了40321个“pieces”,我们姑且认为“pieces”表示行间的行数(实际就是每个Step完成循环处理的个数)。

这个值会在每个commit之前被更新记录在ExecutionContext中(更新需要用到StepListener后文会详细说明)。

当我们再次重启这个Job时并记录在BATCH_STEP_EXECUTION_CONTEXT中的数据会加载到ExecutionContext中,这样当我们继续执行批处理任务时可以从上一次中断的位置继续处理。

例如下面的代码在ItemReader中检查上次执行的结果,并从中断的位置继续执行:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

<br/>
<br/>
ExecutionContext是根据JobInstance进行管理的,因此只要是相同的实例都会具备相同的ExecutionContext(无论是否停止)。此外通过以下方法都可以获得一个ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

<br/>

但是这2个ExecutionContext并不相同,前者是在一个Step中每次Commit数据之间共享,后者是在Step与Step之间共享。

<br/><br/>

JobRepository

<br/>
JobRepository是所有前面介绍的对象实例的持久化机制。他为JobLauncher、Job、Step的实现提供了CRUD操作。当一个Job第一次被启动时,一个JobExecution会从数据源中获取到,同时在执行的过程中StepExecution、JobExecution的实现都会记录到数据源中。使用@EnableBatchProcessing注解后JobRepository会进行自动化配置。

<br/>

JobLauncher

<br/>
JobLauncher为Job的启动运行提供了一个边界的入口,在启动Job的同时还可以定制JobParameters:

public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters)
                throws JobExecutionAlreadyRunningException, JobRestartException,
                       JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

参考:

https://my.oschina.net/miansh…

作者:陈晨辰