본문 바로가기
Development/Spring Batch

4. Spring Batch 청크 프로세스 활용 Step2 - Paging 기반의 ItemReader

by 개발여행자 2022. 10. 3.

지난 시간에는 Spring Batch 청크 프로세스 활용편으로 Cusor Based / Paging Based의 차이점과 Cursor 기반인 JdbcCursorItemReader와 JpaCursorItemReader에 대해 알아봤습니다. 이번 시간에는 Paging 기반인 JdbcPagingItemReader와 JpaPagingItemReader에 대해 알아보겠습니다.

  1. JdbcPagingItemReader
  • Paging 기반의 JDBC 구현체로서 쿼리에 시작 행 번호(offset)와 페이지에서 반환 할 행 수(limit)를 지정해서 SQL을 실행
  • 스프링 배치에서 offset과 limit을 PageSize에 맞게 자동으로 생성해 주며 페이징 단위로 데이터를 조회할 때 마다 새로운 쿼리가 실행
  • 페이지마다 새로운 쿼리를 실행하기 때문에 페이징시 결과 데이터의 순서가 보장될 수 있도록 order by 구문 작성이 필수
  • 멀티 스레드 환경에서 Thread 안전성을 보장하기 때문에 별도의 동기화를 필요할 필요가 없다
  • JdbcPagingItemReaderBuilder API를 통한 JdbcPagingItemReader 구현체 생성

  • JdbcPagingItemReader 소스 구현해보기
@Configuration
@RequiredArgsConstructor
public class JdbcPagingConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private int chunkSize = 2;
    private final DataSource dataSource;

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("JpaBatchJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(chunkSize)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();

    }

    @Bean
    public ItemReader<Customer> customItemReader() throws Exception {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("firstname", "A%");

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingReader")
                .pageSize(chunkSize) // step의 chunk 사이즈와 동일하게 적용하자
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameters)
                .build();
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("id, firstname, lastname, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setWhereClause("where firstname like :firstname");

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        return queryProvider.getObject();
    }

    @Bean
    public ItemWriter<? super Customer> customItemWriter() {
        return items -> {
            System.out.println("ItemWriter 호출됨");
            for(Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

======================================================================================
콘솔창 : 
ItemWriter 호출됨
Customer(id=17, firstname=Aimee, lastname=Cox, birthdate=1976-10-15 12:56:50)
Customer(id=32, firstname=Ali, lastname=Browning, birthdate=1979-03-27 17:09:37)
ItemWriter 호출됨
Customer(id=44, firstname=Astra, lastname=Winters, birthdate=1952-03-13 01:13:07)
Customer(id=62, firstname=Alexa, lastname=Walters, birthdate=1977-12-06 16:41:17)
ItemWriter 호출됨
Customer(id=76, firstname=Ariana, lastname=Cochran, birthdate=1959-12-04 01:18:36)
 

2. JpaPagingItemReader

  • Paging 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용
  • JpaPagingItemReader 소스 구현해보기

1) Customer, Address Entity 등록 및 테이블 내 데이터 입력

@Data
@Entity
public class Customer {
    @Id @GeneratedValue
    private Long id;
    private String username;
    private int age;

    @OneToOne(mappedBy = "customer")
    private Address address;
}

@Data
@Entity
public class Address {
    @Id @GeneratedValue
    private Long id;
    private String location;

    @OneToOne
    @JoinColumn(name = "customer_id")
    private Customer customer;
}

insert into customer(id, username, age) values (1, 'user1', 31);
insert into customer(id, username, age) values (2, 'user2', 32);
insert into customer(id, username, age) values (3, 'user3', 33);
insert into customer(id, username, age) values (4, 'user4', 34);
insert into customer(id, username, age) values (5, 'user5', 35);
insert into customer(id, username, age) values (6, 'user6', 36);
insert into customer(id, username, age) values (7, 'user7', 37);
insert into customer(id, username, age) values (8, 'user8', 38);
insert into customer(id, username, age) values (9, 'user9', 39);
insert into customer(id, username, age) values (10, 'user10', 40);

insert into address(id, location, customer_id) values (1, 'location1', 1);
insert into address(id, location, customer_id) values (2, 'location2', 2);
insert into address(id, location, customer_id) values (3, 'location3', 3);
insert into address(id, location, customer_id) values (4, 'location4', 4);
insert into address(id, location, customer_id) values (5, 'location5', 5);
insert into address(id, location, customer_id) values (6, 'location6', 6);
insert into address(id, location, customer_id) values (7, 'location7', 7);
insert into address(id, location, customer_id) values (8, 'location8', 8);
insert into address(id, location, customer_id) values (9, 'location9', 9);
insert into address(id, location, customer_id) values (10, 'location10', 10);

 

2) JpaPagingConfiguration

@Configuration
@RequiredArgsConstructor
public class JpaPagingConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("JpaBatchJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(2)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();

    }

    @Bean
    public ItemReader<? extends Customer> customItemReader() {
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(2)
                .queryString("select c from Customer c join fetch c.address")
                .build();
    }

    @Bean
    public ItemWriter<? super Customer> customItemWriter() {
        return items -> {
            System.out.println("itemWriter 호출됨");
            for(Customer customer : items) {
                System.out.println(customer.getAddress().getLocation());
            }
        };
    }
}