본문 바로가기
Development/Spring Batch

3. Spring Batch 청크 프로세스 활용 Step1 - Cursor 기반의 ItemReader

by 개발여행자 2022. 10. 3.

지난 시간에 스프링 배치 청크 프로세스에 대한 전체적인 이해, ChunkOrientedTasklet 객체에 대한 설명, 그리고 ChunkProvider, ChunkProcessor에 대해 알아보는 시간을 갖었다. 이번 시간에는 ChunkProvider가 요청하는 ItemReader에 대해서 알아보자.

우선, ItemReader는 아래와 같이 5개의 구현체가 있습니다.

1) Flat Files - FlatFileItemReader

2) XML - StaxEventItemReader

3) Json - JsonItemReader

4) DB - JDBC ItemReader, JPA ItemReader

인프런 강의에는 전부 설명이 나와있지만, 현재 회사의 경우에는 DB - JDBC ItemReader, JPA ItemReader 구현체만 사용하고 있기 때문에 해당 강의를 듣고 들은 내용을 정리해볼 예정이다. DB - JDBC ItemReader, JPA ItemReader 구현체에 대해 알아보기 전에 Cursor Based, Paging Based에 대해서 먼저 정리해보자.

1. Cursor Based & Paging Based

  • 기본개념

- 배치 어플리케이션은 실시간적 처리가 어려운 대용량 데이터를 다루며, 이 때 DB I/O의 성능 문제와 메모리 자원의 효율성 문제를 해결할 수 있어야 한다.

- 스프링 배치에서는 대용량 데이터 처리를 위한 두 가지 해결방안을 제시하고 있다.

  • Cursor Based 처리

- JDBC ResultSet의 기본 메커니즘을 사용

- 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어지는 Streaming 방식의 I/O이다

- ResultSet이 open될 때 마다 next() 메소드가 호출되어 Database의 데이터가 반환되고 객체와 매핑이 이루어진다.

- DB Connection이 연결되면 배치 처리가 완료될 때 까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분히 큰 값으로 설정 필요

- 모든 결과를 메모리에 할당하기 때문에 메모리 사용량이 많아지는 단점이 있다

- Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량의 데이터 처리에 적합할 수 있다(fetchSize 조절)

  • Paging Based 처리

- 페이징 단위로 데이터를 조회하는 방식으로 Page Size 만큼 한번에 메모리로 가지고 온 다음 한개씩 읽는다

- 한 페이지를 읽을 때마다 Connection을 맺고 끊기 때문에 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 일어나지 않는다

- 시작 행 번호를 지정하고 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용 - Offset, Limit

- 페이징 단위의 결과만 메모리에 할당하기 때문에 메모리 사용량이 적어지는 장점이 있다

- Connection 연결 유지 시간이 길지 않고 메모리 공간을 효율적으로 사용해야 하는 데이터 처리에 적합

2. JdbcCursorItemReader

  • Cursor 기반의 JDBC 구현체로서 ResultSet과 함께 사용되면 Datasource에서 connection을 얻어와서 SQL을 실행
  • Thread 안정성을 보장하지 않기 때문에 멀티 스레드 환경에서 사용할 경우 동시성 이슈 발생-> 별도 동기화 처리 필요함
  • JdbcCursorItemReader 소스로 구현해보기

1) Customer 테이블 생성 및 Customer 테이블 내 데이터 입력, Customer 클래스 생성

CREATE TABLE `customer` (
   `id` mediumint(8) unsigned NOT NULL auto_increment,
   `firstname` varchar(255) default NULL,
   `lastname` varchar(255) default NULL,
   `birthdate` varchar(255),
   PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;

INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (1,"Reed","Edwards","1952-08-16 12:34:53"),(2,"Hoyt","Park","1981-02-18 08:07:58"),(3,"Leila","Petty","1972-06-11 08:43:55"),(4,"Denton","Strong","1989-03-11 18:38:31"),(5,"Zoe","Romero","1990-10-02 13:06:31"),(6,"Rana","Compton","1957-06-09 12:51:11"),(7,"Velma","King","1988-02-02 05:52:25"),(8,"Uriah","Carter","1972-08-31 07:32:05"),(9,"Michael","Graves","1958-04-13 18:47:44"),(10,"Leigh","Stone","1967-06-23 23:41:43");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (11,"Iliana","Glenn","1965-02-27 14:33:56"),(12,"Harrison","Haley","1956-06-28 03:15:41"),(13,"Leonard","Zamora","1956-03-28 15:03:09"),(14,"Hiroko","Wyatt","1960-08-22 23:53:50"),(15,"Cameron","Carlson","1969-05-12 11:10:09"),(16,"Hunter","Avery","1953-11-19 12:52:42"),(17,"Aimee","Cox","1976-10-15 12:56:50"),(18,"Yen","Delgado","1990-02-06 10:25:36"),(19,"Gemma","Peterson","1989-04-02 23:42:09"),(20,"Lani","Faulkner","1970-09-18 17:22:14");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (21,"Iola","Cannon","1954-01-12 16:56:45"),(22,"Whitney","Shaffer","1951-03-19 01:27:18"),(23,"Jerome","Moran","1968-03-16 05:26:22"),(24,"Quinn","Wheeler","1979-06-19 16:24:22"),(25,"Mira","Wilder","1961-12-27 12:11:07"),(26,"Tobias","Holloway","1968-08-13 20:36:19"),(27,"Shaine","Schneider","1958-03-08 09:47:10"),(28,"Harding","Gonzales","1952-04-11 02:06:25"),(29,"Calista","Nieves","1970-02-17 13:29:59"),(30,"Duncan","Norman","1987-09-13 00:54:49");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (31,"Fatima","Hamilton","1961-06-16 14:29:11"),(32,"Ali","Browning","1979-03-27 17:09:37"),(33,"Erin","Sosa","1990-08-23 10:43:58"),(34,"Carol","Harmon","1972-01-14 07:19:39"),(35,"Illiana","Fitzgerald","1970-08-19 02:33:46"),(36,"Stephen","Riley","1954-06-05 08:34:03"),(37,"Hermione","Waller","1969-09-08 01:19:07"),(38,"Desiree","Flowers","1952-06-25 13:34:45"),(39,"Karyn","Blackburn","1977-03-30 13:08:02"),(40,"Briar","Carroll","1985-03-26 01:03:34");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (41,"Chaney","Green","1987-04-20 18:56:53"),(42,"Robert","Higgins","1985-09-26 11:25:10"),(43,"Lillith","House","1982-12-06 02:24:23"),(44,"Astra","Winters","1952-03-13 01:13:07"),(45,"Cherokee","Stephenson","1955-10-23 16:57:33"),(46,"Yuri","Shaw","1958-07-14 15:10:07"),(47,"Boris","Sparks","1982-01-01 10:56:34"),(48,"Wilma","Blake","1963-06-07 16:32:33"),(49,"Brynne","Morse","1964-09-21 01:05:25"),(50,"Ila","Conley","1953-11-02 05:12:57");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (51,"Sharon","Watts","1964-01-09 16:32:37"),(52,"Kareem","Vaughan","1952-04-18 15:37:10"),(53,"Eden","Barnes","1954-07-04 01:26:44"),(54,"Kenyon","Fulton","1975-08-23 22:17:52"),(55,"Mona","Ball","1972-02-11 04:15:45"),(56,"Moses","Cortez","1979-04-24 15:26:46"),(57,"Macy","Banks","1956-12-31 00:41:15"),(58,"Brenna","Mendez","1972-10-02 07:58:27"),(59,"Emerald","Ewing","1985-11-28 21:15:20"),(60,"Lev","Mcfarland","1951-05-20 14:30:07");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (61,"Norman","Tanner","1959-07-29 15:41:45"),(62,"Alexa","Walters","1977-12-06 16:41:17"),(63,"Dara","Hyde","1989-08-04 14:06:43"),(64,"Hu","Sampson","1978-11-01 17:10:23"),(65,"Jasmine","Cardenas","1969-02-15 20:08:06"),(66,"Julian","Bentley","1954-07-11 03:27:51"),(67,"Samson","Brown","1967-10-15 07:03:59"),(68,"Gisela","Hogan","1985-01-19 03:16:20"),(69,"Jeanette","Cummings","1986-09-07 18:25:52"),(70,"Galena","Perkins","1984-01-13 02:15:31");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (71,"Olga","Mays","1981-11-20 22:39:27"),(72,"Ferdinand","Austin","1956-08-08 09:08:02"),(73,"Zenia","Anthony","1964-08-21 05:45:16"),(74,"Hop","Hampton","1982-07-22 14:11:00"),(75,"Shaine","Vang","1970-08-13 15:58:28"),(76,"Ariana","Cochran","1959-12-04 01:18:36"),(77,"India","Paul","1963-10-10 05:24:03"),(78,"Karina","Doyle","1979-12-01 00:05:21"),(79,"Delilah","Johnston","1989-03-04 23:50:01"),(80,"Hilel","Hood","1959-08-22 06:40:48");
INSERT INTO `customer` (`id`,`firstname`,`lastname`,`birthdate`) VALUES (81,"Kennedy","Hoffman","1963-10-14 20:18:35"),(82,"Kameko","Bell","1976-06-08 15:35:54"),(83,"Lunea","Gutierrez","1964-06-07 16:21:24"),(84,"William","Burris","1980-05-01 17:58:23"),(85,"Kiara","Walls","1955-12-27 18:57:15"),(86,"Latifah","Alexander","1980-06-19 10:39:50"),(87,"Keaton","Ward","1964-10-12 16:03:18"),(88,"Jasper","Clements","1970-03-05 00:29:49"),(89,"Claire","Brown","1972-02-11 00:43:58"),(90,"Noble","Morgan","1955-09-05 05:35:01");

@Data
public class Customer {
    private Long id;
    private String firstname;
    private String lastname;
    private String birthdate;
}

 

2) JdbcCursorConfiguration을 통해서 JdbcCursorItemReader 구현해보기

@Configuration
@RequiredArgsConstructor
public class JdbcCursorConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private int chunkSize = 10;
    private final DataSource dataSource;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(chunkSize)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();

    }

    @Bean
    public ItemReader<Customer> customItemReader() {
        return new JdbcCursorItemReaderBuilder<Customer>()
                .name("jdbcCursorItemReader")
                .fetchSize(chunkSize)
                .sql("select id, firstname, lastname, birthdate from customer where firstname like ? order by lastname, firstname")
                .beanRowMapper(Customer.class)
                .queryArguments("A%")
                .dataSource(dataSource)
                .build();
    }

    @Bean
    public ItemWriter<? super Customer> customItemWriter() {
        return items -> {
            for(Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

3. JpaCursorItemReader

  • Spring Batch 4.3 버전부터 지원함
  • Cursor 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL을 사용한다
  • JpaCursorItemReader 소스로 구현해보기

1) Build.gradle 내 JPA관련 의존성 추가하기

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
}

 

2) Customer 클래스를 Entity로 등록하기

@Data
@Entity
public class Customer {
    @Id @GeneratedValue
    private Long id;
    private String firstname;
    private String lastname;
    private String birthdate;
}

 

 

3) JpaCursorConfiguration을 통해서 JpaCursorItemReader 구현해보기

@Configuration
@RequiredArgsConstructor
public class JpaCursorConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("JpaBatchJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();

    }

    @Bean
    public ItemReader<Customer> customItemReader() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("firstname", "A%");

        return new JpaCursorItemReaderBuilder<Customer>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("select c from Customer c where firstname like :firstname")
                .parameterValues(parameters)
                .build();
    }

    @Bean
    public ItemWriter<? super Customer> customItemWriter() {
        return items -> {
            for(Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}

 

 

4) 콘솔창

Customer(id=17, firstname=Aimee, lastname=Cox, birthdate=1976-10-15 12:56:50)
Customer(id=32, firstname=Ali, lastname=Browning, birthdate=1979-03-27 17:09:37)
Customer(id=44, firstname=Astra, lastname=Winters, birthdate=1952-03-13 01:13:07)
Customer(id=62, firstname=Alexa, lastname=Walters, birthdate=1977-12-06 16:41:17)
Customer(id=76, firstname=Ariana, lastname=Cochran, birthdate=1959-12-04 01:18:36)