mercredi 1 juillet 2015

Spring Batch partitioning a step

I have multiple CSV files to read. I want the processing to be done one file at a time, rather than reading all the records until it reaches the commit level.

I have put together a job which uses partitioning, but when I run the job I see that there are two entries for every row, as if the job were running twice.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://ift.tt/GArMu6"
    xmlns:context="http://ift.tt/GArMu7" xmlns:p="http://ift.tt/1jdM0fE"
    xmlns:batch="http://ift.tt/1fayA4Z" xmlns:mvc="http://ift.tt/1bHqwjR"
    xmlns:xsi="http://ift.tt/ra1lAU"
    xsi:schemaLocation="http://ift.tt/GArMu6  
http://ift.tt/1cnl1uo  
http://ift.tt/GArMu7  
http://ift.tt/1ldEMZY  
http://ift.tt/1bHqwjR  
http://ift.tt/1kF4x7W  
http://ift.tt/1fayA4Z   
http://ift.tt/1snNyHz">

    <import resource="classpath:/database.xml" />



     <!-- Executor handed to the partition handler of remediationJob.
          A concurrencyLimit of 1 allows only one task to run at a time. -->
     <bean id="asyncTaskExecutor"
           class="org.springframework.core.task.SimpleAsyncTaskExecutor">
         <property name="concurrencyLimit" value="1" />
     </bean>

     <!-- Thread-pool executor with a core pool size of 5.
          NOTE(review): no bean visible in this file references it. -->
     <bean id="taskExecutor"
           class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
         <property name="corePoolSize" value="5" />
     </bean>

     <!-- Partitioner of partitionedStep: built from every *.dat file found under
          the directory given by the 'filePath' job parameter. -->
     <bean id="partitioner"
           class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
           scope="step">
         <property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
     </bean>

    <!--
        Item reader of readWriteContactsPartitionedStep; line parsing is delegated
        to logItFileReader.

        The reader is bound to the one file assigned to the current partition:
        MultiResourcePartitioner stores each file's URL in the step execution
        context under its default key "fileName", and the late-binding expression
        below picks it up (this requires scope="step").

        Do not point this reader at the *.dat pattern used by the partitioner.
        Every partition would then read every file, and each row would be written
        once per partition (duplicate rows).
    -->
    <bean id="multiResourceReader"
        class="org.springframework.batch.item.file.MultiResourceItemReader"
        scope="step">
        <property name="resources" value="#{stepExecutionContext[fileName]}" />
        <property name="delegate" ref="logItFileReader" />
    </bean>



    <!-- Job with a single partitioned step: the 'partitioner' bean supplies the
         partitions, and readWriteContactsPartitionedStep is run for them through
         asyncTaskExecutor. -->
    <batch:job id="remediationJob">
        <batch:step id="partitionedStep">
            <batch:partition partitioner="partitioner"
                             step="readWriteContactsPartitionedStep">
                <batch:handler task-executor="asyncTaskExecutor" />
            </batch:partition>
        </batch:step>
    </batch:job>

    <!-- Worker step referenced by the partition of remediationJob: chunk-oriented
         read/write with a commit interval of 10, the pdwUploadSkipPolicy skip
         policy and READ_UNCOMMITTED transaction isolation. -->
    <batch:step id="readWriteContactsPartitionedStep">
        <batch:tasklet>
            <batch:transaction-attributes isolation="READ_UNCOMMITTED" />
            <batch:chunk reader="multiResourceReader"
                         writer="rawItemDatabaseWriter"
                         commit-interval="10"
                         skip-policy="pdwUploadSkipPolicy" />
            <batch:listeners>
                <batch:listener ref="customItemReaderListener" />
                <batch:listener ref="csvLineSkipListener" />
                <batch:listener ref="getCurrentResourceChunkListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>


    <!-- Flat-file reader used as the delegate of multiResourceReader; no resource
         is configured on this bean itself. Lines are tokenized on the "@##@"
         delimiter and mapped by CSVDataVOFieldMapper. -->
    <bean id="logItFileReader"
          class="org.springframework.batch.item.file.FlatFileItemReader"
          scope="step">
        <property name="strict" value="false" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <!-- Tokenizer: splits each line into fields -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value="@##@" />
                        <property name="strict" value="true" />
                    </bean>
                </property>
                <!-- Field-set mapper: turns the fields into an object -->
                <property name="fieldSetMapper">
                    <bean class="org.kp.oppr.remediation.batch.vo.CSVDataVOFieldMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <!-- Step-scoped project writer; it is the chunk writer of
         readWriteContactsPartitionedStep. -->
    <bean id="rawItemDatabaseWriter"
          class="org.kp.oppr.remediation.batch.csv.RawItemDatabaseWriter"
          scope="step" />

    <!-- Skip policy referenced by the chunk of readWriteContactsPartitionedStep. -->
    <bean class="org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy"
          id="pdwUploadSkipPolicy" />

    <!-- Prototype-scoped CSVDataVO bean.
         NOTE(review): no bean visible in this file references it. -->
    <bean id="csvDataVO"
          class="org.kp.oppr.remediation.batch.vo.CSVDataVO"
          scope="prototype" />


    <!-- BATCH LISTENERS -->

    <!-- Step-scoped project listener.
         NOTE(review): no step visible in this file registers it. -->
    <bean id="pdwFileMoverListener"
          class="org.kp.oppr.remediation.batch.listener.PdwFileMoverListener"
          scope="step" />

    <!-- Step-scoped project listener registered on readWriteContactsPartitionedStep. -->
    <bean id="csvLineSkipListener"
          class="org.kp.oppr.remediation.batch.listener.CSVLineSkipListener"
          scope="step" />

    <!-- Project listener registered on readWriteContactsPartitionedStep. -->
    <bean class="org.kp.oppr.remediation.batch.listener.CustomItemReaderListener"
          id="customItemReaderListener" />

     <!-- Project listener registered on readWriteContactsPartitionedStep; the
          multiResourceReader bean is injected into its 'proxy' property. -->
     <bean id="getCurrentResourceChunkListener"
           class="org.kp.oppr.remediation.batch.listener.GetCurrentResourceChunkListener">
         <property name="proxy" ref="multiResourceReader" />
     </bean>
    <!-- 
    <bean id="stepListener" class="org.kp.oppr.remediation.batch.listener.ExampleStepExecutionListener">
        <property name="resources" ref="multiResourceReader"/>
    </bean>
     -->
    <!-- Skip Policies -->

</beans>  

Is there something I am missing here?

Aucun commentaire:

Enregistrer un commentaire