java 8에 추가된 parallelStream은 멀티쓰레드를 매우 쉽게 사용할 수 있다.

직접 쓰레드를 혹은 쓰레드풀을 생성하거나 관리할 필요없이 parallelStream(), paallel()만 사용하면 알아서 ForkJoinFramework를 이용하여 작업들을 분할하고, 병렬적으로 처리하게 된다.

// parallel
        val start1 = System.currentTimeMillis()
        val sum = LongStream.range(0, 1_000_000_000)
            .parallel()
            .sum()

        val end1 = System.currentTimeMillis()
        println("parallel Time: ${end1 - start1}, $sum")

        // single
        val start2 = System.currentTimeMillis()
        val sum2 = LongStream.range(0, 1_000_000_000)
            .sum()
        val end2 = System.currentTimeMillis()

        println("single Time: ${end2 - start2}, $sum2")

			// parallel Time: 153, 499999999500000000
			// single Time: 538, 499999999500000000
}

Thread Pool

parallelStream은 내부적으로 쓰레드 풀을 만들어서 작업을 병렬화 시킨다. 여기서 중요한 점은 paralleStream 별로 스레드 풀을 만드는게 아니라는 점이다. 별도의 설정이 없다면 하나의 쓰레드 풀을 모든 parallelStream이 공유하게 된다.

parallelStream을 이용해 멀티 쓰레드를 이용하고, 코드 내부에서 DB호출과 http 호출이 있는 경우 API 응답이 느려지는 경우가 발생할 수 있다. 이렇게 되면 추적하기도 힘들기 떄문에 주의가 필요하다.

위에 언급한듯 ParallelStream은 쓰레드풀을을 공유하기 때문에 blocking io가 발생하는 작업을 하게 되면 쓰레드 풀 내부의 쓰레드를이 block 상태가 된다. 이렇게 쌓인 쓰레드풀의 병목 현상이 발생할 수 있다.

위 문제를 개션하기 위해선 스레드 풀을 default로 사용하지 않고 parallelStream 마다 각각 커스텀하게 지정해주면 된다.

fun main() {
	val start = System.currentTimeMillis()
  
  val sum = ForkJoinPool(100).submit<Long> {
      LongStream.range(0, 1_000_000_000).parallel().sum()
  }.get()
  
  val end = System.currentTimeMillis()

  println("time: ${end - start}, sum: $sum")
}

References:

parallelStream 남용으로인한 장애경험기