Spring Batch初探
Spring batch是spring提供的批处理的框架,但不包括日程管理(schedule)的部分,最近项目用到,初试牛刀。
Spring Batch的核心概念
如下图,JobLancher启动job,一个job包含若干step,每个step又包含一个ItemReader(读数据),ItemProcessor(处理数据),和ItemWriter(输出数据),job的元数据和运行状态则存储在JobRepository中。
Job运行时概念
Job的一次完整运行称为一个JobInstance,由JobParameter区分(Spring认为相同的Job不应该多次运行),即如果JobParameter相同则为同一个Job,而一次运行如果中途失败或者抛异常,再次运行仍为一个JobInstance,而其中的每次运行称为一个JobExecution。
执行一个step称为StepExecution
关系如下:
Job 1->n JobInstance 1->n JobExecution 1->n StepExecution
JobExecution和StepExecution各包含一个ExecutionContext,其中存储了key-value对,可以用来存储运行状态。
实例
看spring自带的一个例子吧,billing,先看配置(有人说,spring是在用xml写代码,谁说不是呢?!并且xml没有语法检查的哦),
job配置
<job id="billingJob" restartable="true"> <step id="billingStep" next="payStep"> <tasklet start-limit="java.lang.Integer.MAX_VALUE"> <chunk reader="userDbReader" processor="billingProcessor" writer="billDbWriter" commit-interval="5" chunk-completion-policy=""> </chunk> </tasklet> </step> <step id="payStep"> <tasklet> <chunk reader="billDbReader" processor="payProcessor" writer="payDbWriter" commit-interval="5" chunk-completion-policy="" skip-limit="100"> <skippable-exception-classes> <include class="org.springframework.batch.sample.MoneyNotEnoughException" /> </skippable-exception-classes> </chunk> </tasklet> <next on="COMPLETED WITH SKIPS" to="messageStep"/> <listeners> <listener ref="payStepCheckingListener"></listener> </listeners> </step> <step id="messageStep"> <tasklet> <chunk reader="billArrearsDbReader" processor="messageProcessor" writer="messageDbWriter" commit-interval="5" chunk-completion-policy=""> </chunk> </tasklet> </step> </job><job>标签定义了一个job,其中有若干<step>,每个<step>又包含一个<tasklet>。
spring batch的write是一批批写入的,因此<tasklet>里面是<chunk>标签,commit-interval指定了批写入的大小,chunk-completion-policy指定什么情况下处理完成,默认是返回null。
该job的step有一个分支:
<next on="COMPLETED WITH SKIPS" to="messageStep"/>
当payStep,返回状态为:COMPLETED WITH SKIPS,则跳到messageStep执行。
监听,可以实现不同的接口监听item,task,job级别的处理这里payStepCheckingListener监听payStep的执行状态。
<skippable-exception-classes>,表示如果改step抛该异常时忽略,继续执行
由于spring batch已经实现了众多ItemReader和ItemWritter(从文件读写,从数据库读写等),因此基本上不用写实现他们的代码,配置好bean就好了。ItemProcessor是你需要实现的。
Job运行的参数
JobInstance是根据不同的参数来区分的,同一个JobInstance不能被多次启动。
另外可以在运行Job时,指定参数:
<bean id="cachingListener" class="org.jamee.demo.springbatch.CachingListener" scope="step"> <property name="logCount" value="#{jobParameters[logCount]}" /> </bean>
该logCount参数会在实例化时注入到cachingListener中,注意这里的scop="step",必须指定,以便延迟绑定,这个bean启动前才会实例化。
配置JobRepository
由于spring batch要把运行时信息写入到JobRepository,以便出错时可以重新从上次运行的地方启动,因此需要配置一个transaction-manager和data-source,job启动后,会在data-source创建一系列已BATCH_为前缀的表,记录Job运行状态和参数等。
<beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <beans:property name="dataSource" ref="dataSource" /> <beans:property name="transactionManager" ref="transactionManager" /> </beans:bean>
如果是测试环境,则可以配置为内存:
<beans:bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <beans:property name="transactionManager" ref="transactionManager" /> </beans:bean> <beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
启动Job
启动Job是通过JobLancher启动的,JobLancher是一个接口:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException; }Spring batch提供了一个SimpleJobLauncher启动job,注意setTaskExecutor方法设置了一个同步执行的executor,即可以设置异步或者并行的executor。
ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext( "billing_job.xml"); SimpleJobLauncher launcher = new SimpleJobLauncher(); launcher.setJobRepository((JobRepository) c.getBean("jobRepository")); launcher.setTaskExecutor(new SyncTaskExecutor()); try { Map<String, JobParameter> parameters = new HashMap<String, JobParameter>(); parameters.put(RUN_MONTH_KEY, new JobParameter("2011-2")); JobExecution je = launcher.run((Job) c.getBean("billingJob"), new JobParameters(parameters)); System.out.println(je); System.out.println(je.getJobInstance()); System.out.println(je.getStepExecutions()); } catch (Exception e) { e.printStackTrace(); }