第49回 マルチスレッド編 CompletionServiceインターフェース
今回のテーマは「CompletionService」です。
今回は複数の非同期処理の実行を考えてみます。 複数の非同期処理を実行した場合、その処理が完了する順番はわかりません。 Futureのgetメソッドは非同期処理の完了を待機するので、複数の非同期処理を実行した場合、遅い処理の完了を待機する可能性があります。 CompletionServiceインターフェースを実装するExecutorCompletionServiceは処理結果をキューに格納することで、完了した順に取得することができます。 このキューにはBlockingQueueが使用されます。
次のサンプルコードはファイルを読み込むタスクを実行し、読み込んだ結果を出力するプログラムです。(J2SE5.0以上対応)
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Main { public static void main(String[] args) throws Exception { File[] files = { new File("input1.csv"), new File("input2.csv") }; ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executorService); for (File file : files) { completionService.submit(new ICallable(file)); } executorService.shutdown(); for (int i = 0; i < files.length; i++) { Future<List<String>> future = completionService.take(); List<String> list = future.get(); for (String string : list) { System.out.println(string); } } } } class ICallable implements Callable<List<String>> { private File file; public ICallable(File file) { this.file = file; } public List<String> call() throws Exception { List<String> list = new LinkedList<String>(); BufferedReader bufferedReader = null; try { bufferedReader = new BufferedReader(new FileReader(file)); while (bufferedReader.ready()) { list.add(bufferedReader.readLine()); } } finally { if (bufferedReader != null) { bufferedReader.close(); } } return list; } }
今回は2つのファイルを読み込みます。 takeメソッドは完了済みタスクを表すFutureを取得し、そのFutureをキューから削除します。 どちらのファイルの読み込みが先に完了するかはわかりませんが、完了した順にキューに格納されるので、完了した順に読み込んだ結果を出力できます。