    Elias Khattar
    @KhattarElias_twitter
    yup staging, so the first file comes in, I insert all the data, and then when I read from the DB I read only some fields, let's say X,Y,Z, and name the output file X.csv. Then a second file comes in, I insert the new data, let's say A,B,C, and now I have two rows in the DB. I then read those two rows, extracting only the desired fields (X,Y,Z and A,B,C), but the output file will now be A.csv with two rows written in it. @benas
    Mahmoud Ben Hassine
    @benas
    In that case, you can do it by calculating the file name in a preparatory step, here is a quick example:
        @Bean
        public Tasklet tasklet(DataSource dataSource) {
            return new Tasklet() {
    
                private JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    
                @Override
                public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                    // calculate fileName
                    String fileName = jdbcTemplate.queryForObject("select ... LIMIT 1 ", String.class); // or something similar
                    // put the info in job execution context to share it with next step
                    chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext()
                            .put("fileName", fileName);
                    return RepeatStatus.FINISHED;
                }
            };
        }
    The writer of the second step can then be configured with the info from the job execution context:
        @Bean
        @StepScope
        public FlatFileItemWriter itemWriter(@Value("#{jobExecutionContext['fileName']}") String fileName) {
            FlatFileItemWriter csvFileWriter = new FlatFileItemWriter();
            csvFileWriter.setResource(new FileSystemResource(fileName + ".csv"));
            // set other props
            return csvFileWriter;
        }
    so to sum up:
    • step1 (tasklet): implements the logic to determine the output file name
    • step2 (chunk-oriented): reads data from the table and writes it to the file (wiring sketched below)
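    A minimal wiring sketch for that two-step layout (the bean and step names here are illustrative, not from the conversation):
        @Bean
        public Job exportJob(Step fileNameStep, Step databaseToCsvFileStep) {
            return jobBuilderFactory.get("exportJob")
                    .start(fileNameStep)          // step1: tasklet that computes the file name
                    .next(databaseToCsvFileStep)  // step2: chunk-oriented, DB to csv
                    .build();
        }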
    Elias Khattar
    @KhattarElias_twitter
    Awesome, trying it in the coming few min and will update here... thanks Mahmoud for the support as always
    I have step1 first, which reads the incoming data and inserts it in the DB
    I have step2 that writes to the csv
    Mahmoud Ben Hassine
    @benas
    yeah let's call it step 0 :smile:
    Elias Khattar
    @KhattarElias_twitter
    so I add a middle step, right?
    Mahmoud Ben Hassine
    @benas
    yes. That said, if you can get the output file name during the first phase (read the file and write it to the db), then you can put the file name in the context during that phase
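    One way to do that during the first phase (an editorial sketch, not from the conversation; the class name is hypothetical) is a step listener that pushes the captured name into the job execution context when the step completes:
        public class FileNameListener extends StepExecutionListenerSupport {

            private String fileName; // set by the reader/processor while the step runs

            public void setFileName(String fileName) {
                this.fileName = fileName;
            }

            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                stepExecution.getJobExecution().getExecutionContext()
                        .put("fileName", fileName);
                return null; // null keeps the step's own exit status
            }
        }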
    Elias Khattar
    @KhattarElias_twitter
        @Bean
        public Job ipUserJob(@Qualifier("databaseToCsvFileStep") Step csvIpStep) throws Exception {
            return jobBuilderFactory.get("ipJob")
                    .incrementer(new RunIdIncrementer())
                    .start(step1()) // pointing to the correct line where the reader should start reading in ipStep.
                    .next(ipStep()) // insert in the DB
                    // passing the file name here should be, I guess
                    .next(csvIpStep) // write in the csv file
                    //.flow(sampleStep)
                    //.end()
                    .build();
        }
    I think I might need to pass it between the steps
    Mahmoud Ben Hassine
    @benas
    "// passing the file name here should be I guess": yes, this is where you would introduce the step (with the tasklet shown above) to prepare the info for the next step csvIpStep
    Elias Khattar
    @KhattarElias_twitter
    after trying this, I got the below error.
        2019-11-14 14:39:59.871  INFO 1320 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [outputFile]
        2019-11-14 14:39:59.892  INFO 1320 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [databaseToCsvFileStep]
        2019-11-14 14:39:59.919 ERROR 1320 --- [ask-scheduler-1] o.s.batch.core.step.AbstractStep         : Encountered an error executing step databaseToCsvFileStep in job ipJob
    
    org.springframework.batch.item.WriterNotOpenException: Writer must be open before it can be written to
    here is the job definition
    @Bean
    public Job ipUserJob(@Qualifier("databaseToCsvFileStep") Step csvIpStep) throws Exception {
        return jobBuilderFactory.get("ipJob")
                .incrementer(new RunIdIncrementer())
                .start(step1()) // pointing to the correct line where the reader should start reading in ipStep.
                .next(ipStep()) // insert in the DB
                .next(outputFileName()) // passing the file name here should be, I guess
                .next(csvIpStep) // write in the csv file
                //.flow(sampleStep)
                //.end()
                .build();
    }
    rmarcoux1
    @rmarcoux1
        public Step step2() {
            return stepBuilderFactory.get("process-records").<String,String>chunk(100)
                    .reader(reader(dateFromContext)) //HERE
                    .writer(writer)
                    .listener(promotionListener())
                    .build();
        }
    In my batch job, I'm reading a date from a table in step 1 and successfully saving it to the StepExecutionContext. Is there a way to pull from that StepExecution on Step2 in order to pass it in as a parameter for an ItemReader? I've tried a tasklet as a middle step to save the date to a local variable and that seems to get wiped out before the step where it's needed. Any help would be great. Thanks!
    Mahmoud Ben Hassine
    @benas

    @KhattarElias_twitter You need to make the method that defines your item writer return the most specific type so that Spring Batch can see the ItemStream methods (open/update/close) and correctly create the proxy. So in your example, the following:

    @Bean
    public ItemWriter<IpCsvOutput> databaseCsvItemWriter(Environment environment){
        FlatFileItemWriter csvFileWriter = new FlatFileItemWriter();
        //  ...
        return csvFileWriter;
    }

    should be:

    @Bean
    public FlatFileItemWriter<IpCsvOutput> databaseCsvItemWriter(Environment environment) {
        FlatFileItemWriter<IpCsvOutput> csvFileWriter = new FlatFileItemWriter<>();
        //  ...
        return csvFileWriter;
    }
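    For completeness (this alternative is not from the conversation): the Spring Batch step builder also allows registering a component explicitly as a stream, which helps when the declared type of an injected writer does not expose ItemStream. A sketch, assuming the concrete writer is injected into the step definition:
        @Bean
        public Step databaseToCsvFileStep(ItemReader<IpCsvOutput> reader,
                                          FlatFileItemWriter<IpCsvOutput> writer) {
            return stepBuilderFactory.get("databaseToCsvFileStep")
                    .<IpCsvOutput, IpCsvOutput>chunk(100)
                    .reader(reader)
                    .writer(writer)
                    .stream(writer) // register explicitly so open/update/close are called
                    .build();
        }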
    @rmarcoux1 I see you are using a promotion listener. This is the way to go. Are you sure you are promoting the right key? If yes, then the reader of the next step should be step scoped and configured with the value from the job execution context, something like:
        @Bean
        @StepScope
        public ItemReader<YourType> itemReader(@Value("#{jobExecutionContext['key']}") String key) {
            // use "key" to configure the reader
            return null; // TODO
        }
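    A concrete version of that sketch (the record type, table, and column names are hypothetical), using a JdbcCursorItemReader whose query is parameterized with the promoted value:
        @Bean
        @StepScope
        public JdbcCursorItemReader<MyRecord> itemReader(DataSource dataSource,
                @Value("#{jobExecutionContext['key']}") String date) {
            JdbcCursorItemReader<MyRecord> reader = new JdbcCursorItemReader<>();
            reader.setDataSource(dataSource);
            reader.setSql("select id, name from my_table where created_date > ?");
            reader.setPreparedStatementSetter(ps -> ps.setString(1, date));
            reader.setRowMapper((rs, rowNum) -> new MyRecord(rs.getLong(1), rs.getString(2)));
            return reader;
        }
    Note that the method returns the concrete JdbcCursorItemReader type, for the same ItemStream/proxy reason explained above.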
    Elias Khattar
    @KhattarElias_twitter
    @benas works like a charm, thank you very much
    Mahmoud Ben Hassine
    @benas
    @KhattarElias_twitter Glad it helped!
    rmarcoux1
    @rmarcoux1

    Hi @benas ,

        @Bean
        @StepScope
        public Step printDateFromContext(@Value("#{jobExecutionContext[test]}") String passDate) {
            return stepBuilderFactory.get("post-pass-date")
                    .tasklet((contribution, chunkContext) -> {
                        String passDateFromTable = chunkContext.getStepContext().getStepExecution()
                                .getJobExecution().getExecutionContext()
                                .getString("test");
    
                        System.out.println("Date from argument= " + passDate);  //Printing NULL
                        System.out.println("Date from context= " + passDateFromTable); //Displaying actual date
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }

    Can you please tell me if the value pulled from "test" is coming from the same place in the example above? Looks like I am setting what I expect with the correct key on the context, but unable to retrieve it using the @Value. Thanks, Ryan

    Mahmoud Ben Hassine
    @benas
    You are annotating a step definition with @StepScope, which is not correct. Using @JobScope should fix the issue, but I don't recommend it. I would rather extract the tasklet definition and make it step-scoped, then inject it into the step, something like:
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{jobExecutionContext[test]}") String passDate) {
            return (contribution, chunkContext) -> {
                System.out.println("Date from argument= " + passDate);
                return RepeatStatus.FINISHED;
            };
        }
    
        @Bean
        public Step printDateFromContext() {
            return stepBuilderFactory.get("post-pass-date")
                    .tasklet(tasklet(null))
                    .build();
        }
    rmarcoux1
    @rmarcoux1
    @benas - I must be missing something as your example above still didn't print out the date for me. Is there something else I can try? I still need to pass that date to another step in my flow. Thank you in advance.
    Mahmoud Ben Hassine
    @benas
    Here is a complete example:
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @EnableBatchProcessing
    public class MyJob {
    
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        chunkContext.getStepContext()
                                .getStepExecution().getJobExecution()
                                .getExecutionContext().put("test", "foo");
                        return RepeatStatus.FINISHED;
                    })
                    .build();
        }
    
        @Bean
        public Step step2() {
            return stepBuilderFactory.get("step2")
                    .tasklet(tasklet(null))
                    .build();
        }
    
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{jobExecutionContext[test]}") String passDate) {
            return (contribution, chunkContext) -> {
                System.out.println("Date from argument= " + passDate);
                return RepeatStatus.FINISHED;
            };
        }
    
    
        @Bean
        public Job job() {
            return jobBuilderFactory.get("job")
                    .start(step1())
                    .next(step2())
                    .build();
        }
    
        public static void main(String[] args) throws Exception {
            ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    
    }
    this prints:
    Date from argument= foo
    Mahmoud Ben Hassine
    @benas
    The same example works if you want to pass the test attribute through the step execution and promote it to the job execution using a promotion listener:
        @Bean
        public Step step1() {
            return stepBuilderFactory.get("step1")
                    .tasklet((contribution, chunkContext) -> {
                        chunkContext.getStepContext()
                                .getStepExecution()
                                .getExecutionContext().put("test", "foo");
                        return RepeatStatus.FINISHED;
                    })
                    .listener(promotionListener())
                    .build();
        }
    
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener promotionListener = new ExecutionContextPromotionListener();
            promotionListener.setKeys(new String[]{"test"});
            return promotionListener;
        }
    @rmarcoux1 does this help?
    rmarcoux1
    @rmarcoux1
    @benas - There must be something wiping out that jobExecutionContext as the step is running because nothing is working :(. I even copied your example and still see null from that tasklet step.
    Mahmoud Ben Hassine
    @benas
    In that case, please share a minimal project that reproduces the issue and I will take a look.
    Borna Safa
    @jarmaker
    Good day all. Is it recommended to pass shared objects around using the JobExecutionContext? Thank you so much for your anticipated help and feedback.
    rmarcoux1
    @rmarcoux1
    @benas - I'm thinking it has to do with the way I create the jobLauncher. In the example below, I have it autowired which doesn't quite match what you have. When I debug your example, I can see the AnnotationConfigApplicationContext which is not present when I debug my application. Do you have a better suggestion on how I can create and use the jobLauncher as shown below? Thanks!
        @Autowired
        private JobLauncher jobLauncher;

        @Autowired
        private JobCreator jobCreator;

        // .......

        try {
            Job job = jobCreator.createJob();
            JobParameters parameters = new JobParametersBuilder()
                    .addString(JobExecutionType.JOB_ID.getType(), "jobId")
                    .addString(JobExecutionType.JOB_NAME.getType(), "jobName")
                    .toJobParameters();

            execution = jobLauncher.run(job, parameters);
        }
    Mahmoud Ben Hassine
    @benas
    @rmarcoux1 I don't think passing info between steps through the job execution context is related to the job launcher. You probably have something wiping out the key from the execution context in between steps. To be able to help you efficiently, please share a project or a self-contained example that reproduces the issue.
    Mahmoud Ben Hassine
    @benas

    @jarmaker

    Is it recommended to pass shared objects around using the JobExecutionContext?

    That could be fine if objects are small enough (like a list of IDs or filenames) but is not recommended for large objects. The job execution context is persisted between steps, and if it contains a large amount of data, the serialization/deserialization process can be expensive.
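    To make the size concern concrete, a small editorial sketch (not from the conversation; stepExecution is assumed to be in scope, as in the tasklet examples above):
        ExecutionContext ctx = stepExecution.getJobExecution().getExecutionContext();
        ctx.put("ids", Arrays.asList(1L, 2L, 3L));   // fine: a few small values
        // ctx.put("bigObjectGraph", hugeObject);    // avoid: serialized/deserialized on every update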

    Borna Safa
    @jarmaker
    @benas thank you for your response. I am passing a DataHolder object which is used in my processor for every item, and I am grabbing it from the context using a @BeforeStep annotated listener method in my processor. I have made my reader, writer, and processor all step-scoped, and the object in the context is read-only in the processor and does not change state between items. With that I guess it should be fine. Thanks again @benas, I appreciate all your help and postings on this channel.
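    A sketch of that retrieval pattern (the class, type, and key names are hypothetical):
        public class EnrichingProcessor implements ItemProcessor<Input, Output> {

            private DataHolder dataHolder; // shared, read-only object from the context

            @BeforeStep
            public void retrieveSharedData(StepExecution stepExecution) {
                this.dataHolder = (DataHolder) stepExecution.getJobExecution()
                        .getExecutionContext().get("dataHolder");
            }

            @Override
            public Output process(Input item) {
                // read-only use of dataHolder to transform the item
                return new Output(item, dataHolder);
            }
        }
    Depending on the Spring Batch version, the processor may need to be registered explicitly as a listener on the step for the @BeforeStep annotation to be detected.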
    rmarcoux1
    @rmarcoux1
    @benas - Thank you for your example above. After careful review, I realized that I missed the @Configuration annotation at the top of the class where I was trying to access the JobExecutionContext. Once I added that in, I was seeing exactly what I was expecting. Thanks again!
    Mahmoud Ben Hassine
    @benas
    @jarmaker @rmarcoux1 You are welcome. Glad to be of help!
    Naseem Ahamed
    @NaseemA18810948_twitter
    Hi, with an upgrade of the Spring Boot parent from 2.1.3.RELEASE to 2.2.1.RELEASE, spring.batch.enabled=false is not working anymore and my job is executing on server start-up. Can someone please shed some light on this if you have had the chance to resolve it?
    spring.batch.job.enabled*
    Naseem Ahamed
    @NaseemA18810948_twitter
    This flag works up to 2.2.0.RELEASE; 2.2.1.RELEASE seems to have this issue.
    Mahmoud Ben Hassine
    @benas
    Hi let me check
    Mahmoud Ben Hassine
    @benas
    I'm not able to reproduce the issue. With both 2.2.0 and 2.2.1 the flag spring.batch.job.enabled is working as expected.
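    For reference, the Boot property goes in application.properties:
        # prevent Spring Boot from launching configured jobs at startup
        spring.batch.job.enabled=false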
    Naseem Ahamed
    @NaseemA18810948_twitter
    @benas Thank you for the information. I apologise for causing a ruckus. Indeed it is working as expected. We had an issue related to default batch table creation during server startup, and it was confused with the table referenced in the batch query.
    Mahmoud Ben Hassine
    @benas
    ok no problem. glad you solved your issue.
    venkatasreekanth
    @venkatasreekanth
    ```
    2019-11-13 18:00:33.849 ERROR 16104 --- [           main] o.s.batch.core.job.AbstractJob           : Encountered fatal error executing job
    
    org.springframework.dao.DataAccessResourceFailureException: Could not increment identity; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 2253) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
            at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:113) ~[spring-jdbc-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
    
    2019-11-13 18:00:26.339 ERROR 16043 --- [           main] o.s.batch.core.job.AbstractJob           : Encountered fatal error executing job
    
    org.springframework.dao.DataAccessResourceFailureException: Could not increment identity; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 1874) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
            at org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer.getNextKey(AbstractIdentityColumnMaxValueIncrementer.java:113) ~[spring-jdbc-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
    ```
    I changed the isolation level to ISOLATION_REPEATABLE_READ, but I still see deadlocks
    These tasks are running on SCDF 1.7.2
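    For reference, one common way to change that isolation level (a sketch assuming Spring Batch 4's DefaultBatchConfigurer; not necessarily how it was configured in this case) is to override the job repository creation:
        @Component
        public class RepeatableReadBatchConfigurer extends DefaultBatchConfigurer {

            private final DataSource dataSource;

            public RepeatableReadBatchConfigurer(DataSource dataSource) {
                super(dataSource);
                this.dataSource = dataSource;
            }

            @Override
            protected JobRepository createJobRepository() throws Exception {
                JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
                factory.setDataSource(dataSource);
                factory.setTransactionManager(getTransactionManager());
                factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
                factory.afterPropertiesSet();
                return factory.getObject();
            }
        }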
    venkatasreekanth
    @venkatasreekanth
    @cppwfs the DBA says this is where the issue is, in these DELETE statements:
    SPID 2253(victim): delete from BATCH_STEP_EXECUTION_SEQ where ID < 131203
    SPID 1964: delete from BATCH_STEP_EXECUTION_SEQ where ID < 131201
    SPID 1874(victim): delete from BATCH_STEP_EXECUTION_SEQ where ID < 131200
    we actually had 3 jobs deadlock on this
    We enabled RCSI on SQL Server and now we are encountering issues with IDs
    Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
    2019-12-04 20:31:01.877 ERROR 23018 --- [           main] o.s.c.t.listener.TaskLifecycleListener   : An event to end a task has been received for a task that has not yet started.
    2019-12-04 20:31:01.882 ERROR 23018 --- [           main] o.s.boot.SpringApplication               : Application run failed
    
    org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is java.lang.IllegalArgumentException: Invalid TaskExecution, ID 180017 not found
            at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:883) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) [spring-boot-2.0.6.RELEASE.jar!/:2.0.6.RELEASE]
            at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:386) [spring-boot-2.0.6.RELEASE.jar!/:2.0.6.RELEASE]
            at org.springframework.boot.SpringApplication.run(SpringApplication.java:307) [spring-boot-2.0.6.RELEASE.jar!/:2.0.6.RELEASE]
            at org.springframework.boot.SpringApplication.run(SpringApplication.java:1242) [spring-boot-2.0.6.RELEASE.jar!/:2.0.6.RELEASE]
            at org.springframework.boot.SpringApplication.run(SpringApplication.java:1230) [spring-boot-2.0.6.RELEASE.jar!/:2.0.6.RELEASE]
            at com.digikey.batch.PimBatchApplication.main(PimBatchApplication.java:22) [classes!/:20191014.1-master]
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
            at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
            at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [PIMBatch-20191014.1-master.jar:20191014.1-master]
            at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [PIMBatch-20191014.1-master.jar:20191014.1-master]
            at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [PIMBatch-20191014.1-master.jar:20191014.1-master]
            at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [PIMBatch-20191014.1-master.jar:20191014.1-master]
    Caused by: java.lang.IllegalArgumentException: Invalid TaskExecution, ID 180017 not found
            at org.springframework.util.Assert.notNull(Assert.java:193) ~[spring-core-5.0.10.RELEASE.jar!/:5.0.10.RELEASE]
            at org.springframework.cloud.task.listener.TaskLifecycleListener.doTaskStart(TaskLifecycleListener.java:233) ~[spring-cloud-task-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
            at org.springframework.cloud.task.listener.TaskLifecycleListener.start(TaskLifecycleListener.java:355) ~[spring-cloud-task-core-2.0.0