Saturday, December 27, 2014

Using Spring Batch to Read, Process and Write Entities in Mongo

Almost all enterprise applications require bulk processing to perform business operations in mission-critical environments. These operations involve complex processing of large volumes of data at regular intervals, without any user interaction. Examples include month-end calculations, quarterly rollups, data migration and various other operations. 

Introduction
Spring Batch is an open source framework for batch processing – the execution of a series of steps. Spring Batch lets developers concentrate on business logic while the framework takes care of the entire batch infrastructure.
This example demonstrates the use of Spring Batch with MongoItemReader and MongoItemWriter to read data from MongoDB, process it and write it back to MongoDB.


Photo credit: Spring Source

Setup
Below is the configuration required to enable Spring Batch:
  1. Add the following Spring Batch dependencies to pom.xml:
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
        <version>3.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
        <version>3.0.2.RELEASE</version>
    </dependency>
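    <!-- Spring Data MongoDB is also needed for MongoTemplate, the mongo namespace and the
         Mongo-backed item reader/writer. The version below is an assumption, not from the
         original post; use any release compatible with Spring Batch 3.0.x. -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-mongodb</artifactId>
        <version>1.6.1.RELEASE</version>
    </dependency>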
  2. Create a Spring Batch job configuration XML file named batchjob.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:batch="http://www.springframework.org/schema/batch"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:mongo="http://www.springframework.org/schema/data/mongo"
        xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
                            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                            http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
                            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.2.xsd">
     
        <bean id="reportProcessor" class="com.javabydefault.batch.processor.ReportProcessor" />
     
        <batch:job id="ReportJob">
            <batch:step id="STEP1-ReportJob">
                <batch:tasklet allow-start-if-complete="true"> <!-- allow-start-if-complete lets a completed job be started again -->
                    <batch:chunk reader="reportItemReader" processor="reportProcessor" writer="reportWriter" commit-interval="5" /> <!-- commit-interval="5" commits after every 5 items are written -->
                </batch:tasklet>
            </batch:step>
        </batch:job>
      
        <!--  read from mongodb -->
        <bean id="reportItemReader" class="org.springframework.batch.item.data.MongoItemReader">
            <property name="template" ref="mongoTemplate"/>
            <property name="query" value="{_id : { $in : ['546dfb8ce4b0b13cf57ab501','546dfb8ce4b0b13cf57ab502','546dfb8ce4b0b13cf57ab503','546dfb8ce4b0b13cf57ab504','546dfb8ce4b0b13cf57ab505']}}"/>  
            <property name="sort" >
                <map>
                    <entry key="_id" value="#{T(org.springframework.data.domain.Sort.Direction).ASC}" />
                </map>
            </property>
            <property name="collection" value="order" />
            <property name="targetType" value="com.javabydefault.domain.entity.Order"/>
        </bean>
    
        <!-- write it to MongoDB, 'report' collection (table) -->
        <bean id="reportWriter" class="org.springframework.batch.item.data.MongoItemWriter">
            <property name="template" ref="mongoTemplate" />
            <property name="collection" value="report" />
        </bean>
        
        <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
            <constructor-arg name="mongoDbFactory" ref="mongoDbFactory" />
        </bean>
     
        <bean id="jobLauncher"
            class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
            <property name="jobRepository" ref="jobRepository" />
        </bean>
        
        <bean id="jobRepository" class="org.springframework.batch.core.repository.support.SimpleJobRepository">
            <constructor-arg>
                <bean class="com.github.nmorel.spring.batch.mongodb.repository.dao.MongoDbJobInstanceDao">
                    <property name="Db" value="#{mongoDbFactory.getDb()}"/> 
                    <property name="jobIncrementer" value="#{incrementerFactory.getIncrementer('BatchJobInstance')}"/>
                </bean>
            </constructor-arg>             
            <constructor-arg>   
                <bean class="com.github.nmorel.spring.batch.mongodb.repository.dao.MongoDbJobExecutionDao">
                    <property name="Db" value="#{mongoDbFactory.getDb()}"/>
                    <property name="jobExecutionIncrementer" value="#{incrementerFactory.getIncrementer('BatchJobExecution')}"/> 
                </bean>             
            </constructor-arg>             
            <constructor-arg>   
                <bean class="com.github.nmorel.spring.batch.mongodb.repository.dao.MongoDbStepExecutionDao">
                    <property name="Db" value="#{mongoDbFactory.getDb()}"/>
                    <property name="stepExecutionIncrementer" value="#{incrementerFactory.getIncrementer('BatchStepExecution')}"/> 
                </bean>             
            </constructor-arg>             
            <constructor-arg>   
                <bean class="com.github.nmorel.spring.batch.mongodb.repository.dao.MongoDbExecutionContextDao">
                    <property name="Db" value="#{mongoDbFactory.getDb()}"/> 
                </bean>             
            </constructor-arg>
        </bean>    
        <bean class="com.github.nmorel.spring.batch.mongodb.incrementer.MongoDbValueIncrementerFactory" id="incrementerFactory">
            <constructor-arg value="#{mongoDbFactory.getDb()}"/> 
        </bean>
    
        <!-- Mongo DB Database for Read/Write of data and also for Job meta-data info -->
        <bean class="java.net.URI" id="mongoUrl">
            <constructor-arg value="mongodb://username:password@localhost:10031/dbname" />
        </bean>
        
        <mongo:db-factory id="mongoDbFactory" host="#{@mongoUrl.getHost()}"
                port="#{@mongoUrl.getPort()}" dbname="#{@mongoUrl.getPath().substring(1)}"
                username="#{ @mongoUrl.getUserInfo().split(':')[0] }" password="#{ @mongoUrl.getUserInfo().split(':')[1] }" />
    </beans>
    
  3. Configure the JobRepository, a persistent store for all job meta-data. This repository can either be backed by a database or held in memory. Spring Batch comes pre-packaged with DAOs for JDBC-compliant databases as well as an in-memory implementation. For MongoDB, I have used the open source DAOs at https://github.com/nmorel/spring-batch-extension-mongodb. The SimpleJobRepository implementation of the JobRepository interface takes the following DAOs from that library as constructor arguments:
    • JobInstanceDao
    • JobExecutionDao
    • StepExecutionDao
    • ExecutionContextDao
    These DAOs create the following collections in MongoDB to store job meta-data:
    • BatchJobInstance
    • BatchJobExecution
    • BatchJobParameter
    • BatchStepExecution
    • BatchExecutionContext
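    After a job run, these collections can be inspected to verify the meta-data. Below is a small, hypothetical Java sketch (not part of the original example) that dumps them using the same MongoTemplate configured above; it assumes the 2.x Mongo Java driver that Spring Data MongoDB 1.x builds on:
    import org.springframework.data.mongodb.core.MongoTemplate;

    import com.mongodb.DBObject;

    // Hypothetical helper, not from the original post: prints every document in the
    // Spring Batch meta-data collections created by the Mongo DAOs listed above.
    public class JobMetaDataDump {
        public static void dump(MongoTemplate mongoTemplate) {
            String[] collections = {"BatchJobInstance", "BatchJobExecution",
                    "BatchJobParameter", "BatchStepExecution", "BatchExecutionContext"};
            for (String collection : collections) {
                System.out.println("--- " + collection + " ---");
                for (DBObject document : mongoTemplate.getCollection(collection).find()) {
                    System.out.println(document);
                }
            }
        }
    }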
Implementation
Below is the Java code implemented for this example:

1. BatchLauncher.java reads the batchjob.xml and launches the job.
package com.javabydefault.process;

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class BatchLauncher {
  public static void main(String[] args) {
 
    String[] springConfig  = 
        {   
            "batchjob.xml" 
        };
 
    ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
 
    JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
    
    Job job = (Job) context.getBean("ReportJob");
 
    try {
        String dateParam = new Date().toString();
        JobParameters param = new JobParametersBuilder().addString("date", dateParam).toJobParameters();
        JobExecution execution = jobLauncher.run(job, param);
        System.out.println("Exit Status : " + execution.getExitStatus());
 
    } catch (Exception e) {
        e.printStackTrace();
    }
 
    System.out.println("Done");
 
  }
}
2. ReportItemReader: the example uses the default MongoItemReader that comes packaged with Spring Batch. The configuration in batchjob.xml specifies the query to run against the MongoDB collection and maps each returned document to an Order object, which is then handed to the processor.
<bean id="reportItemReader" class="org.springframework.batch.item.data.MongoItemReader">
        <property name="template" ref="mongoTemplate"/>
        <property name="query" value="{_id : { $in : ['546dfb8ce4b0b13cf57ab501','546dfb8ce4b0b13cf57ab502','546dfb8ce4b0b13cf57ab503','546dfb8ce4b0b13cf57ab504','546dfb8ce4b0b13cf57ab505']}}"/>  <!-- query to pull the documents from db -->
        <property name="sort" > <!--sort criteria specified -->
            <map>
                <entry key="_id" value="#{T(org.springframework.data.domain.Sort.Direction).ASC}" />
            </map>
        </property>
        <property name="collection" value="order" /> <!--name of the collection -->
        <property name="targetType" value="com.javabydefault.domain.entity.Order"/> <!--Java object to which it will be mapped -->
</bean>
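For those who prefer Java configuration, the same reader can also be defined programmatically. The following is only a sketch, not code from the original post: the ReaderConfig class and its package are hypothetical, the id list is shortened for readability, and the setters used are those of MongoItemReader in Spring Batch 3.0.x.
package com.javabydefault.config; // hypothetical package, not from the original post

import java.util.Collections;

import org.springframework.batch.item.data.MongoItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;

import com.javabydefault.domain.entity.Order;

@Configuration
public class ReaderConfig {

    // Java-config sketch equivalent to the reportItemReader XML bean above.
    @Bean
    public MongoItemReader<Order> reportItemReader(MongoTemplate mongoTemplate) {
        MongoItemReader<Order> reader = new MongoItemReader<Order>();
        reader.setTemplate(mongoTemplate);
        // same JSON query string as in the XML configuration (id list shortened here)
        reader.setQuery("{_id : { $in : ['546dfb8ce4b0b13cf57ab501', '546dfb8ce4b0b13cf57ab502'] }}");
        // the SpEL expression in the XML resolves to this enum value
        reader.setSort(Collections.singletonMap("_id", Sort.Direction.ASC));
        reader.setCollection("order");
        reader.setTargetType(Order.class);
        return reader;
    }
}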
3. ReportProcessor is responsible for processing the documents read by the item reader. It accepts input from the reader and passes its output on to the writer. ItemProcessor is a Spring Batch interface that takes one object, transforms it, and returns the transformed object.
package com.javabydefault.processor;
 
import org.springframework.batch.item.ItemProcessor;
import com.javabydefault.domain.entity.Order;
import com.javabydefault.domain.entity.Report;
 
public class ReportProcessor implements ItemProcessor<Order, Report> { //Input and Output Object types of ItemProcessor 
 
    @Override
    public Report process(Order order) throws Exception {
 
        System.out.println("Processing..." + order);
        Report report = new Report(order);
        return report;
    }
}
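The Order and Report entity classes used above are not shown in the original post. Below is a minimal hypothetical sketch of what they might look like; only the Report(Order) constructor is implied by the processor, and the field names are assumptions.
// --- Order.java (hypothetical sketch, separate file under com.javabydefault.domain.entity) ---
package com.javabydefault.domain.entity;

import org.springframework.data.annotation.Id;

public class Order {
    @Id
    private String id;          // maps to the MongoDB _id
    private String customer;    // assumed field
    private double amount;      // assumed field

    public String getId() { return id; }
    public double getAmount() { return amount; }

    @Override
    public String toString() {
        return "Order[id=" + id + ", customer=" + customer + ", amount=" + amount + "]";
    }
}

// --- Report.java (hypothetical sketch, separate file in the same package) ---
package com.javabydefault.domain.entity;

public class Report {
    private String orderId;
    private double amount;

    public Report(Order order) {   // constructor used by ReportProcessor
        this.orderId = order.getId();
        this.amount = order.getAmount();
    }
}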
4. MongoItemWriter is responsible for writing the processed objects to MongoDB. It attempts to write out the list of items passed to it; the list size equals the commit-interval set on batch:chunk in batchjob.xml. The items are written to the database and committed together at the end of each chunk.
<!-- write it to MongoDB, 'report' collection (table) -->
<bean id="reportWriter" class="org.springframework.batch.item.data.MongoItemWriter">
    <property name="template" ref="mongoTemplate" />
    <property name="collection" value="report" /> <!--name of the collection to write -->
</bean>
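As with the reader, a Java-config sketch of the writer (again, not from the original post) could sit alongside the reader definition in the hypothetical ReaderConfig class shown earlier; MongoItemWriter only needs the template and the target collection.
// Java-config sketch equivalent to the reportWriter XML bean above
// (add to the hypothetical ReaderConfig class; also import org.springframework.batch.item.data.MongoItemWriter
// and com.javabydefault.domain.entity.Report).
@Bean
public MongoItemWriter<Report> reportWriter(MongoTemplate mongoTemplate) {
    MongoItemWriter<Report> writer = new MongoItemWriter<Report>();
    writer.setTemplate(mongoTemplate);
    writer.setCollection("report");   // same collection name as in the XML configuration
    return writer;
}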
Summary
While the open source community has focused heavily on web-based and SOA/messaging-based frameworks, there has been a notable lack of attention to reusable batch frameworks. Spring Batch lets the developer community worry less about batch infrastructure and focus more on writing business logic. This blog attempts to demonstrate a very simple use case with Spring Batch. The framework is flexible enough to accommodate almost all batch processing requirements. This is just a beginning, and I will publish more posts as I explore this framework further. 

4 comments:

  1. Could you please provide a complete example for the one that you have shown

  2. getting this error on setting sort property (though Direction enum is present in class path)

    org.springframework.expression.spel.SpelEvaluationException: EL1005E:(pos 0): Type cannot be found 'org.springframework.data.domain.Sort.Direction'
    at org.springframework.expression.spel.support.StandardTypeLocator.findType(StandardTypeLocator.java:79)
    at org.springframework.expression.spel.ExpressionState.findType(ExpressionState.java:136)
    at org.springframework.expression.spel.ast.TypeReference.getValueInternal(TypeReference.java:45)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:52)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:93)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:88)
    at org.springframework.context.expression.StandardBeanExpressionResolver.evaluate(StandardBeanExpressionResolver.java:139)
    ... 29 more


    Replies
    1. Could you please provide complete example for this? thanks in advance.

  3. Could you please guide on http://stackoverflow.com/questions/41530697/org-springframework-dao-dataaccessresourcefailureexception-unable-to-write-to-f?
