sungwony

[JAVA] Parallel Stream 본문

language/java

[JAVA] Parallel Stream

일상이상삼상 2021. 6. 22. 10:59

JAVA8의 parallelStream은 멀티스레드 프로그래밍을 간편하게 해준다.

개발자가 직접 스레드 혹은 스레드풀을 생성하거나 관리할 필요없이 parallelStream(), parallel()만 사용하면 알아서 ForkJoinFramework를 이용하여 작업들을 분할하고 병렬적으로 처리하게 된다.

이번에 조회가 오래 걸리는 시스템을 개선하면서 조회 서비스를 다음과 같이 처리하였다.

public class StreamUtils {
   private static final int PARTITION_SIZE = 100;

   public static <R, T> List<R> getResultParallel(List<T> parameter, Function<List<T>, List<R>> mapper){
      if (parameter == null || parameter.size() < PARTITION_SIZE){
         return mapper.apply(parameter);
      }      return Lists.partition(parameter, PARTITION_SIZE)
               .parallelStream()
               .map(mapper)
               .filter(CollectionUtils::isNotEmpty)
               .flatMap(Collection::stream)
               .collect(Collectors.toList());
   }}


조회 기준 정보를 파티셔닝해서 작성된 mapper를 통해 조회쿼리를 날리게 된다.
단순히 병렬 처리로 조회의 결과를 모아주기 때문에병렬처리에 의한 사이드 이펙트가 없을 것으로 보고 성능을 높이고자 parallel stream을 적용하여 처리하였으나 코드 리뷰를 통해 parallel stream 사용시 유의점에 대하여 분석이 필요함을 느껴 Parallel Stream에 대해 분석하였다.

ParallelStream

 

Java Parallel Stream은 프로세서의 다중 코어를 활용하기 위한 Java 8 이상의 기능이다.
일반적으로 모든 Java 코드에는 순차적으로 실행되는 하나의 처리 스트림이 있다.
parallel 스트림을 사용하면 코드를 별도의 코어에서 병렬로 실행되는 여러 스트림으로 나눌 수 있으며 최종 결과는 개별 결과의 조합이 된다. 그러나 실행 순서는 우리가 통제 할 수 없다. 따라서 실행 순서가 결과에 영향을 주지 않고 한 요소의 상태가 다른 요소에 영향을 주지 않고 소스 데이터도 영향을 받지않는 경우는 병렬 스트림을 사용하는 것이 좋다.

 

이런 parallelStream을 사용하는데는 알아야 할 것이 하나 있다. parallelStream은 parallelStream별로 스레드풀을 만드는게 아니라는 점이다. 별도의 설정이 없다면 하나의 스레드 풀을 모든 parallelStream이 공유하게 된다.
이때 기본으로 사용하는 ForkJoin Thread pool은 ForkJoinPool.commonPool 이다.

Race Condition(경쟁 상태)

 

스레드가 서로 공유하는 자원에 대한 경합이 발생할 경우 side effect가 발생할 수 있음

import java.util.stream.IntStream;

public class ParallelStreamExample {   public static void main(String[] args) {
      System.out.println("가용 CPU core 수 : " + Runtime.getRuntime().availableProcessors());
      //anonymous class에서 접근을 위한 final variable 선언
      final int [] sum1 = {0}; 
      final int [] sum2 = {0};
      // single-core process
      IntStream.range(0, 100)
               .forEach(i -> sum1[0] += i);
      System.out.println("sum : "+sum1[0]);

      // multi-core process (side-effect)
      IntStream.range(0, 100)
               .parallel()
               .forEach(i -> sum2[0] += i);
      System.out.println("parallel sum : "+sum2[0]);

      // multi-core process (no side-effect)
      int sum3 = IntStream.range(0, 100)
                          .parallel()
                          .sum();
      System.out.println("parallel sum : "+sum3);
   }}

 



ForkJoin 프레임워크


ForkJoin 프레임워크는 자바에서 병렬 처리를 지원하는 프레임워크로 새롭게 소개된 개념이 아니라 Java 7 부터 포함된 개념이다. 하나의 작업을 여러개의 작은 Task로 split 한 다음 이 Task를 fork 하여(물리적인 프로세스로 fork 하는 것이 아니라 실제로는 Thread) 병렬로 처리하고 각 Task의 수행된 결과를 병합하기 위해 join 단계를 실행하는 프레임워크이다.

 

 

ForkJoinPool#commonPool


실제로 필요할 때 lazy하게 초기화되는 정적 스레드 풀이다. 기본 thread 수는 서버의 CPU 코어 수에 종속되는데Java에서는 Runtime.getRuntime().avaliableProcessoros()로 CPU 코어 숫자를 확인할 수 있다. 

 

JDK에는 commonPool을 사용하는 CompletableFuture과 Parallel Stream라는 두가지 주요한 개념이 있다. 이 두 기능은 작은 차이가 있는데 CompletableFuture는 commonpool의 thread를 사용하지 않고 별도의 thread pool을 명시할 수 있으나 ParallelStreams에서는 그것이 불가능하다. 왜 CommonPool을 항상 사용하지는 않는 것일까? CommonPool의 사용여부를 결정하는 과정에서 주의해야 할 것은 pool을 사용하는 작업의 목적이다. 일반적으로 작업에는 계산(computation)블락(blocking) 두 가지 유형이 있다.

계산 작업의 경우 I/O 작업(데이터베이스 호출, 동기화, thread sleep 등)과 같이 blocking을 절대적으로 피하는 작업을 만들어야 한다. commonPool을 blocking 작업에 사용하는 경우 몇 가지 결과를 고려햐아 한다. 사용 가능한 CPU가 3개 이상인 경우 commonPool에 자동으로 2개의 스레드 크기가 조정되며 스레드를 차단 된 상태로 유지하여 동시에 사용하는 시스템의 다른 부분의 실행을 차단할 수 있다. 그렇기 때문에 원칙적으로 개발자는 blocking 작업을 위한 별도의 스레드 풀을 생성하여 나머지 시스템을 분리되고 예측가능하도록 유지해야 한다.

ParallelStream에 별도 지정 스레드 풀을 할당하기


위에서 기재된 내용과 같이 blocking 작업을 parallelStream을 통해 수행한다면 thread의 block으로 인한 병목이 발생할 수 있다. 이를 개선하기 위해 앞서 Role에서 사용한 ParallelStream에 새로운 스레드 풀을 할당하여 소스를 개선해보았다. 별도의 ForkJoinPool을 생성해서설정하였으며 동시에 실행할 수 있는 스레드 개수를 20개로 지정하였다.

public class StreamUtils {
   private static final int PARTITION_SIZE = 100;
   private static final int PARALLISM = 20;
   private static final ForkJoinPool customThreadPool = new ForkJoinPool(PARALLISM);

   public static <R, T> List<R> getResultParallel(List<T> parameter, Function<List<T>, List<R>> mapper){
      if (parameter == null || parameter.size() < PARTITION_SIZE){
       return mapper.apply(parameter);
      }
      List<R> results = null;
      try {
            results = pool.submit(() ->
                       Lists.partition(parameter, PARTITION_SIZE)
                            .parallelStream()
                            .map(mapper)
                            .filter(CollectionUtils::isNotEmpty)
                            .flatMap(Collection::stream)
                            .collect(Collectors.toList())).get();
      } catch (InterruptedException | ExecutionException e){
            throw new RoleException(RoleExceptionCode.FORK_JOIN_POOL_ERROR);
      }return results;
   }}

 

결론


ParallelStream은 개발자로 하여금 편리하게 병렬 연산을 수행할 수 있도록 지원해준다. 하지만 별도의 설정이 없을 경우 기본 쓰레드 풀을모든 ParallelStream에서 공유하게 되므로 이 경우 blocking 작업을 진행할 때 병목현상이 발생할 수 있어 많은 주의가 필요하다. 기본적으로 순차 스트림을 고려하거나 blocking 작업이 필요할 때는 별도의 쓰레드 풀을 생성하여 할당하는 것이 바람직 하다.

 

출처 및 참고

 

https://multifrontgarden.tistory.com/254
https://www.geeksforgeeks.org/what-is-java-parallel-streams/
https://dzone.com/articles/be-aware-of-forkjoinpoolcommonpool
https://www.popit.kr/java8-stream%EC%9D%98-parallel-%EC%B2%98%EB%A6%AC/

'language > java' 카테고리의 다른 글

[JAVA8] 메소드 참조  (0) 2018.06.28
[JAVA8] Stream  (0) 2018.06.27
[JAVA8] java.util.function 패키지  (0) 2018.06.27
[JAVA8] 람다(Lambda) 표현식  (0) 2018.06.26
[JAVA8] 자바8 목차  (0) 2018.06.26