spring batch的处理流程:
读取数据->处理数据->写数据
reader->process->writer
maven 依赖:
<properties> <spring.version>3.2.2.RELEASE</spring.version> <spring.batch.version>2.2.0.RELEASE</spring.batch.version> <mysql.driver.version>5.1.25</mysql.driver.version> <junit.version>4.11</junit.version> </properties> <dependencies> <!-- Spring Core --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring jdbc, for database --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring XML to/back object --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <!-- MySQL database driver --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.driver.version}</version> </dependency> <!-- Spring Batch dependencies --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-infrastructure</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Spring Batch unit test --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <version>${spring.batch.version}</version> </dependency> <!-- Junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> </dependencies>
POJO类:
package com.tch.test.spring.batch.entity; import java.text.SimpleDateFormat; import java.util.Date; public class Report { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); private int id; private Date date; public int getId() { return id; } public void setId(int id) { this.id = id; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return "Report [id=" + id + ", date=" + dateFormat.format(date) + "]"; } }
mapper类:
package com.tch.test.spring.batch; import java.text.ParseException; import java.text.SimpleDateFormat; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.validation.BindException; import com.tch.test.spring.batch.entity.Report; public class ReportFieldSetMapper implements FieldSetMapper<Report> { private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy"); @Override public Report mapFieldSet(FieldSet fieldSet) throws BindException { Report report = new Report(); report.setId(fieldSet.readInt(0)); String date = fieldSet.readString(1); try { report.setDate(dateFormat.parse(date)); } catch (ParseException e) { e.printStackTrace(); } return report; } }
processor类:
package com.tch.test.spring.batch; import org.springframework.batch.item.ItemProcessor; import com.tch.test.spring.batch.entity.Report; public class CustomItemProcessor implements ItemProcessor<Report, Report> { @Override public Report process(Report report) throws Exception { System.out.println("Processing..." + report); return report; } }
writer类:
package com.tch.test.spring.batch; import java.util.List; import org.springframework.batch.item.ItemWriter; import com.tch.test.spring.batch.entity.Report; public class ReportItemWriter implements ItemWriter<Report>{ public void write(List<? extends Report> reports) throws Exception { for (Report m : reports) { System.out.println("write results : "+m); } } }
beans.xml:(commit-interval表示批处理每次事务处理记录数,这里每次处理十条)
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd "> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="cvsFileItemReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="cvsFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:report.txt" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> </beans>
report.txt:
1001, 29/7/2013 1002, 30/7/2013 1003, 31/7/2013 1004, 29/7/2013 1005,30/7/2013 1006, 31/7/2013 1007, 29/7/2013 1008,30/7/2013 1009, 31/7/2013 1010, 29/7/2013 1011,30/7/2013 1012, 31/7/2013 1013, 29/7/2013 1014,30/7/2013 1015, 31/7/2013 1016, 29/7/2013 1017,30/7/2013 1018, 31/7/2013
Test:
package com.tch.test.spring.batch; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { public static void main(String[] args) throws Exception { String[] springConfig = { "beans.xml" }; ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(springConfig); JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("helloWorldJob"); JobExecution execution = jobLauncher.run(job, new JobParameters()); System.out.println("Exit Status : " + execution.getStatus()); System.out.println("Done"); context.close(); } }
如果要读取多个文件,则只需要使用MultiResourceItemReader即可:
beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="multiResourceReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="multiResourceReader" class=" org.springframework.batch.item.file.MultiResourceItemReader"> <property name="resources" value="classpath:report*.txt" /> <property name="delegate" ref="flatFileItemReader" /> </bean> <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="resource" value="classpath:report.txt" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> </beans>
beans.xml:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- 处理数据 --> <bean id="itemProcessor" class="com.tch.test.spring.batch.CustomItemProcessor" /> <!-- 处理数据的job --> <batch:job id="helloWorldJob"> <batch:step id="step1"> <batch:tasklet> <batch:chunk reader="multiResourceReader" writer="reportWriter" processor="itemProcessor" commit-interval="10"> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <!-- 读取多个资源的reader --> <bean id="multiResourceReader" class=" org.springframework.batch.item.file.MultiResourceItemReader"> <!-- 资源位置 --> <property name="resources" value="classpath:report*.txt" /> <!-- 使用读取单个资源的reader --> <property name="delegate" ref="flatFileItemReader" /> </bean> <!-- 读取单个资源的reader --> <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <!-- 分词器 --> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> </bean> </property> <!-- 数据和实体的映射处理 --> <property name="fieldSetMapper"> <bean class="com.tch.test.spring.batch.ReportFieldSetMapper" /> </property> </bean> </property> </bean> <!-- 数据处理完成之后,自定义写操作 --> <bean id="reportWriter" class="com.tch.test.spring.batch.ReportItemWriter"></bean> <!-- job启动 --> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <!-- job仓库 --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager" /> </bean> <!-- 事务管理 --> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> <!-- job注册 --> <bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor"> <property name="jobRegistry" ref="jobRegistry" /> </bean> <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" /> <!-- 使用quartz进行调度管理(每5秒钟执行一次) --> <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean"> <!-- 执行的操作 --> <property name="jobDetail" ref="jobDetail" /> <!-- 定时表达式 --> <property name="cronExpression" value="*/5 * * * * ?" /> </bean> </property> </bean> <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean"> <!-- 执行操作的class --> <property name="jobClass" value="com.tch.test.spring.batch.JobLauncherDetails" /> <property name="group" value="quartz-batch" /> <property name="jobDataAsMap"> <map> <entry key="jobName" value="helloWorldJob" /> <entry key="jobLocator" value-ref="jobRegistry" /> <entry key="jobLauncher" value-ref="jobLauncher" /> <entry key="param1" value="value1" /> <entry key="param2" value="value2" /> </map> </property> </bean> </beans>
JobLauncherDetails:
package com.tch.test.spring.batch; import java.util.Date; import java.util.Map; import java.util.Map.Entry; import org.quartz.JobExecutionContext; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.configuration.JobLocator; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.scheduling.quartz.QuartzJobBean; public class JobLauncherDetails extends QuartzJobBean { static final String JOB_NAME = "jobName"; private JobLocator jobLocator; private JobLauncher jobLauncher; public void setJobLocator(JobLocator jobLocator) { this.jobLocator = jobLocator; } public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; } @SuppressWarnings("unchecked") protected void executeInternal(JobExecutionContext context) { Map<String, Object> jobDataMap = context.getMergedJobDataMap(); //拿到beans.xml中jobDetail里面配置的"jobName"对应的value String jobName = (String) jobDataMap.get(JOB_NAME); JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap); try { //运行对应的job jobLauncher.run(jobLocator.getJob(jobName), jobParameters); } catch (JobExecutionException e) { e.printStackTrace(); } } // 读取配置的参数 private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) { JobParametersBuilder builder = new JobParametersBuilder(); for (Entry<String, Object> entry : jobDataMap.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); //过滤掉"jobName" if (value instanceof String && !key.equals(JOB_NAME)) { builder.addString(key, (String) value); } else if (value instanceof Float || value instanceof Double) { builder.addDouble(key, ((Number) value).doubleValue()); } else if (value instanceof Integer || value instanceof Long) { builder.addLong(key, ((Number) value).longValue()); } else if (value instanceof Date) { builder.addDate(key, (Date) value); } else { //过滤掉beans.xml中jobDetail的jobDataAsMap配置的"jobLocator"、"jobLauncher"属性 // JobDataMap contains values which are not job parameters // (ignoring) } } // need unique job parameter to rerun the completed job builder.addDate("run date", new Date()); return builder.toJobParameters(); } }
运行:
package com.tch.test.spring.batch; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Test { public static void main(String[] args) throws Exception { String[] springConfig = { "beans.xml" }; context = new ClassPathXmlApplicationContext(springConfig); } }
相关推荐
轻松引领进入数据批处理世界:基本特性...深度探索Spring Batch 批处理框架的核心概念:作业配置、作业步配置,以及Spring Batch 框架中经典的三步走策略。 快速提升数据批处理的能力:高性能、高可靠性、并行处理。
入门篇介绍了批处理、Spring Batch的基本特性和新特性,快速入门的Hello World等内容引领读者入门,从而进入数据批处理的世界。基本篇重点讲述了数据批处理的核心概念、典型的作业配置、作业步配置,以及Spring ...
一个SpringBatch的入门案例,SpringBatch是spring体系下的一个轻量级的批处理框架,使用简单方便,是非常优秀的一个批处理框架。如果你的工作中涉及到了批处理作业,可以考虑使用SpringBatch框架,本案例有详细的...
GKE上的Spring Batch Spring Batch Job在GKE上运行 描述 Spring批处理模型 工作 它是一个封装整个批处理过程的实体 它只是Step实例的容器 它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性 步它是...
Spring Batch入门的便捷方法 通过基于Web的用户界面启动/停止/重新启动作业 观察工作/步骤进度和状态。 步骤可以生成可以在UI中显示的输出。 通常是报告,脚本日志,堆栈跟踪等。 提供针对不同使用场景的...
9.2.1 Spring Batch 快速入门 362 9.2.2 Spring Boot 的支持 370 9.2.3 实战 371 9.3 异步消息 385 9.3.1 企业级消息代理 386 9.3.2 Spring 的支持 386 9.3.3 Spring Boot 的支持 386 9.3.4 JMS 实战 387 9.3.5 AMQP...
9.2.1 Spring Batch 快速入门 362 9.2.2 Spring Boot 的支持 370 9.2.3 实战 371 9.3 异步消息 385 9.3.1 企业级消息代理 386 9.3.2 Spring 的支持 386 9.3.3 Spring Boot 的支持 386 9.3.4 JMS 实战 387 9.3.5 AMQP...
9.2.1 Spring Batch 快速入门 362 9.2.2 Spring Boot 的支持 370 9.2.3 实战 371 9.3 异步消息 385 9.3.1 企业级消息代理 386 9.3.2 Spring 的支持 386 9.3.3 Spring Boot 的支持 386 9.3.4 JMS 实战 387 9.3.5 AMQP...
9.2 批处理Spring Batch .... ....................... 362 9.3 异步消息 .... ......................................... 385 9.4 系统集成Spring Integration .... ........... 395 第10 章 Spring Boot 开发部署...
9.2 批处理Spring Batch .... ....................... 362 9.3 异步消息 .... ......................................... 385 9.4 系统集成Spring Integration .... ........... 395 第10 章 Spring Boot 开发部署...
340 9.2 批处理Spring Batch .... ....................... 362 9.3 异步消息 .... ......................................... 385 9.4 系统集成Spring Integration .... ........... 395 第10 章 Spring Boot ...
Spring CLI示例快速入门 iii. 10.3. 从Spring Boot早期版本升级 iv. 11. 开发你的第一个Spring Boot应用 v. 11.1. 创建POM vi. 11.2. 添加classpath依赖 vii. 11.3. 编写代码 i. 11.3.1. @RestController和@...