Spring Batch初探

Spring batch是spring提供的批处理的框架,但不包括日程管理(schedule)的部分,最近项目用到,初试牛刀。

Spring Batch的核心概念

如下图,JobLancher启动job,一个job包含若干step,每个step又包含一个ItemReader(读数据),ItemProcessor(处理数据),和ItemWriter(输出数据),job的元数据和运行状态则存储在JobRepository中。


Spring Batch初探
 

Job运行时概念

Job的一次完整运行称为一个JobInstance,由JobParameter区分(Spring认为相同的Job不应该多次运行),即如果JobParameter相同则为同一个Job,而一次运行如果中途失败或者抛异常,再次运行仍为一个JobInstance,而其中的每次运行称为一个JobExecution。

执行一个step称为StepExecution

关系如下:

Job 1->n JobInstance 1->n JobExecution 1->n StepExecution

JobExecution和StepExecution各包含一个ExecutionContext,其中存储了key-value对,可以用来存储运行状态。

 实例

看spring自带的一个例子吧,billing,先看配置(有人说,spring是在用xml写代码,谁说不是呢?!并且xml没有语法检查的哦),

 job配置

<job id="billingJob" restartable="true">
		<step id="billingStep" next="payStep">
			<tasklet start-limit="java.lang.Integer.MAX_VALUE">
				<chunk reader="userDbReader" processor="billingProcessor"
					writer="billDbWriter" commit-interval="5" chunk-completion-policy="">
				</chunk>
			</tasklet>
		</step>
		<step id="payStep">
			<tasklet>
				<chunk reader="billDbReader" processor="payProcessor" writer="payDbWriter"
					commit-interval="5" chunk-completion-policy="" skip-limit="100">
					<skippable-exception-classes>
						<include
							class="org.springframework.batch.sample.MoneyNotEnoughException" />
					</skippable-exception-classes>
				</chunk>
			</tasklet>
			<next on="COMPLETED WITH SKIPS" to="messageStep"/>
			<listeners>
				<listener ref="payStepCheckingListener"></listener>
			</listeners>
		</step>
		<step id="messageStep">
			<tasklet>
				<chunk reader="billArrearsDbReader" processor="messageProcessor"
					writer="messageDbWriter" commit-interval="5"
					chunk-completion-policy="">
				</chunk>
			</tasklet>
		</step>
	</job>
 <job>标签定义了一个job,其中有若干<step>,每个<step>又包含一个<tasklet>。

spring batch的write是一批批写入的,因此<tasklet>里面是<chunk>标签,commit-interval指定了批写入的大小,chunk-completion-policy指定什么情况下处理完成,默认是返回null。

该job的step有一个分支:

<next on="COMPLETED WITH SKIPS" to="messageStep"/>

当payStep,返回状态为:COMPLETED WITH SKIPS,则跳到messageStep执行。

监听,可以实现不同的接口监听item,task,job级别的处理这里payStepCheckingListener监听payStep的执行状态。

<skippable-exception-classes>,表示如果改step抛该异常时忽略,继续执行

 由于spring batch已经实现了众多ItemReader和ItemWritter(从文件读写,从数据库读写等),因此基本上不用写实现他们的代码,配置好bean就好了。ItemProcessor是你需要实现的。

Job运行的参数

JobInstance是根据不同的参数来区分的,同一个JobInstance不能被多次启动。

另外可以在运行Job时,指定参数:

<bean id="cachingListener" class="org.jamee.demo.springbatch.CachingListener" scope="step">
        <property name="logCount" value="#{jobParameters[logCount]}" />
    </bean>

该logCount参数会在实例化时注入到cachingListener中,注意这里的scop="step",必须指定,以便延迟绑定,这个bean启动前才会实例化。

配置JobRepository

由于spring batch要把运行时信息写入到JobRepository,以便出错时可以重新从上次运行的地方启动,因此需要配置一个transaction-manager和data-source,job启动后,会在data-source创建一系列已BATCH_为前缀的表,记录Job运行状态和参数等。

<beans:bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
		<beans:property name="dataSource" ref="dataSource" />
		<beans:property name="transactionManager" ref="transactionManager" />
	</beans:bean>

 如果是测试环境,则可以配置为内存:

<beans:bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <beans:property name="transactionManager" ref="transactionManager" />
    </beans:bean>

    <beans:bean id="transactionManager"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

 启动Job

启动Job是通过JobLancher启动的,JobLancher是一个接口:

public interface JobLauncher {
    public JobExecution run(Job job, JobParameters jobParameters) 
                throws JobExecutionAlreadyRunningException, JobRestartException;
}
 Spring batch提供了一个SimpleJobLauncher启动job,注意setTaskExecutor方法设置了一个同步执行的executor,即可以设置异步或者并行的executor。
ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext(
				"billing_job.xml");
		SimpleJobLauncher launcher = new SimpleJobLauncher();
		launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
		launcher.setTaskExecutor(new SyncTaskExecutor());
		try {
			Map<String, JobParameter> parameters = new HashMap<String, JobParameter>();
			parameters.put(RUN_MONTH_KEY, new JobParameter("2011-2"));
			JobExecution je = launcher.run((Job) c.getBean("billingJob"),
					new JobParameters(parameters));
			System.out.println(je);
			System.out.println(je.getJobInstance());
			System.out.println(je.getStepExecutions());
		} catch (Exception e) {
			e.printStackTrace();
		}
 

相关推荐