EeBlog(テクニカルブログ)

第52回 シンクロナイザ編 Semaphore

今回のテーマは「Semaphore」です。

セマフォとは複数のプロセスが共有資源を利用する際に、その資源に同時にアクセスされるのを防ぐために使用される排他制御方法です。 1つのプロセスのみがアクセスできる2値セマフォと、複数のプロセスがアクセスできる計数セマフォがあります。 Semaphoreは同時に実行できるスレッドの数を制限する計数セマフォです。 acquire メソッドは実行権を取得するメソッドです。 実行可能数のスレッドがacquire メソッドを実行している場合において他のスレッドがacquire メソッドを実行するとブロックされます。 releaseメソッドは実行権を返却するメソッドです。 また、Semaphoreのコンストラクタの第2引数は公平性を設定します。 公平性が true に設定されると、acquire メソッドを実行したスレッドがその実行順で実行権を取得すること保証します。

次のサンプルコードは共有リソースを、複数のスレッドが使用するプログラムです。(J2SE5.0以上対応)

import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception { 
        ExecutorService executorService = Executors.newCachedThreadPool(); 
        executorService.execute(new IRunnable("A")); 
        executorService.execute(new IRunnable("B")); 
        executorService.execute(new IRunnable("C")); 
        executorService.execute(new IRunnable("D")); 
        executorService.execute(new IRunnable("E")); 
        executorService.execute(new IRunnable("F")); 
        executorService.execute(new IRunnable("G"));
        executorService.execute(new IRunnable("H")); 
        executorService.execute(new IRunnable("I")); 
        executorService.shutdown(); 
    }
}

class IRunnable implements Runnable {

    private String name;

    public IRunnable(String name) { 
        this.name = name; 
    }

    public void run() { 
        ResourcePool.getInstance().useResource(name); 
    }
}

class ResourcePool {

    private static final int AVAILABLE = 3; 
    private static ResourcePool resourcePool = new ResourcePool(); 
    private Semaphore semaphore;     private Queue<Resource> queue;

    private ResourcePool() { 
        semaphore = new Semaphore(AVAILABLE, true); 
        queue = new ConcurrentLinkedQueue<Resource>(); 
        for (int i = 0; i < AVAILABLE; i++) { 
            queue.offer(new Resource(i)); 
        } 
    }

    public static ResourcePool getInstance() { 
        return resourcePool; 
    }

    public void useResource(String name) { 
        try { 
            semaphore.acquire(); 
            Resource resource = queue.poll(); 
            resource.start(name); 
            queue.offer(resource); 
            semaphore.release(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    }

    private class Resource {

    private static final String MESSAGE_FORMAT 
                    = "%1$sさんの処理が完了しました。リソース%2$d"; 
    private int resourceNo;

    private Resource(int resourceNo) { 
            this.resourceNo = resourceNo; 
    }

    private void start(String name) { 
            try { 
                TimeUnit.SECONDS.sleep(5); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
            System.out.println(String.format(MESSAGE_FORMAT, name, resourceNo)); 
        }
    }
}

ResourcePoolは3つのResourceインスタンスを保持するクラスで、ResourcePoolインスタンスはシングルトンです。
3つのResourceインスタンスはスレッドセーフなキューに格納します。
各スレッドはuseResourceメソッドを実行しますが、Semaphoreによって同時に3つのスレッドだけがResourceインスタンスを取得できるように制限しています。
Resourceインスタンスは複数のスレッドで共有しますが、同時にstartメソッドを実行できるのは1つのスレッドだけです。
ちなみに使用するキューを容量制限のあるBlockingQueueにするとSemaphoreを使用しなくても排他制御は行えます。
その場合はtakeメソッド、putメソッドがacquire メソッド、releaseメソッドの役割をします。