您的位置:  首页 > 技术杂谈 > 正文

ReentrantLock源码解析 | 京东云技术团队

2023-07-04 12:00 https://my.oschina.net/u/4090830/blog/10086268 京东云开发者 次阅读 条评论

并发指同一时间内进行了多个线程。并发问题是多个线程对同一资源进行操作时产生的问题。通过加锁可以解决并发问题,ReentrantLock是锁的一种。

1 ReentrantLock

1.1 定义

ReentrantLock是Lock接口的实现类,可以手动的对某一段进行加锁。ReentrantLock可重入锁,具有可重入性,并且支持可中断锁。其内部对锁的控制有两种实现,一种为公平锁,另一种为非公平锁.

1.2 实现原理

ReentrantLock的实现原理为volatile+CAS。想要说明volatile和CAS首先要说明JMM。

1.2.1 JMM

JMM(java 内存模型 Java Memory Model 简称JMM) 本身是一个抽象的概念,并不在内存中真实存在的,它描述的是一组规范或者规则,通过这组规范定义了程序中各个变量的访问方式.

由于 JMM 运行程序的实体是线程.而每个线程创建时JMM都会为其创建一个自己的工作内存(栈空间),工作内存是每个线程的私有 数据区域.而java内存模型中规定所有的变量都存储在主内存中,主内存是共享内存区域,所有线程都可以访问,但线程的变量的操作(读取赋值等)必须在自己的工作内存中去进行,首先要 将变量从主存拷贝到自己的工作内存中,然后对变量进行操作,操作完成后再将变量操作完后的新值写回主内存,不能直接操作主内存的变量,各个线程的工作内存中存储着主内存的变量拷贝的副本,因不同的线程间无法访问对方的工作内存,线程间的通信必须在主内存来完成。

如图所示:线程A对变量A的操作,只能是从主内存中拷贝到线程中,再写回到主内存中。

1.2.2 volatile

volatile 是JAVA的关键字用于修饰变量,是java虚拟机的轻量同步机制,volatile不能保证原子性。
作用:

  • 线程可见性:一个变量在某个线程里修改了它的值,如果使用了volatile关键字,那么别的线程可以马上读到修改后的值。
  • 指令重排序:没加之前,指令是并发执行的,第一个线程执行到一半另一个线程可能开始执行了。加了volatile关键字后,不同线程是按照顺序一步一步执行的。1.2.3 CASCAS是Compare and Swap,就是比较和交换,而比较和交换是一个原子操作。线程基于CAS修改数据的方式:先获取主内存数据,在修改之前,先比较数据是否一致,如果一致修改主内存数据,如果不一致,放弃这次修改。

作用:CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作。1.2.4 AQSAQS的全称是AbstractQueuedSynchronizer(抽象的队列式的同步器),AQS定义了一套多线程访问共享资源的同步器框架。

AQS主要包含两部分内容:共享资源和等待队列。AQS底层已经对这两部分内容提供了很多方法。

  • 共享资源:共享资源是一个volatile的int类型变量。
  • 等待队列:等待队列是一个线程安全的队列,当线程拿不到锁时,会被park并放入队列。

2 源码解析

ReentrantLock在包java.util.concurrent.locks下,实现Lock接口。

2.1 lock方法

lock分为公平锁和非公平锁。

公平锁:

final void lock() {
    acquire(1);
}

非公平锁:上来先尝试将state从0修改为1,如果成功,代表获取锁资源。如果没有成功,调用acquire。state是AQS中的一个由volatile修饰的int类型变量,多个线程会通过CAS的方式修改state,在并发情况下,只会有一个线程成功的修改state。

final void lock() {
//通过原子方式修改值
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}


 /**
 * 获取锁的线程.
 */
 private transient Thread exclusiveOwnerThread;


  /**
  * 设置拥有锁的线程
  */
  protected final void setExclusiveOwnerThread(Thread thread) {
      exclusiveOwnerThread = thread;

2.2 acquire方法

acquire是一个业务方法,里面并没有实际的业务处理,都是在调用其他方法。

public final void acquire(int arg) {
    //调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法
    if (!tryAcquire(arg) &&
    //当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,
    //并且插入到AQS的队列的末尾.
   //继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面,尝试获取锁资源
    //如果没在前面,尝试将线程挂起,阻塞起来!
     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     selfInterrupt();
}

2.3 tryAcquire方法

tryAcquire分为公平和非公平两种。

公平:

 protected final boolean tryAcquire(int acquires) {
   //拿到当前线程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,说明没有线程占用着当前的锁资源
     if (c == 0) {
   //如果没有线程排队,直接直接CAS尝试获取锁资源
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
     //如果获取资源成功,将当前线程设置为持有锁资源的线程
       setExclusiveOwnerThread(current);
        return true;
          }
        }
     //如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
         else if (current == getExclusiveOwnerThread()) {
      //增加AQS的state的值
            int nextc = c + acquires;
             if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
              setState(nextc);
               return true;
            }
          return false;
        }
    }

非公平:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
   //拿到当前线程
    final Thread current = Thread.currentThread();
   //拿到AQS的state
     int c = getState();
   // 如果state == 0,说明没有线程占用着当前的锁资源
      if (c == 0) {
   //获取锁资源
      if (compareAndSetState(0, acquires)) {
   //将当前占用这个互斥锁的线程属性设置为当前线程
      setExclusiveOwnerThread(current);
       return true;
       }
       }
   //如果有线程持有锁资源,判断持有锁资源的线程是否是当前线程
       else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
          throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
          }
         return false;
  }

2.4 addWaiter方法

在获取锁资源失败后,需要将当前线程封装为Node对象,并且插入到AQS队列的末尾。

private Node addWaiter(Node mode) {
    // 将当前线程封装为Node对象,mode为null,代表互斥锁
    Node node = new Node(Thread.currentThread(), mode);
    // pred是tail节点
    Node pred = tail;
    // 如果pred不为null,有线程正在排队
    if (pred != null) {
        // 将当前节点的prev,指定tail尾节点
        node.prev = pred;
        // 以CAS的方式,将当前节点变为tail节点
        if (compareAndSetTail(pred, node)) {
            // 之前的tail的next指向当前节点
            pred.next = node;
            return node;
        }
    }
    // 添加的流程为,  自己prev指向、tail指向自己、前节点next指向我
    // 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
    enq(node);
    return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
    for (;;) {
        // 拿到tail
        Node t = tail;
        // 如果tail为null,说明当前没有Node在队列中
        if (t == null) { 
            // 创建一个新的Node作为head,并且将tail和head指向一个Node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 和上述代码一致!
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

2.5 acquireQueued方法

// acquireQueued方法
// 查看当前排队的Node是否是head的next,
// 如果是,尝试获取锁资源,
// 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        for (;;) {
            // 拿到上一个节点
            final Node p = node.predecessor();
            if (p == head && // 说明当前节点是head的next
                tryAcquire(arg)) { // 竞争锁资源,成功:true,失败:false
                // 进来说明拿到锁资源成功
                // 将当前节点置位head,thread和prev属性置位null
                setHead(node);
                // 帮助快速GC
                p.next = null; 
                // 设置获取锁资源成功
                failed = false;
                // 不管线程中断。
                return interrupted;
            }
            // 如果不是或者获取锁资源失败,尝试将线程挂起
            // 第一个事情,当前节点的上一个节点的状态正常!
            // 第二个事情,挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 通过LockSupport将当前线程挂起
                parkAndCheckInterrupt())
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


// 确保上一个节点状态是正确的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 拿到上一个节点的状态
    int ws = pred.waitStatus;
    // 如果上一个节点为 -1
    if (ws == Node.SIGNAL)
        // 返回true,挂起线程
        return true;
    // 如果上一个节点是取消状态
    if (ws > 0) {
        // 循环往前找,找到一个状态小于等于0的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将小于等于0的节点状态该为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

2.6 unlock方法

释放锁资源,将state减1,如果state减为0了,唤醒在队列中排队的Node。

public final boolean release(int arg) {
    // 核心的释放锁资源方法
    if (tryRelease(arg)) {
        // 释放锁资源释放干净了。  (state == 0)
        Node h = head;
        // 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
        if (h != null && h.waitStatus != 0)
            // 唤醒线程
            unparkSuccessor(h);
        return true;
    }
    // 释放锁成功,但是state != 0
    return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
    // 获取state - 1
    int c = getState() - releases;
    // 如果释放锁的线程不是占用锁的线程,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否成功的将锁资源释放利索 (state == 0)
    boolean free = false;
    if (c == 0) {
        // 锁资源释放干净。
        free = true;
        // 将占用锁资源的属性设置为null
        setExclusiveOwnerThread(null);
    }
    // 将state赋值
    setState(c);
    // 返回true,代表释放干净了
    return free;
}


// 唤醒节点
private void unparkSuccessor(Node node) {
    // 拿到头节点状态
    int ws = node.waitStatus;
    // 如果头节点状态小于0,换为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到当前节点的next
    Node s = node.next;
    // 如果s == null ,或者s的状态为1
    if (s == null || s.waitStatus > 0) {
        // next节点不需要唤醒,需要唤醒next的next
        s = null;
        // 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 经过循环的获取,如果拿到状态正常的节点,并且不为null
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

3 使用实例

3.1 公平锁

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
        new Thread(()->test(lock),"线程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.执行结果:

3.小结:

公平锁可以保证每个线程获取锁的机会是相等的。

3.2 非公平锁

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
        new Thread(()->test(lock),"线程C").start();
    }
    public static   void test(ReentrantLock lock){
        for (int i = 0; i < 3;i++){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(200);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

2.执行结果:

3.小结:

非公平锁每个线程获取锁的机会是随机的。

3.3 忽略重复操作

1.代码:

public class ReentrantLockTest {
    private ReentrantLock lock = new ReentrantLock();


    public void doSomething(){
        if(lock.tryLock()){
            try {
                System.out.println(Thread.currentThread().getName()+"获取了锁!");
                Thread.sleep(5);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        ReentrantLockTest test = new ReentrantLockTest();
        for (int i = 0; i < 10;i++){
            new Thread(()->{test.doSomething();},"线程"+i).start();
            Thread.sleep(1);
        }
    }
}

2.执行结果:

3.小结:

当线程持有锁时,不会重复执行,可以用来防止定时任务重复执行或者页面事件多次触发时不会重复触发。

3.4 超时不执行

1.代码:

public class ReentrantLockTest {


    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(()->test(lock),"线程A").start();
        new Thread(()->test(lock),"线程B").start();
    }
    public static   void test(ReentrantLock lock){
            try {
                if(lock.tryLock(2, TimeUnit.SECONDS)){
                    try {
                        System.out.println(Thread.currentThread().getName()+"获取了锁!");
                        Thread.sleep(3000);
                    }finally {
                        lock.unlock();
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }


    }

2.执行结果:

3.小结:

超时不执行可以防止由于资源处理不当长时间占用资源产生的死锁问题。

4 总结

并发是现在软件系统不可避免的问题,ReentrantLock是可重入的独占锁,比起synchronized功能更加丰富,支持公平锁实现,支持中断响应以及限时等待等,是处理并发问题很好的解决方案。

作者:京东物流 陈昌浩

来源:京东云开发者社区

展开阅读全文
  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接