좀 자세히 정리를 하고 싶지만, 코드 리뷰 중 오갔던 대화 내용 정도만 정리하기로...
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
leewin12
3 hours ago
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기선 default forkJoinPool이 사용될 것 같은데, 맞을까요?
CustomForkJoinPool과 default ForkJoinPool이 섞여 있는 것 같습니다.
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
starrybleu
2 hours ago
Author
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
starrybleu
36 minutes ago
Author
디버깅을 해보다 이상한 동작 방식과 중요한 사실을 하나 알게 됐습니다.
parallelStream
을 비동기로 처리하기 위한 Executor
는 무조건 ForkJoinPool
의 인스턴스로 생성이 되어야 지정한 수 만큼의 스레드를 생성하여 처리가 된다는 사실입니다.
CompletableFuture
의 여러 메소드는 그냥 Executor
인터페이스를 파라미터로 받고 있기 때문에 Compile 단계에서는 아무 이상 없이 통과를 하는 게 당연한데,
parallelStream 에서는 내부적으로 병렬 처리를 ForkJoinPool 로 하도록 되어 있기 때문에, CompletableFuture
에서 사용한 Executor
의 인스턴스 타입이 ForkJoinPool
이 아니라 다른 ThreadPool
일 경우(예, Executors.newCachedThreadPool()
, Executors.newFixedThreadPool()등), 그 다른 Pool 에서 1개의 스레드를 생성하여 가져다 쓰기는 쓰는 것이 확인 됐지만 지정한 Executor 를 제대로 활용하지 않고
ForkJoinPool.commonPool()` 을 가져다가 비동기 처리함을 볼 수 있었습니다.
만약 Executor 의 인스턴스 타입이 ForkJoinPool
이라면, 별도의 commonPool()
을 가져다 쓰지 않고, 기대한대로 개발자가 지정한 custom ForkJoinPool
을 사용하게 됩니다.
아마 parallelStream 은 병렬로 처리하기 위해 ForkJoinPool and Spliterator
를 모두 사용하도록 만들어졌기 때문인 것이 그 원인인 것 같습니다. (참고 : https://java-8-tips.readthedocs.io/en/stable/parallelization.html#conclusion)
starrybleu
36 minutes ago
Author
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
디버깅을 해보다 이상한 동작 방식과 중요한 사실을 하나 알게 됐습니다.
parallelStream
을 비동기로 처리하기 위한 Executor
는 무조건 ForkJoinPool
의 인스턴스로 생성이 되어야 지정한 수 만큼의 스레드를 생성하여 처리가 된다는 사실입니다.
CompletableFuture
의 여러 메소드는 그냥 Executor
인터페이스를 파라미터로 받고 있기 때문에 Compile 단계에서는 아무 이상 없이 통과를 하는 게 당연한데,
parallelStream 에서는 내부적으로 병렬 처리를 ForkJoinPool 로 하도록 되어 있기 때문에, CompletableFuture
에서 사용한 Executor
의 인스턴스 타입이 ForkJoinPool
이 아니라 다른 ThreadPool
일 경우(예, Executors.newCachedThreadPool()
, Executors.newFixedThreadPool()등), 그 다른 Pool 에서 1개의 스레드를 생성하여 가져다 쓰기는 쓰는 것이 확인 됐지만 지정한 Executor 를 제대로 활용하지 않고
ForkJoinPool.commonPool()` 을 가져다가 비동기 처리함을 볼 수 있었습니다.
만약 Executor 의 인스턴스 타입이 ForkJoinPool
이라면, 별도의 commonPool()
을 가져다 쓰지 않고, 기대한대로 개발자가 지정한 custom ForkJoinPool
을 사용하게 됩니다.
아마 parallelStream 은 병렬로 처리하기 위해 ForkJoinPool and Spliterator
를 모두 사용하도록 만들어졌기 때문인 것이 그 원인인 것 같습니다. (참고 : https://java-8-tips.readthedocs.io/en/stable/parallelization.html#conclusion)
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
starrybleu
35 minutes ago
Author
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
leewin12
24 minutes ago
// CompletableFuture.java
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
처리 방식이 실제로 완전히 다르군요
둘이 동작방식이 전혀 다른데, CompletableFuture는 처음 설계부터 유연하게 pool 구현체 동작에 맞춰서 동작되도록 설계된 모양입니다.
// ForkJoinPool.java
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
// Executors.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
leewin12
24 minutes ago
•
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// CompletableFuture.java
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
처리 방식이 실제로 완전히 다르군요
둘이 동작방식이 전혀 다른데, CompletableFuture는 처음 설계부터 유연하게 pool 구현체 동작에 맞춰서 동작되도록 설계된 모양입니다.
// ForkJoinPool.java
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
// Executors.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
This comment has been minimized.
Show comment
Hide comment
This comment has been minimized.
starrybleu
9 minutes ago
Author
네, CompletableFuture
가 parallelStream 처리 뿐만 아니라 다른 비동기 작업이 훨씬 넓은 범위니까 그렇게 유연하게 만들어진 것 같습니다.
parallelStream
자체가 spliterator
로 작업을 분할하여 ForkJoinPool
의 worker 가 각각 갖고 있는 Queue 를 사용되도록 구현이 되어 있어서 그 밖에서 다른 종류의 Executor
를 넘겨줬다 하더라도 ForkJoinPool.commonPool()
을 가져다 쓰게 되네요.
starrybleu
9 minutes ago
Author
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
네, CompletableFuture
가 parallelStream 처리 뿐만 아니라 다른 비동기 작업이 훨씬 넓은 범위니까 그렇게 유연하게 만들어진 것 같습니다.
parallelStream
자체가 spliterator
로 작업을 분할하여 ForkJoinPool
의 worker 가 각각 갖고 있는 Queue 를 사용되도록 구현이 되어 있어서 그 밖에서 다른 종류의 Executor
를 넘겨줬다 하더라도 ForkJoinPool.commonPool()
을 가져다 쓰게 되네요.
여기선 default forkJoinPool이 사용될 것 같은데, 맞을까요?
CustomForkJoinPool과 default ForkJoinPool이 섞여 있는 것 같습니다.