`
dreamoftch
  • 浏览: 486538 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spring Batch 批处理入门

阅读更多

 

参考:Spring Batch 官方参考文档(Spring Batch Reference Documentation)

 

Spring Batch 基于 chunk 的处理流程:

 

读取数据->处理数据->写数据

 

reader->process->writer

 

 

Maven 依赖:

<properties>
		<spring.version>3.2.2.RELEASE</spring.version>
		<spring.batch.version>2.2.0.RELEASE</spring.batch.version>
		<mysql.driver.version>5.1.25</mysql.driver.version>
		<junit.version>4.11</junit.version>
		<!-- CronTriggerBean / JobDetailBean used in the scheduling example require Quartz 1.x -->
		<quartz.version>1.8.6</quartz.version>
	</properties>
 
	<dependencies>
 
		<!-- Spring Core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Spring jdbc, for database -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Spring XML to/back object -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Spring Quartz integration: SchedulerFactoryBean, CronTriggerBean, JobDetailBean, QuartzJobBean -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
			<version>${spring.version}</version>
		</dependency>
 
		<!-- Quartz scheduler (1.x line; Spring's CronTriggerBean does not work with Quartz 2.x) -->
		<dependency>
			<groupId>org.quartz-scheduler</groupId>
			<artifactId>quartz</artifactId>
			<version>${quartz.version}</version>
		</dependency>
 
		<!-- MySQL database driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.driver.version}</version>
		</dependency>
 
		<!-- Spring Batch dependencies -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
 
		<!-- Spring Batch unit test support: only needed on the test classpath -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<version>${spring.batch.version}</version>
			<scope>test</scope>
		</dependency>
 
		<!-- Junit -->
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
 
	</dependencies>

 

POJO类:

 

package com.tch.test.spring.batch.entity;
 
import java.text.SimpleDateFormat;
import java.util.Date;
 
/**
 * One record of the report input: a numeric id and a date.
 *
 * <p>A plain mutable JavaBean so that a {@code FieldSetMapper} can populate it through setters.
 */
public class Report {

	/** Pattern used by {@link #toString()} to render {@link #date}. */
	private static final String DATE_PATTERN = "dd/MM/yyyy";

	private int id;
	private Date date;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public Date getDate() {
		return date;
	}

	public void setDate(Date date) {
		this.date = date;
	}

	/**
	 * Returns a debug representation such as {@code Report [id=1001, date=29/07/2013]}.
	 *
	 * <p>A report whose date was never set renders as {@code date=null} instead of throwing.
	 */
	@Override
	public String toString() {
		// SimpleDateFormat is not thread-safe and is only needed here, so create it on demand
		// rather than keeping one per Report instance.
		String formattedDate = (date == null) ? "null" : new SimpleDateFormat(DATE_PATTERN).format(date);
		return "Report [id=" + id + ", date=" + formattedDate + "]";
	}
}

 

mapper类:

 

package com.tch.test.spring.batch;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.tch.test.spring.batch.entity.Report;
 
/**
 * Maps one tokenized input line (index 0 = id, index 1 = date in dd/MM/yyyy) to a {@link Report}.
 */
public class ReportFieldSetMapper implements FieldSetMapper<Report> {

	/** Expected pattern of the date column, e.g. {@code 29/7/2013}. */
	private static final String DATE_PATTERN = "dd/MM/yyyy";

	/**
	 * Builds a {@link Report} from the fields of one line.
	 *
	 * @param fieldSet the tokens of one line; index 0 is the id, index 1 the date
	 * @return a report with both id and date populated
	 * @throws BindException declared by {@link FieldSetMapper}; not thrown by this implementation
	 * @throws IllegalArgumentException if the date column is not a valid dd/MM/yyyy date
	 */
	@Override
	public Report mapFieldSet(FieldSet fieldSet) throws BindException {
		Report report = new Report();
		report.setId(fieldSet.readInt(0));
		String date = fieldSet.readString(1);
		// SimpleDateFormat is not thread-safe, so use a per-call instance rather than a field on
		// this (singleton) bean, which could otherwise be shared across threads.
		// Non-lenient parsing rejects impossible dates such as 31/2/2013 instead of rolling them over.
		SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_PATTERN);
		dateFormat.setLenient(false);
		try {
			report.setDate(dateFormat.parse(date));
		} catch (ParseException e) {
			// Fail this line loudly instead of returning a Report with a null date.
			throw new IllegalArgumentException("Invalid report date '" + date + "', expected " + DATE_PATTERN, e);
		}
		return report;
	}
}

 

 

 

processor类:

 

package com.tch.test.spring.batch;
import org.springframework.batch.item.ItemProcessor;

import com.tch.test.spring.batch.entity.Report;
 
/**
 * Pass-through processor: echoes each report to standard output and hands it on unchanged.
 */
public class CustomItemProcessor implements ItemProcessor<Report, Report> {

	/**
	 * Prints the incoming report and returns it as-is.
	 *
	 * @param report the item produced by the reader
	 * @return the same instance, unmodified
	 */
	@Override
	public Report process(Report report) throws Exception {
		String line = "Processing..." + report;
		System.out.println(line);
		return report;
	}
}

 

 

writer类:

 

package com.tch.test.spring.batch;
import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.tch.test.spring.batch.entity.Report;

public class ReportItemWriter implements ItemWriter<Report>{
	public void write(List<? extends Report> reports) throws Exception {
		for (Report m : reports) {
			System.out.println("write results : "+m);
		}
	}
}

 

 

beans.xml(commit-interval 表示每个 chunk 在一次事务中处理并提交的记录数,这里每 10 条记录提交一次):

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<!-- Infrastructure: a Map-backed job repository, a resourceless transaction manager, and the launcher. -->
	<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Reader: turns each line of classpath:report.txt ("id, dd/MM/yyyy") into a Report. -->
	<bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" value="classpath:report.txt" />
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<!-- No delimiter configured, so the tokenizer's default delimiter applies. -->
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>
	</bean>

	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter" />

	<!-- Single-step chunk job: read -> process -> write, committing every 10 items. -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="csvFileItemReader" processor="itemProcessor" writer="reportWriter" commit-interval="10" />
			</batch:tasklet>
		</batch:step>
	</batch:job>

</beans>

 

 

report.txt:

 

1001, 29/7/2013
1002, 30/7/2013
1003, 31/7/2013
1004, 29/7/2013
1005,30/7/2013
1006, 31/7/2013
1007, 29/7/2013
1008,30/7/2013
1009, 31/7/2013
1010, 29/7/2013
1011,30/7/2013
1012, 31/7/2013
1013, 29/7/2013
1014,30/7/2013
1015, 31/7/2013
1016, 29/7/2013
1017,30/7/2013
1018, 31/7/2013

 

Test:

 

package com.tch.test.spring.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point: launches {@code helloWorldJob} once and prints its final status.
 */
public class Test {
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };

		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		try {
			JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
			Job job = (Job) context.getBean("helloWorldJob");

			JobExecution execution = jobLauncher.run(job, new JobParameters());
			System.out.println("Exit Status : " + execution.getStatus());
			System.out.println("Done");
		} finally {
			// Close the context even when the launch throws, so singleton beans are destroyed.
			context.close();
		}
	}
}

 

 

 

 

如果要读取多个文件,只需使用 MultiResourceItemReader:它依次遍历所有匹配到的资源,并把每个文件的逐行读取委托给一个单文件 reader(这里是 FlatFileItemReader)。

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<!-- Same job as before, but the step reads through multiResourceReader (all matching files). -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!--
		Walks every resource matching the pattern and hands each one to the delegate reader.
		NOTE: "classpath:" resolves the pattern against a single classpath root; use "classpath*:"
		if matching files may live in several roots (verify for your packaging).
	-->
	<bean id="multiResourceReader"
		class="org.springframework.batch.item.file.MultiResourceItemReader">
		<property name="resources" value="classpath:report*.txt" />
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<!--
		Single-file delegate. It deliberately has no "resource" property: MultiResourceItemReader
		sets the current file on its delegate, so a fixed resource here would be overridden and misleading.
	-->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

</beans>

 

 

 

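Spring Batch 与 Quartz 结合实现定时批处理(注意:下面用到的 SchedulerFactoryBean、CronTriggerBean、JobDetailBean、QuartzJobBean 位于 spring-context-support 中;其中 CronTriggerBean 和 JobDetailBean 只支持 Quartz 1.x)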

 

beans.xml:

 

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

	<!-- Item processor -->
	<bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" />

	<!-- The batch job that processes the data -->
	<batch:job id="helloWorldJob">
		<batch:step id="step1">
			<batch:tasklet>
				<batch:chunk reader="multiResourceReader" writer="reportWriter"
					processor="itemProcessor" commit-interval="10">
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!-- Reader over multiple resources -->
	<bean id="multiResourceReader"
		class="org.springframework.batch.item.file.MultiResourceItemReader">
		<!-- Resource location pattern -->
		<property name="resources" value="classpath:report*.txt" />
		<!-- Single-resource reader that does the actual per-file reading -->
		<property name="delegate" ref="flatFileItemReader" />
	</bean>

	<!-- Single-resource reader; its resource is supplied by multiResourceReader -->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<!-- Splits each line into fields -->
				<property name="lineTokenizer">
					<bean
						class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
					</bean>
				</property>
				<!-- Maps the fields to a Report entity -->
				<property name="fieldSetMapper">
					<bean class="com.tch.test.spring.batch.ReportFieldSetMapper" />
				</property>
			</bean>
		</property>

	</bean>

	<!-- Custom writer invoked after items are processed -->
	<bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean>

	<!-- Job launcher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<!-- Job repository -->
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<!-- Transaction manager -->
	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

	<!-- Registers every batch job bean in jobRegistry so JobLauncherDetails can look it up by name -->
	<bean
		class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
		<property name="jobRegistry" ref="jobRegistry" />
	</bean>

	<bean id="jobRegistry"
		class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

	<!-- Quartz scheduling (fires every 5 seconds). CronTriggerBean/JobDetailBean require Quartz 1.x. -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
				<!-- What to run -->
				<property name="jobDetail" ref="jobDetail" />
				<!-- Cron expression -->
				<property name="cronExpression" value="*/5 * * * * ?" />
			</bean>
		</property>
	</bean>

	<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
		<!-- Class executed on each trigger fire -->
		<property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" />
		<property name="group" value="quartz-batch" />
		<!--
			jobName selects the batch job; jobLocator/jobLauncher are injected into the Quartz job;
			the remaining String entries become batch job parameters.
		-->
		<property name="jobDataAsMap">
			<map>
				<entry key="jobName" value="helloWorldJob" />
				<entry key="jobLocator" value-ref="jobRegistry" />
				<entry key="jobLauncher" value-ref="jobLauncher" />
				<entry key="param1" value="value1" />
				<entry key="param2" value="value2" />
			</map>
		</property>
	</bean>
</beans>

 

 

JobLauncherDetails:

 

package com.tch.test.spring.batch;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.quartz.JobExecutionContext;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class JobLauncherDetails extends QuartzJobBean {

	static final String JOB_NAME = "jobName";

	private JobLocator jobLocator;

	private JobLauncher jobLauncher;

	public void setJobLocator(JobLocator jobLocator) {
		this.jobLocator = jobLocator;
	}

	public void setJobLauncher(JobLauncher jobLauncher) {
		this.jobLauncher = jobLauncher;
	}

	@SuppressWarnings("unchecked")
	protected void executeInternal(JobExecutionContext context) {

		Map<String, Object> jobDataMap = context.getMergedJobDataMap();

		//拿到beans.xml中jobDetail里面配置的"jobName"对应的value
		String jobName = (String) jobDataMap.get(JOB_NAME);
		JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
		try {
			//运行对应的job
			jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
		} catch (JobExecutionException e) {
			e.printStackTrace();
		}
	}

	// 读取配置的参数
	private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
		JobParametersBuilder builder = new JobParametersBuilder();
		for (Entry<String, Object> entry : jobDataMap.entrySet()) {
			String key = entry.getKey();
			Object value = entry.getValue();
			//过滤掉"jobName"
			if (value instanceof String && !key.equals(JOB_NAME)) {
				builder.addString(key, (String) value);
			} else if (value instanceof Float || value instanceof Double) {
				builder.addDouble(key, ((Number) value).doubleValue());
			} else if (value instanceof Integer || value instanceof Long) {
				builder.addLong(key, ((Number) value).longValue());
			} else if (value instanceof Date) {
				builder.addDate(key, (Date) value);
			} else {
				//过滤掉beans.xml中jobDetail的jobDataAsMap配置的"jobLocator"、"jobLauncher"属性
				// JobDataMap contains values which are not job parameters
				// (ignoring)
			}
		}
		// need unique job parameter to rerun the completed job
		builder.addDate("run date", new Date());
		return builder.toJobParameters();
	}

}

 

 

运行:

 

package com.tch.test.spring.batch;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point for the scheduled example: starts the Spring context so the Quartz scheduler
 * defined in beans.xml begins firing the batch job.
 */
public class Test {
	public static void main(String[] args) throws Exception {

		String[] springConfig = { "beans.xml" };

		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
		// The context stays open so the scheduler keeps running; close it (and stop the scheduler)
		// when the JVM shuts down.
		context.registerShutdownHook();
	}
}

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics