第49回 マルチスレッド編 CompletionServiceインターフェース
今回のテーマは「CompletionService」です。
今回は複数の非同期処理の実行を考えてみます。 複数の非同期処理を実行した場合、その処理が完了する順番はわかりません。 Futureのgetメソッドは非同期処理の完了を待機するので、複数の非同期処理を実行した場合、遅い処理の完了を待機する可能性があります。 CompletionServiceインターフェースを実装するExecutorCompletionServiceは処理結果をキューに格納することで、完了した順に取得することができます。 このキューにはBlockingQueueが使用されます。
次のサンプルコードはファイルを読み込むタスクを実行し、読み込んだ結果を出力するプログラムです。(J2SE5.0以上対応)
import java.io.BufferedReader;
import java.io.File; import java.io.FileReader;
import java.util.LinkedList; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws Exception {
File[] files = { new File("input1.csv"), new File("input2.csv") };
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<List<String>> completionService
= new ExecutorCompletionService<List<String>>(executorService);
for (File file : files) {
completionService.submit(new ICallable(file));
}
executorService.shutdown();
for (int i = 0; i < files.length; i++) {
Future<List<String>> future = completionService.take();
List<String> list = future.get();
for (String string : list) {
System.out.println(string);
}
}
}
}
class ICallable implements Callable<List<String>> {
private File file;
public ICallable(File file) {
this.file = file;
}
public List<String> call() throws Exception {
List<String> list = new LinkedList<String>();
BufferedReader bufferedReader = null;
try {
bufferedReader = new BufferedReader(new FileReader(file));
while (bufferedReader.ready()) {
list.add(bufferedReader.readLine());
}
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
return list;
}
}
今回は2つのファイルを読み込みます。 takeメソッドは完了済みタスクを表すFutureを取得し、そのFutureをキューから削除します。 どちらのファイルの読み込みが先に完了するかはわかりませんが、完了した順にキューに格納されるので、完了した順に読み込んだ結果を出力できます。

