重入锁---ReentrantLock

in java并发 with 0 comment

重入锁ReentrantLock,顾名思义就是支持重新进入的锁,它表示该锁能够支持一个线程对资源对重复加锁。
ReentrantLock是一种递归无阻塞的同步机制。它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率。

内部实现

ReentrantLock内部有个抽象类Sync,它继承自AQS
image.png

ReentrantLock里面大部分的功能都是委托给Sync来实现的,同时Sync内部定义了lock()抽象方法由其子类去实现,其子类有NonfairSync和FairSync分别为非公平锁和公平锁。

公平与非公平锁

平锁与非公平锁的区别在于公平锁的锁获取是有顺序的。但是公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。默认创建的是非公平锁。

获取锁

获得锁的时候,我们都是调用ReentrantLock的Lock方法

Lock lock = new ReentrantLock();
lock.lock();

进入lock方法

public void lock() {
        sync.lock();
    }

调用内部继承AQS类的lock方法。

非公平获得锁

  final void lock() {
        //尝试获取锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //获取失败,调用AQS的acquire(int arg)方法
            acquire(1);
    }

第一次会先尝试获取锁,如获取失败,则调用AQS的acquire方法
进入acquire方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

NonfairSync自己实现了tryAcquire方法

protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    final boolean nonfairTryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //获取同步状态
        int c = getState();
        //state == 0,表示没有该锁处于空闲状态
        if (c == 0) {
            //获取锁成功,设置为当前线程所有
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //判断锁持有的线程是否为当前线程,即线程重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

又一次在尝试获得锁,可见逻辑之强。判断同步状态state == 0 ?,如果是表示该锁还没有被线程持有,直接通过CAS获取同步状态,如果成功返回true。如果state != 0,则判断当前线程是否为获取锁的线程,如果是则获取锁,成功返回true。成功获取锁的线程再次获取锁,这是增加了同步状态state。

非公平释放锁

释放锁我们使用ReentrantLock的unLock方法。

public void unlock() {
        sync.release(1);
    }

调用的AQS中的release方法

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

NonfairSync自己实现了tryAcquire方法

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

protected final boolean tryRelease(int releases) {
        //减掉releases
        int c = getState() - releases;
        //如果释放的不是持有锁的线程,抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //state == 0 表示已经释放完全了,其他线程可以获取同步状态了
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

只有当state==0 才彻底释放锁(因为重入的关系)。

公平锁和非公平锁的区别

在代码层面上讲,公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()方法

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

public final boolean hasQueuedPredecessors() {
        Node t = tail;  //尾节点
        Node h = head;  //头节点
        Node s;

        //头节点 != 尾节点
        //同步队列第一个节点不为null
        //当前线程是同步队列第一个节点
        return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
    }

该方法主要做一件事情:主要是判断当前线程是否位于CLH同步队列中的第一个。如果是则返回true,否则返回false。
公平锁与非公平锁的区别在于获取锁的时候是否按照FIFO的顺序来

ReentrantLock和Synchronized的区别

  1. 与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
  2. ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合(以后会阐述Condition)。
  3. ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
  4. ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
  5. ReentrantLock支持中断处理,且性能较synchronized会好些。

Condition接口

任意一个Java对象,都拥有一组监视器方法,主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法。这些方法与synchronized同步关键字配合可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合实现等待/通知模式。

Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。

Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。

Lock中的Condition也是来自AQS类。先看下Condition在AQS中的定义。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    //头节点
    private transient Node firstWaiter;
    //尾节点
    private transient Node lastWaiter;
}

等待队列

可以看出Condition拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用await()方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。Node定义与AQS的CLH同步队列的节点使用的都是同一个类。

Condition方法

  1. await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  2. awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  3. awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
  4. awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  5. signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  6. signal()All :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

ReentrantLock实现等待/通知模式

回顾下等待/通知模式
等待方(消费者)遵循如下原则:

  1. 获取对象锁。。
  2. 如果对象不满足,那么线程等待,被通知后仍要检查条件。
  3. 条件满足则执行相应逻辑

通知方(生产者)遵循如下原则:

  1. 获取对象锁。
  2. 改变条件
  3. 通知所有等待在线程上的对象。

下面代码实现通知方一直生产一个随机数,然后等待方获得随机数,然后输出

通知方(生产者)代码

public class Product implements Runnable {
    private List list;
    private Lock lock;
    private Condition condition;

    public Product(List list, Lock lock, Condition condition) {
        this.list = list;
        this.lock = lock;
        this.condition = condition;
    }


    @Override
    public void run() {
        try {
            while (true) {
                Random r = new Random();
                // 锁
                lock.lock();
                // 如果list长度大于0,则停止生产,即开始等待
                while (list.size() > 0) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    // 模拟生产时间
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // add一个随机数
                list.add(r.nextInt(50));
                System.out.println(Thread.currentThread().getName() + "线程生产了" + list.get(0));
                // 通知其他线程
                condition.signalAll();
            }
        }finally {
            // 释放锁
            lock.unlock();
        }

    }

}

生产者随机生产一个数

等待方(消费者)

public class Consumer implements Runnable {
    private List list;
    private Lock lock;
    private Condition condition;
    public Consumer(List list, Lock lock,Condition condition) {
        this.list = list;
        this.lock = lock;
        this.condition=condition;
    }


    @Override
    public void run() {
        try {
            while (true) {
                // 锁
                lock.lock();
                // 如果list长度小于等于0,则等待通知方通知
                while (list.size() <= 0) {
                    try {
                        // 等待
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    // 模拟消费时间
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 输出
                System.err.println(Thread.currentThread().getName() + "线程消费了" + list.get(0));
                // 消费结束 remove掉
                list.remove(0);
                // 通知 通知方继续生产
                condition.signalAll();
            }
        }finally {
            // 释放锁
            lock.unlock();
        }
    }
}

消费者消费生产者生产的一个随机数

运行及运行结果

主方法

public class Main {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        // 线程安全的ArrayList 其内部是一个ReentrantLock
        List list = new CopyOnWriteArrayList();
        Condition condition = lock.newCondition();
        new Thread(new Product(list,lock,condition),"product").start();
        new Thread(new Consumer(list,lock,condition),"consumer").start();
    }
}

结果
image.png