Semaphore

Semaphore


Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {}

下面说一下Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:

public void acquire() throws InterruptedException {  }     //获取一个许可
public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
public void release() { }          //释放一个许可
public void release(int permits) { }    //释放permits个许可

acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。

release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

                            //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { };
                            //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };
                            //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { };
                            //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { };

另外还可以通过availablePermits()方法得到可用的许可数目。

代码:

you can find the code on the github, get the result as you try:

public class SemaphoreLearn {

    public static void main(String[] args) {

        CountDownLatch countDownLatch = new CountDownLatch(10);
        Semaphore semaphore = new Semaphore(3);

        for(int i = 0;i<10;i++) {
            new Thread(new SemaphoreThread(countDownLatch,semaphore)).start();
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("all worker has finish the work");

    }

}

public class SemaphoreThread implements Runnable {

    private CountDownLatch countDownLatch;
    private Semaphore semaphore;

    public SemaphoreThread(CountDownLatch countDownLatch, Semaphore semaphore) {
        this.countDownLatch = countDownLatch;
        this.semaphore = semaphore;
    }

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println("thread: " +Thread.currentThread().getName()+"is asking a new acquirement.");
            Thread.sleep(1000);
            System.out.println("thread: " +Thread.currentThread().getName()+"release the acquirement.");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            countDownLatch.countDown();
            semaphore.release();
        }

    }
}

源码分析

构造方法:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这里和 ReentrantLock 类似,用了公平策略和非公平策略。

看 acquire 方法:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。

我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

// 公平策略:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 非公平策略:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。

我们再回到 acquireShared 方法,

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

// 任务介绍,释放一个资源
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // 溢出,当然,我们一般也不会用这么大的数
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Semphore 的源码确实很简单,基本上都是分析过的老代码的组合使用了。