侧边栏壁纸
博主头像
敢敢雷博主等级

永言配命,自求多福

  • 累计撰写 57 篇文章
  • 累计创建 0 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

控制并发线程数的Semaphore

敢敢雷
2020-02-05 / 0 评论 / 0 点赞 / 1,098 阅读 / 1,115 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我删除。

Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
先演示一个简单小Demo,模拟停车位的场景

Demo

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 模拟五个停车位
        Semaphore semaphore = new Semaphore(5);
        // 模拟十辆车
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    // 模拟停车时间
                    long time = (long) (Math.random()*1000);
                    // 获得停车位
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "进入停车,停车时间为" + time);
                    Thread.sleep(time);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"离开车位");
                // 离开停车位
                semaphore.release();
            },"T"+i).start();
        }
    }
}

运行结果:
image.png

通过结果可以看出,10个线程(十辆车),一直只有5个线程在执行(五个停车位),这个例子里说的车就是线程,停车的车表示线程在执行,离开车位就表示线程执行完成,停车位满了就表示线程被阻塞,不能执行。

Semaphore介绍

信号量Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch一样,其本质上是一个“共享锁”

一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

Semaphore原理解析

image.png
上图可以看出Semaphore内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类Sync,其中Sync继承AQS

Semaphore构造方法

Semaphore提供了两个构造函数:

  • Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore。
  • Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore。

代码如下

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore默认选择非公平锁。

当信号量Semaphore = 1 时,它可以当作互斥锁使用。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。

Semaphore.acquire()方法,信号获取

Semaphore提供了acquire()方法来获取一个许可。

 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

调用AQS的acquireSharedInterruptibly(int arg),该方法以共享模式获取同步状态:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

acquireSharedInterruptibly(int arg)中,tryAcquireShared(int arg)由子类来实现,对于Semaphore而言,如果我们选择非公平模式,则调用NonfairSync的tryAcquireShared(int arg)方法,否则调用FairSync的tryAcquireShared(int arg)方法。

公平

protected int tryAcquireShared(int acquires) {
        for (;;) {
            //判断该线程是否位于CLH队列的列头
            if (hasQueuedPredecessors())
                return -1;
            //获取当前的信号量许可
            int available = getState();

            //设置“获得acquires个信号量许可之后,剩余的信号量许可数”
            int remaining = available - acquires;

            //CAS设置信号量
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
        }
    }

非公平

对于非公平而言,因为它不需要判断当前线程是否位于CLH同步队列列头,所以相对而言会简单些。

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

Semaphore.release()方法,释放信号量

获取了许可,当用完之后就需要释放,Semaphore提供release()来释放许可。

public void release() {
        sync.releaseShared(1);
    }

内部调用AQS的releaseShared(int arg)方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

调用Semaphore内部类Sync的tryReleaseShared(int arg)方法

protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            //设置可获取的信号许可数为next
            if (compareAndSetState(current, next))
                return true;
        }
    }
0

评论区