spring batch

SpringBatch是一个基于Spring的企业级批处理框架,所有基于Spring的框架都是使用了spring的IoC特性,然后加上自己的一些处理规则。因此,要理解SpringBatch的设计和使用,首先需要理解批处理的机制和特点。

所谓企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再进行归档的这类操作。从上面的描述中可以看出,批处理的整个流程可以明显的分为3个阶段:

1、读数据

2、业务处理

3、归档结果数据

另外,从定义中可以发现批处理的一个重要特色就是无需人工干预、定期执行,因此一个批处理框架,需要关注事务的粒度,日志监控,执行方式,资源管理,读数据,处理数据,写数据的解耦等方面。

SpringBatch为我们提供了什么呢?

1、统一的读写接口

2、丰富的任务处理方式、

3、灵活的事务管理及并发处理

4、日志、监控、任务重启与跳过等特性

注意,SpringBatch未提供关于批处理任务调度的功能,因此如何周期性的调用批处理任务需要自己想办法解决,就Java来说,Quartz是一个不错的解决方案,或者写脚本处理之。

前面讲了很多SpringBatch的特性,接下来就通过一个小例子来看看SpringBatch是如何实现批处理的读数据-》处理数据-》归档结果这一过程的。

首先,搭建项目框架,推荐大家使用Maven或者Gradle结构的项目,不会的,赶紧学学,对于学习新技术省很多时间。一个Spring项目需要依赖的lib(可能有多,大家可以试探性的删掉一些不必要的包)如下:

Xml代码

1.<dependency>

2.<groupId>org.springframework</groupId>

3.<artifactId>spring-beans</artifactId>

4.<version>${springframework.core.version}</version>

5.</dependency>

6.<dependency>

7.<groupId>org.springframework</groupId>

8.<artifactId>spring-aop</artifactId>

9.<version>${springframework.core.version}</version>

10.</dependency>

11.<dependency>

12.<groupId>org.springframework</groupId>

13.<artifactId>spring-context</artifactId>

14.<version>${springframework.core.version}</version>

15.</dependency>

16.<dependency>

17.<groupId>org.springframework</groupId>

18.<artifactId>spring-core</artifactId>

19.<version>${springframework.core.version}</version>

20.</dependency>

21.<dependency>

22.<groupId>org.springframework</groupId>

23.<artifactId>spring-jdbc</artifactId>

24.<version>${springframework.core.version}</version>

25.</dependency>

26.<dependency>

27.<groupId>org.springframework</groupId>

28.<artifactId>spring-test</artifactId>

29.<version>${springframework.core.version}</version>

30.<scope>test</scope>

31.</dependency>

32.<dependency>

33.<groupId>org.springframework</groupId>

34.<artifactId>spring-tx</artifactId>

35.<version>${springframework.core.version}</version>

36.</dependency>

37.<dependency>

38.<groupId>org.springframework.batch</groupId>

39.<artifactId>spring-batch-core</artifactId>

40.<version>${spring.batch.version}</version>

41.</dependency>

42.<dependency>

43.<groupId>org.springframework.batch</groupId>

44.<artifactId>spring-batch-infrastructure</artifactId>

45.<version>${spring.batch.version}</version>

46.</dependency>

47.<dependency>

48.<groupId>org.springframework.batch</groupId>

49.<artifactId>spring-batch-test</artifactId>

50.<version>${spring.batch.version}</version>

51.<scope>test</scope>

52.</dependency>

53.<dependency>

54.<groupId>junit</groupId>

55.<artifactId>junit</artifactId>

56.<version>4.10</version>

57.<scope>test</scope>

58.</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-beans</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-aop</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-context</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-core</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jdbc</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-test</artifactId>

<version>${springframework.core.version}</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-tx</artifactId>

<version>${springframework.core.version}</version>

</dependency>

<dependency>

<groupId>org.springframework.batch</groupId>

<artifactId>spring-batch-core</artifactId>

<version>${spring.batch.version}</version>

</dependency>

<dependency>

<groupId>org.springframework.batch</groupId>

<artifactId>spring-batch-infrastructure</artifactId>

<version>${spring.batch.version}</version>

</dependency>

<dependency>

<groupId>org.springframework.batch</groupId>

<artifactId>spring-batch-test</artifactId>

<version>${spring.batch.version}</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

项目构建好以后,首先开始写读取数据的逻辑,SpringBatch针对读、写操作提供了很多实现方式,包括文件,数据库,对于数据库的操作还提供了很多ORM框架(Hibernate,iBatis,JPA)的支持,这儿为了简单,以读文件作为例子,假设我们需要读取一个文件中所有人的信息,大于16岁的需要发信息需要发信息通知它去公安局办理身份证。简化文件如下:

Txt代码

1.TWer1,15

2.TWer2,21

3.TWer3,13

4.TWer4,16

5.TWer5,25

6.TWer6,45

7.TWer7,16

TWer1,15

TWer2,21

TWer3,13

TWer4,16

TWer5,25

TWer6,45

TWer7,16

,这儿需要的SpringBatch的读文件功能就是把文件中的每一行都能转化为一个内存对象,其对应的类就是User.java

publicclassUser{

Stringname;

intage;

publicStringgetName(){

returnname;

}

publicvoidsetName(Stringname){

this.name=name;

}

publicintgetAge(){

returnage;

}

publicvoidsetAge(intage){

this.age=age;

}

}另外,需要在message_job.xml中配置如下内容

Xml代码

<beanid="messageReader"class="org.springframework.batch.item.file.FlatFileItemReader">

<propertyname="lineMapper"ref="lineMapper"/>

<propertyname="resource"value="/message/user.txt"/>

</bean>

<beanid="lineMapper"class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

<propertyname="lineTokenizer"ref="lineTokenizer"/>

<propertyname="fieldSetMapper"ref="fieldSetMapper"/>

</bean>

<beanid="fieldSetMapper"class="com.ning.demo.UserMapper"/>

<beanid="lineTokenizer"class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>,该配置文件中除了UserMapper外,都是SpringBatch默认提供的。UserMapper.java代码如下:

publicclassUserMapperimplementsFieldSetMapper<User>{

@Override

publicUsermapFieldSet(FieldSetfieldSet)throwsBindException{

Useruser=newUser();

user.setName(fieldSet.readString(0));

user.setAge(fieldSet.readInt(1));

returnuser;

}

}

这样,文件中的每一行数据都会变成一个User类的instance。

接下来,是处理数据的过程,判断每个user的年龄,如果大于16,就生成一条Message。

Java代码

publicclassMessageProcessorimplementsItemProcessor<User,Message>{

@Override

publicMessageprocess(Useritem)throwsException{

Messagemessage=null;

if(item.getAge()>16){

message=newMessage();

message.setContent(item.getName()+",Pleasecometopolicestation!");

}

returnmessage;

}

}

该类实现了SpringBatch的ItemProcessor接口,

最后,把处理后得到的所有Message打印到Console上,

Java代码

publicclassMessageWriterimplementsItemWriter<Message>{

@Override

publicvoidwrite(List<?extendsMessage>items)throwsException{

System.out.println("Results:");

for(Messageitem:items){

System.out.println(item.getContent());

}

}

}该类实现了SpringBatch的ItemWriter接口。SpringBatch本身提供了多种Writer实现。

通过上面的几个步骤,把读数据,处理数据,写数据都构造出来了,那么那么是如何串联起来的呢?答案是配置文件,

Xml代码

<batch:jobid="messageJob">

<batch:stepid="messageStep">

<batch:tasklet>

<batch:chunkreader="messageReader"processor="messageProcessor"writer="messageWriter"

commit-interval="10"

chunk-completion-policy="">

</batch:chunk>

</batch:tasklet>

</batch:step>

</batch:job>

<beanid="jobRepository"class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">

<propertyname="transactionManager"ref="transactionManager"/>

</bean>

<beanid="messageReader"class="org.springframework.batch.item.file.FlatFileItemReader">

<propertyname="lineMapper"ref="lineMapper"/>

<propertyname="resource"value="/message/user.txt"/>

</bean>

<beanid="lineMapper"class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

<propertyname="lineTokenizer"ref="lineTokenizer"/>

<propertyname="fieldSetMapper"ref="fieldSetMapper"/>

</bean>

<beanid="fieldSetMapper"class="com.ning.demo.UserMapper"/>

<beanid="lineTokenizer"class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>

<beanid="messageProcessor"class="com.ning.demo.MessageProcessor"/>

<beanid="messageWriter"class="com.ning.demo.MessageWriter"/>

<beanid="transactionManager"class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

SpringBatch将批处理任务称为一个Job,同时,Job下分为多个Step。Step是一个独立的、顺序的处理步骤,包含该步骤批处理中需要的所有信息。多个批处理Step按照一定的流程组成一个Job。通过这样的设计方式,我们可以灵活配置Job的处理过程。

接下来的问题是如何运行构建好的BatchJob呢?SpringBatch提供了JobLauncher接口用于运行Job,并提供了一个默认的SimpleJobLauncher实现。

Java代码

publicclassMain{

publicstaticvoidmain(String[]args){

ClassPathXmlApplicationContextc=

newClassPathXmlApplicationContext("message_job.xml");

SimpleJobLauncherlauncher=newSimpleJobLauncher();

launcher.setJobRepository((JobRepository)c.getBean("jobRepository"));

launcher.setTaskExecutor(newSimpleAsyncTaskExecutor());

try{

launcher.run((Job)c.getBean("messageJob"),newJobParameters());

}catch(Exceptione){

e.printStackTrace();

}

}

}运行BatchJob时需要为JobLauncher指定一个JobRepository,该类负责创建一个JobExecution对象来执行Job,其次,需要指定一个任务执行器,我们使用SpringBatch提供的SimpleAsyncTaskExecutor。最后,通过run方法来执行指定的Job。运行结果如下:

Results:

TWer2,Pleasecometopolicestation!

TWer5,Pleasecometopolicestation!

TWer6,Pleasecometopolicestation!

相关推荐