Published: March 31, 2017

Processing large data sets is a complex task that software developers often have to deal with, and it calls for tools built for the job. Although quite a few such tools are available, Spring Batch stands out as a lightweight and robust framework that can be easily integrated with the Grails framework. This blog post explains how ‘chunk oriented processing’ can be combined with the ‘Spring Integration’ module.

Spring Batch Processing

Spring batch processing can be accomplished in two different ways.

  • Chunk Oriented Processing
  • TaskletStep Oriented Processing

This blog also explores how to stop a running Spring Batch job promptly while it is processing a large data set, without compromising on performance. Further, it covers restarting a failed/stopped batch job. Spring Integration provides a simple model for implementing complex enterprise integration solutions and facilitates asynchronous, message-driven behavior within a Spring-based application.

Chunk Oriented Processing

Chunk oriented processing refers to reading data one item at a time and creating ‘chunks’ that are written out within a transaction. In this process, an item is read by an ‘ItemReader’, handed over to an ‘ItemProcessor’, and collected for the ‘ItemWriter’. When the number of items read matches the commit interval, the entire chunk is written out via the ‘ItemWriter’ and the transaction is committed.
Chunk oriented processing applies when data items have to be read and written. If a step does not need to read or write data items, ‘TaskletStep Oriented Processing’ is used instead. The ‘Chunk Oriented Processing’ model has three important interfaces, ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’, which are part of the org.springframework.batch.item package.

  • ItemReader – The ‘ItemReader’ interface is used for providing data; it reads the data to be processed.
  • ItemProcessor – The ‘ItemProcessor’ interface is used for item transformation; it processes the input object from the ‘ItemReader’ and transforms it into an output object.
  • ItemWriter – The ‘ItemWriter’ interface is used for generic output operations. It writes the data transformed by the ‘ItemProcessor’, for example to a database, memory, or an output stream.
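
The read-process-write cycle described above can be sketched in plain Java. This is a simplified model and not Spring Batch's own implementation: the class name ‘ChunkLoopSketch’ and the method ‘run’ are made up for illustration, an ‘Iterator’ stands in for the ‘ItemReader’, a ‘Function’ stands in for the ‘ItemProcessor’, and the returned list of chunks stands in for what the ‘ItemWriter’ would receive at each commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the chunk-oriented loop; no Spring classes are used.
class ChunkLoopSketch {

    /**
     * Reads items one at a time, transforms each, and hands the buffered
     * chunk to the "writer" whenever commitInterval items have been collected.
     * Returns the chunks in the order in which they were "committed".
     */
    static <I, O> List<List<O>> run(Iterator<I> reader,
                                    Function<I, O> processor,
                                    int commitInterval) {
        List<List<O>> committed = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            I item = reader.next();             // stands in for ItemReader.read()
            chunk.add(processor.apply(item));   // stands in for ItemProcessor.process()
            if (chunk.size() == commitInterval) {
                committed.add(chunk);           // stands in for ItemWriter.write(chunk) + commit
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            committed.add(chunk);               // final, partially filled chunk
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        // prints [[A, B], [C, D], [E]]
        System.out.println(run(input.iterator(), (String s) -> s.toUpperCase(), 2));
    }
}
```

With five items and a commit interval of 2, the writer is invoked three times; the last chunk holds only the remaining item.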

To make the Spring Batch integration concrete, we illustrate it with a sample application, including code, that writes its output to a Microsoft Excel spreadsheet. The steps and code below were written for Grails 2.4. With minimal changes, the same configuration/steps should also work in other Spring-based frameworks.

Step 1 – Resource.xml

The resource.xml defines all Spring Batch beans, such as the transaction manager and job repository, needed to establish a job context. Further, the message channel, message handler and message endpoint beans are defined for the message queue. Finally, the job explorer and job registry beans are used to stop a running job and to restart failed/stopped jobs.

[xml]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-4.1.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- **************************************************
*Spring Batch stuff
************************************************** -->
<bean id="batchTxManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource_rimdb" />
</bean>
<bean id="batchJobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource_rimdb" />
<property name="transactionManager" ref="batchTxManager" />
<property name="isolationLevelForCreate" value="ISOLATION_READ_COMMITTED" />
<property name="validateTransactionState" value="false" />
</bean>
<!-- **************************************************
* Spring Integration stuff
************************************************** -->
<bean id="msgStore" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource_rimdb"/>
<property name="channelMessageStoreQueryProvider" ref="msgStoreQueryProvider" />
</bean>
<!-- See also http://docs.spring.io/spring-integration/reference/htmlsingle/#channel-configuration-queuechannel
and http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/system-management-chapter.html#message-store -->
<int:channel id="msgStoreChannel">
<int:queue message-store="msgStore"/>
</int:channel>
<bean id="msgTxManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource_rimdb" />
</bean>
<bean id="msgHandler" class="com.test.batch.report.ReportJobDispatcher" />
<!-- We tie together the filter and polling consumer in a chain so we need not define
additional channels. -->
<int:chain id="msgEndpointChain" input-channel="msgStoreChannel">
<!-- Messages that are discarded because the thread pool is saturated will be queued again
by setting the discard-channel to our input message queue -->
<int:filter id="msgFilter" discard-channel="msgStoreChannel">
<bean class="com.test.batch.integration.MsgFilter" />
</int:filter>
<!-- See also http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/messaging-endpoints-chapter.html#endpoint
Adjust as needed. max-messages-per-poll can also be increased, each message is still handled within its own transaction
See also http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/transactions.html#transaction-poller -->
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<int:transactional
transaction-manager="msgTxManager"
propagation="REQUIRED"
isolation="DEFAULT"
timeout="10000"
read-only="false" />
</int:poller>
<int:outbound-channel-adapter id="msgOutboundChannelAdapter" ref="msgHandler" method="handleMessage" />
</int:chain>
<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource_rimdb" />
</bean>
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
</beans>
[/xml]

Step 2 – BatchReportConfig

BatchReportConfig is a simple class used to obtain any bean defined in resource.xml from the Grails context. It also gives access to Grails beans from code outside the Grails context, i.e. from classes other than Grails controllers/services.
It additionally serves as a convenience class for accessing batch-related configuration. This keeps the rest of the application unaware of the Spring configuration and centralizes access to config.groovy.

[xml]import com.test.ApplicationContextHolder
import org.codehaus.groovy.grails.web.mapping.LinkGenerator
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
class BatchReportConfig {
private static JobRepository jobRepository
private static PlatformTransactionManager txManager
private static MessageChannel msgChannel
private static ThreadPoolTaskExecutor taskExecutor
private static LinkGenerator linkGenerator
private static ConfigObject configObj
private static JobExplorer jobExplorer
private static JobRegistry jobRegistry
private BatchReportConfig() {}
// ****************************************************
// Convenience methods for Spring Beans
// ****************************************************
public static synchronized MessageChannel getMessageChannel() {
if (!msgChannel)
msgChannel = ApplicationContextHolder.getBean("msgStoreChannel")
return msgChannel
}
public static synchronized PlatformTransactionManager getTxManager() {
if (!txManager)
txManager = ApplicationContextHolder.getBean("batchTxManager")
return txManager
}
public static synchronized JobRepository getJobRepository() {
if (!jobRepository) {
jobRepository = ApplicationContextHolder.getBean("batchJobRepository")
}
return jobRepository
}
public static synchronized JobExplorer getJobExplorer() {
if (!jobExplorer)
jobExplorer = ApplicationContextHolder.getBean("jobExplorer")
return jobExplorer
}
public static synchronized JobRegistry getJobRegistry() {
if (!jobRegistry)
jobRegistry = ApplicationContextHolder.getBean("jobRegistry")
return jobRegistry
}
public static synchronized LinkGenerator getLinkGenerator() {
if (!linkGenerator)
linkGenerator = ApplicationContextHolder.getBean("grailsLinkGenerator")
return linkGenerator
}
}[/xml]

Step 3 – ReportJobFactory

ReportJobFactory is an abstract class whose abstract methods must be overridden/implemented to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’. The concrete method ‘buildJob’ builds a report job from the job parameters and the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’, as specified in the ReportJob class below. The abstract class thus helps in creating a report job based on the submitted job request type, and it must be subclassed for each new type of tool report.

[xml]import java.util.List
import java.util.Map
import com.test.batch.report.ReportJob
import com.test.batch.report.ReportJobConfig
import com.test.batch.integration.InputSource
import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
abstract class ReportJobFactory {
private JobParameters jobParams
private ReportJobConfig jobConfig
private List inputList
public ReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
assert jobParams && jobConfig
this.jobParams = jobParams
this.jobConfig = jobConfig
this.inputList = inputList
}
protected abstract ItemReader newListItemReader(JobParameters jobParams, List inputList)
protected abstract ItemReader newFileItemReader(JobParameters jobParams)
protected abstract ItemProcessor newItemProcessor(JobParameters jobParams)
protected abstract ItemWriter newItemWriter(JobParameters jobParams)
public ReportJob buildJob() {
ItemReader itemReader
if (InputSource.LIST.toString() == jobParams.getString("inputSource"))
itemReader = newListItemReader(jobParams, inputList)
else
itemReader = newFileItemReader(jobParams)
ItemProcessor itemProcessor = newItemProcessor(jobParams)
ItemWriter itemWriter = newItemWriter(jobParams)
return new ReportJob(jobConfig, jobParams, itemReader, itemProcessor, itemWriter)
}
}
[/xml]

Step 4 – ReportJob

ReportJob is a wrapper class that builds the batch job/step configuration specific to a tool report. It generalizes the necessary functionality, which makes it easy to add new tool reports, i.e. developers need not have exhaustive knowledge of Spring Batch.
The wrapper class builds two steps:

  1. The first is the ‘report step’, which executes the ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’ with a chunk size (i.e. commit interval) that splits the data into chunks/pieces. The chunk size/commit interval also matters a great deal when stopping a running job.
  2. The second is an optional notification step, which sends an email with a ‘success or failure’ message once the ‘report step’ has completed.
[xml]import com.test.batch.integration.ReportDelivery
import groovy.json.JsonBuilder
import java.util.concurrent.Executors
import org.apache.tomcat.jdbc.pool.DataSource
import org.springframework.batch.core.Job
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobExecutionListener
import org.springframework.batch.core.JobParameter
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.JobParametersBuilder
import org.springframework.batch.core.Step
import org.springframework.batch.core.job.SimpleJob
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.job.builder.SimpleJobBuilder
import org.springframework.batch.core.launch.support.RunIdIncrementer
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean
import org.springframework.batch.core.repository.support.SimpleJobRepository
import org.springframework.batch.core.step.builder.SimpleStepBuilder
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.core.step.builder.TaskletStepBuilder
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.core.step.tasklet.TaskletStep
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.support.IteratorItemReader
class ReportJob {
private static final String REPORT_STEP_NAME = "report"
private static final String NOTIFICATION_STEP_NAME = "notification"
private static final String CLEANUP_STEP_NAME = "cleanup"
private ReportJobConfig config
private ItemReader itemReader
private ItemProcessor itemProcessor
private ItemWriter itemWriter
private SimpleJob job
private JobParameters jobParams
public ReportJob(ReportJobConfig config, JobParameters jobParameters, ItemReader itemReader, ItemProcessor itemProcessor, ItemWriter itemWriter) {
assert config && jobParameters && itemReader && itemProcessor && itemWriter
this.config = config
this.jobParams = jobParameters
this.itemReader = itemReader
this.itemProcessor = itemProcessor
this.itemWriter = itemWriter
job = buildJob()
job.registerJobExecutionListener(new ReportJobExecutionListener())
}
public Job getJob() {
return job
}
public JobParameters getJobParameters() {
return jobParams
}
public JobRepository getJobRepository() {
return config.jobRepository
}
public String getToolName() {
return jobParams.getString("toolName")
}
protected Step newReportStep() {
SimpleStepBuilder stepBuilder = new StepBuilder(REPORT_STEP_NAME)
./*<String, Map>*/ chunk(jobParams.getLong("chunkSize").intValue()) // <input type, output type>
.reader(itemReader)
.writer(itemWriter)
.processor(itemProcessor)
.repository(config.jobRepository)
.transactionManager(config.txManager)
.startLimit(3) // give up after 3 tries
TaskletStep step = stepBuilder.build()
return step
}
protected Step newNotificationStep() {
Tasklet tasklet = new MailNotificationTasklet(jobParams.getString("userEmail"),jobParams.getString("ccEmail"), jobParams.getString("toolDescr"),
jobParams.getString("jobRef"), config.fileDir, Enum.valueOf(ReportDelivery, jobParams.getString("reportDelivery")))
TaskletStepBuilder stepBuilder = new StepBuilder(NOTIFICATION_STEP_NAME)
.tasklet(tasklet)
.repository(config.jobRepository)
.transactionManager(config.txManager)
TaskletStep step = stepBuilder.build()
return step
}
protected SimpleJob buildJob() {
SimpleJobBuilder jobBuilder = new JobBuilder(jobParams.getString("toolName"))
.start(newReportStep())
.next(newNotificationStep())
.repository(config.jobRepository)
.incrementer(new RunIdIncrementer()) // do we need this?
return jobBuilder.build()
}
}
[/xml]

Step 5 – ReportJobDispatcher

ReportJobDispatcher handles a single job request retrieved from the message queue. It is called from a polling outbound channel adapter within a transaction context. If the method throws an exception (e.g. because the task executor is saturated), the message remains in the queue. Otherwise, it creates a new job instance from the given job request and launches it immediately. If the launch fails (e.g. due to all executor threads being busy), an exception is raised so that the message is retained in the queue.

[xml]import com.test.batch.report.factory.ReportJobFactory
import com.test.batch.report.factory.ReportJobFactoryFactory
import com.test.batch.integration.ReportJobRequest
import org.springframework.batch.core.JobParameters
import groovy.json.JsonSlurper
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessagingException
class ReportJobDispatcher implements MessageHandler {
@Override
public void handleMessage(Message<ReportJobRequest> message) throws MessagingException {
assert message
ReportJobRequest jobRequest = new ReportJobRequest(message.getPayload())
dispatch(jobRequest)
}
protected void dispatch(ReportJobRequest jobRequest) {
assert jobRequest
ReportJobConfig jobConfig = BatchReportConfig.newDefaultJobConfig()
JobParameters jobParams = ReportJobParamsBuilder.build(jobRequest)
ReportJobFactory reportJobFactory = ReportJobFactoryFactory.newReportJobFactory(jobParams, jobConfig, jobRequest.inputList)
ReportJob reportJob = reportJobFactory.buildJob()
if(jobRequest.jobExecutionId){
if(jobRequest.jobRquestType == "Restart"){
ReportJobLauncher.restart(reportJob, jobRequest.jobExecutionId)
}else if(jobRequest.jobRquestType == "Stop"){
ReportJobLauncher.stop(reportJob, jobRequest.jobExecutionId)
}else{
}
}else{
ReportJobLauncher.submit(reportJob)
}
}
}
[/xml]
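
The "exception keeps the message in the queue" behavior described for the dispatcher can be modelled without Spring. The sketch below is only an illustration under stated assumptions: the class ‘RequeueOnFailureSketch’ and the method ‘pollOnce’ are hypothetical, an in-memory ‘Deque’ stands in for the JDBC-backed message store, and putting the message back at the head of the queue stands in for a transaction rollback.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Plain-Java model of "roll back and keep the message queued"; no Spring classes are used.
class RequeueOnFailureSketch {

    /**
     * Polls one message and passes it to the handler. If the handler throws,
     * the message is put back at the head of the queue, which mimics a
     * rolled-back receive. Returns true only when the message was consumed.
     */
    static <T> boolean pollOnce(Deque<T> queue, Consumer<T> handler) {
        T message = queue.pollFirst();
        if (message == null) {
            return false;                  // nothing to do on this poll
        }
        try {
            handler.accept(message);
            return true;                   // "commit": the message is gone
        } catch (RuntimeException e) {
            queue.addFirst(message);       // "rollback": the message stays queued
            return false;
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("job-1");
        boolean[] saturated = {true};      // hypothetical executor state
        Consumer<String> dispatcher = msg -> {
            if (saturated[0]) {
                throw new IllegalStateException("no free threads");
            }
        };
        // prints "false 1": the dispatch failed and the message is still queued
        System.out.println(pollOnce(queue, dispatcher) + " " + queue.size());
        saturated[0] = false;
        // prints "true 0": the retry succeeded and the queue is empty
        System.out.println(pollOnce(queue, dispatcher) + " " + queue.size());
    }
}
```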

Step 6 – ReportJobFactoryFactory

This is a factory class used for creating a report job factory based on the tool name.

[xml]import com.test.batch.domainreports.gadatafeed.GaDataFeedReportJobFactory;
import com.test.batch.domainreports.livesitereport.LiveSiteReportReportJobFactory;
import com.test.batch.report.ReportJobConfig
import org.springframework.batch.core.JobParameters
class ReportJobFactoryFactory {
private ReportJobFactoryFactory() {}
public static ReportJobFactory newReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
assert jobParams && jobConfig
ReportJobFactory factory
String toolName = jobParams.getString("toolName")
switch(toolName) {
case "abc":
factory = new AbcReportJobFactory(jobParams, jobConfig, inputList)
break
case "xyz":
factory = new XyzReportJobFactory(jobParams, jobConfig, inputList)
break
default:
throw new Exception("Report factory for tool [${toolName}] not found")
}
assert factory
return factory
}
}
[/xml]

Each job factory instantiated in the above class extends ‘ReportJobFactory’ and overrides all of its abstract methods to return the appropriate ‘ItemReader’, ‘ItemProcessor’ and ‘ItemWriter’.

[xml]import java.util.List
import java.util.Map
import com.test.batch.report.PassthroughFlatFileItemReader
import com.test.batch.report.ReportJobConfig
import com.test.batch.report.factory.ReportJobFactory
import org.springframework.batch.core.JobParameters
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.support.IteratorItemReader
class AbcReportJobFactory extends ReportJobFactory {
public AbcReportJobFactory(JobParameters jobParams, ReportJobConfig jobConfig, List inputList) {
super(jobParams, jobConfig, inputList)
}
@Override
protected ItemReader newListItemReader(JobParameters jobParams, List inputList) {
return new IteratorItemReader(inputList)
}
@Override
protected ItemReader newFileItemReader(JobParameters jobParams) {
PassthroughFlatFileItemReader<String> itemReader = new PassthroughFlatFileItemReader<String>()
itemReader.setSaveState(false)
return itemReader
}
@Override
protected ItemProcessor newItemProcessor(JobParameters jobParams) {
return new AbcItemProcessor()
}
@Override
protected ItemWriter newItemWriter(JobParameters jobParams) {
AbcLineItemWriter itemWriter = new AbcLineItemWriter()
return itemWriter
}
}
[/xml]

ItemReader

The ‘ItemReader’ that is returned here is either a generic ‘IteratorItemReader’ or a ‘FlatFileItemReader’, depending on the data input.

  1. If the input data is a list, then it returns ‘IteratorItemReader’.
  2. If it is a text file, then it returns ‘FlatFileItemReader’.

ItemProcessor

The ‘ItemProcessor’ implements the generic ItemProcessor interface with a string as input and a data map as output. The transformation is implemented in the process method, which is called for each and every entry returned from the ‘ItemReader’.

[xml]import java.util.Map
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.annotation.BeforeStep
import org.springframework.batch.item.ItemProcessor
class AbcItemProcessor implements ItemProcessor<String, Map> {
private boolean showIpAddr, showIp6Addr
@BeforeStep
void beforeStep(StepExecution stepExecution) {
}
@Override
public Map process(String domain) throws Exception {
return [:]
}
}
[/xml]

ItemWriter

The ‘ItemWriter’ implements the generic ItemWriter interface for data maps. It is called once per chunk, after every item of that chunk has been processed by the ‘ItemProcessor’, in order to write the chunk to an output stream.

[xml]import com.test.batch.report.BatchReportFileUtils;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.List;
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemWriter;
class AbcLineItemWriter implements ItemWriter<HashMap> {
@BeforeStep
void beforeStep(StepExecution stepExecution) {
}
@AfterStep
void afterStep(StepExecution stepExecution) {
}
@Override
public void write(List<? extends HashMap> lstNameServerMap) throws Exception {
}
}
[/xml]

ReportJobLauncher

The ReportJobLauncher takes a report job and launches/restarts/stops it using the Spring Batch job launcher with the appropriate thread pool. We can use one thread pool per tool to simplify the implementation.

Submitting/Running a Job

The ‘submit’ method is synchronized to avoid issues while accessing some executor pool properties (although there should not be more than one instance of a caller). It is called within the transaction context of the polling message handler that extracts a job message from the queue. If it throws an exception, the transaction is rolled back and the message is retained in the queue for subsequent processing.
What we need for execution is a non-queuing thread pool with a maximum number of threads, i.e. direct hand-off, which can be achieved using a synchronous queue. We do not want queuing by the thread pool, as it is not persistent; all persistent queuing is taken care of by the message queue.
Note: If the maximum pool size is too small, then the persistent queue will build up over time. For further details, please refer to the thread pool bean configuration under the Resource.xml section.

[xml]import java.util.Map;
import java.util.concurrent.BlockingQueue
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import com.test.ApplicationContextHolder
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.batch.core.ExitStatus
class ReportJobLauncher {
private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)
public static synchronized void submit(ReportJob reportJob) {
assert reportJob
ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())
if (taskExecutor.getThreadPoolExecutor().isShutdown())
throw new TaskRejectedException("Task executor is shut down")
if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName()))
throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
SimpleJobLauncher launcher = new SimpleJobLauncher()
launcher.setJobRepository(reportJob.getJobRepository())
launcher.setTaskExecutor(taskExecutor)
launcher.afterPropertiesSet()
JobExecution jobExecution = launcher.run(reportJob.getJob(), reportJob.getJobParameters())
if (jobExecution.getExitStatus() == ExitStatus.FAILED)
// We should never reach this because we check for available threads above
throw new Exception(jobExecution.getExitStatus().getExitDescription())
}
}[/xml]
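
The non-queuing, direct hand-off pool described in this section can be illustrated with the JDK alone. This is a minimal sketch under assumptions, not the configuration used by the sample application: the class ‘DirectHandoffPoolSketch’ and its methods are hypothetical, and a raw ‘ThreadPoolExecutor’ is used where the application uses a Spring ‘ThreadPoolTaskExecutor’. It shows that a pool backed by a ‘SynchronousQueue’ rejects a task as soon as all of its threads are busy, instead of queuing it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only illustration of a thread pool that never queues tasks.
class DirectHandoffPoolSketch {

    /** Builds a pool in which a task either gets a thread at once or is rejected. */
    static ThreadPoolExecutor newPool(int maxThreads) {
        return new ThreadPoolExecutor(
                0, maxThreads,                 // no core threads, hard upper bound
                60L, TimeUnit.SECONDS,         // idle threads are reclaimed
                new SynchronousQueue<>());     // direct hand-off, zero capacity
    }

    /** Occupies every thread, then submits one more task; true if that task is rejected. */
    static boolean extraTaskIsRejected(int maxThreads) {
        ThreadPoolExecutor pool = newPool(maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();               // keeps the worker thread busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < maxThreads; i++) {
                pool.execute(blocker);         // each call starts a new worker
            }
            try {
                pool.execute(blocker);         // the pool is saturated now
                return false;
            } catch (RejectedExecutionException e) {
                return true;                   // the default AbortPolicy rejects
            }
        } finally {
            release.countDown();               // let the workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(extraTaskIsRejected(2)); // prints true
    }
}
```

In the launcher above, the rejected submission surfaces as an exception inside the polling transaction, so the job message stays in the persistent queue rather than being lost in an in-memory one.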

Restarting a Job

Here is the code snippet to restart a failed/stopped job using the job execution id.

[xml]import java.util.Map;
import java.util.concurrent.BlockingQueue
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import com.test.ApplicationContextHolder
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobParameters
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.batch.core.ExitStatus
class ReportJobLauncher {
private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)
public static synchronized void restart(ReportJob reportJob, Long jobExecutionId) {
assert reportJob
try {
ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())
if (taskExecutor.getThreadPoolExecutor().isShutdown())
throw new TaskRejectedException("Task executor is shut down")
if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName())){
throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
}
JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
JobRegistry jobRegistry = new MapJobRegistry()
JobRepository jobRepository = BatchReportConfig.getJobRepository()
JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)
if(jobExecution){
jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))
if(!(jobExecution.getStatus().equals(BatchStatus.COMPLETED))){
SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
simpleJobOperator.setJobExplorer(jobExplorer)
simpleJobOperator.setJobRegistry(jobRegistry)
simpleJobOperator.setJobRepository(jobRepository)
SimpleJobLauncher launcher = new SimpleJobLauncher()
launcher.setJobRepository(jobRepository)
launcher.setTaskExecutor(taskExecutor)
launcher.afterPropertiesSet()
simpleJobOperator.setJobLauncher(launcher)
simpleJobOperator.restart(jobExecution.getId())
}
}
} catch (Exception e) {
throw new Exception(e)
}
}
}[/xml]

Stopping a Job

To stop a job quickly, keep the chunk size/commit interval small: a stop request takes effect only once the chunk currently in progress has been committed, so a smaller chunk means less waiting. The chunk size/commit interval is defined and used in the ‘ReportJob’ class when building the report step.
Note: When writing data to a CSV, Excel or other file, a small chunk size/commit interval is a great option. However, if we are writing to a data source (database), this impacts performance, since the smaller the commit interval, the more often the database is hit.

[xml]import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.batch.core.BatchStatus
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.configuration.JobRegistry
import org.springframework.batch.core.configuration.support.MapJobRegistry
import org.springframework.batch.core.configuration.support.ReferenceJobFactory
import org.springframework.batch.core.explore.JobExplorer
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.launch.support.SimpleJobOperator
import org.springframework.batch.core.repository.JobRepository
import org.springframework.core.task.TaskRejectedException
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

class ReportJobLauncher {
    private static final Logger log = LoggerFactory.getLogger(ReportJobLauncher.class)

    // Requests a stop of the given job execution, unless it has already completed.
    public static synchronized void stop(ReportJob reportJob, Long jobExecutionId) {
        assert reportJob
        try {
            // Look up the thread pool configured for this tool and check that it is usable.
            ThreadPoolTaskExecutor taskExecutor = ReportJobThreadPoolManager.getInstance().getExecutor(reportJob.getToolName())
            if (taskExecutor.getThreadPoolExecutor().isShutdown()) {
                throw new TaskRejectedException("Task executor is shut down")
            }
            if (taskExecutor.getThreadPoolExecutor().getActiveCount() >= BatchReportConfig.getToolMaxThreads(reportJob.getToolName())) {
                throw new TaskRejectedException("No threads for batch execution of tool [${reportJob.getToolName()}] available")
            }

            JobExplorer jobExplorer = BatchReportConfig.getJobExplorer()
            JobRegistry jobRegistry = new MapJobRegistry()
            JobRepository jobRepository = BatchReportConfig.getJobRepository()

            // Load the execution to stop; nothing to do if it does not exist.
            JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId)
            if (jobExecution) {
                jobRegistry.register(new ReferenceJobFactory(reportJob.getJob()))
                if (!(jobExecution.getStatus().equals(BatchStatus.COMPLETED))) {
                    // Flag the in-memory execution as stopping.
                    jobExecution.stop()

                    // Wire up a job operator by hand.
                    SimpleJobOperator simpleJobOperator = new SimpleJobOperator()
                    simpleJobOperator.setJobExplorer(jobExplorer)
                    simpleJobOperator.setJobRegistry(jobRegistry)
                    simpleJobOperator.setJobRepository(jobRepository)
                    SimpleJobLauncher launcher = new SimpleJobLauncher()
                    launcher.setJobRepository(jobRepository)
                    launcher.setTaskExecutor(taskExecutor)
                    launcher.afterPropertiesSet()
                    simpleJobOperator.setJobLauncher(launcher)

                    // Send the stop request through the operator, which records it in the job repository.
                    simpleJobOperator.stop(jobExecution.getId())
                }
            }
        } catch (Exception e) {
            // Log with context and rethrow the original exception instead of wrapping it.
            log.error("Failed to stop job execution [${jobExecutionId}]", e)
            throw e
        }
    }
}
[/xml]

Create a utility class to submit, restart or stop a job. Use the class below to post a job request to the message queue. A consumer on the queue can then pick up the request and hand it to the report job dispatcher to start or restart the job.

[xml]import com.test.batch.report.BatchReportConfig
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.GenericMessage

class IntegrationUtils {
    // Utility class: not meant to be instantiated.
    private IntegrationUtils() {}

    // Builds a job request, validates it and posts it as JSON to the message channel.
    private static ReportJobRequest postReportJobRequest(String toolName, String email, Date submissionDate, List items, String jobRef = null, Map params = null, Long jobExecutionId = null, String jobRquestType = null) {
        assert toolName && email && submissionDate

        ReportJobRequest jobRequest = new ReportJobRequest()
        jobRequest.submissionTime = submissionDate.getTime()
        jobRequest.toolName = toolName
        jobRequest.toolDescr = BatchReportConfig.getToolDescr(toolName)
        jobRequest.userEmail = email
        jobRequest.jobRef = jobRef
        jobRequest.toolParams = params
        // Use the in-memory list when items are supplied; otherwise read from a file.
        jobRequest.inputSource = items ? InputSource.LIST : InputSource.FILE
        jobRequest.inputList = items
        jobRequest.reportDelivery = ReportDelivery.LINK
        jobRequest.chunkSize = BatchReportConfig.getConfigObj().batch.report.chunkSize
        // Set when the request refers to an existing execution (restart or stop).
        jobRequest.jobExecutionId = jobExecutionId
        jobRequest.jobRquestType = jobRquestType
        jobRequest.validate()

        // Hand the serialized request to Spring Integration.
        MessageChannel msgChannel = BatchReportConfig.getMessageChannel()
        msgChannel.send(new GenericMessage<String>(jobRequest.toJson()))
        return jobRequest
    }
}
[/xml]
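
The hand-off from queue to dispatcher described above can be sketched roughly as follows. This is only an illustration in plain Java: a `java.util.concurrent` queue stands in for the Spring Integration message channel, and the `Request` class, the `RequestType` values and the `dispatch` method are hypothetical names, not part of the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in sketch: a plain queue plays the role of the message channel.
class JobRequestQueueSketch {

    /** Hypothetical request types; the post does not list the actual values. */
    enum RequestType { START, RESTART, STOP }

    /** Minimal stand-in for the job request payload. */
    static final class Request {
        final String toolName;
        final RequestType type;

        Request(String toolName, RequestType type) {
            this.toolName = toolName;
            this.type = type;
        }
    }

    /** Stand-in for the report job dispatcher: maps a request to an action. */
    static String dispatch(Request request) {
        switch (request.type) {
            case START:   return "start " + request.toolName;
            case RESTART: return "restart " + request.toolName;
            case STOP:    return "stop " + request.toolName;
            default:      throw new IllegalStateException("Unknown type: " + request.type);
        }
    }

    /** Takes the next request off the queue, or returns null if the queue is empty. */
    static String pollAndDispatch(BlockingQueue<Request> queue) {
        Request request = queue.poll(); // non-blocking
        return request == null ? null : dispatch(request);
    }

    public static void main(String[] args) {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
        queue.offer(new Request("salesReport", RequestType.START));
        queue.offer(new Request("salesReport", RequestType.STOP));
        System.out.println(pollAndDispatch(queue));
        System.out.println(pollAndDispatch(queue));
    }
}
```

Keeping the request type inside the message means a single channel can carry start, restart and stop requests, with the consumer deciding what to do with each one.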

Summing Up

The primary goal of this blog post was to explain how ‘chunk oriented processing’ can be integrated with the ‘Spring Integration’ module. Please feel free to share your questions and feedback in the comments section.

Author

Sathish Jannarapu is working as a Senior Technical Associate at Evoke Technologies. He is currently focused on Java/J2EE based technologies including Spring, Hibernate and Web Services. In his spare time Sathish likes listening to music and reading articles on emerging technologies.