原创

Java锁体系之JUC介绍

众所周知,java 是以高并发闻名,并发离不开的就是锁,前文再讲解 java 内存模型的时候介绍过 volatile 关键字,上一篇文章介绍了 sychorized 关键字,分别介绍了他们的使用和原理,本章,小编带大家了解最好一个同步利器:JUC

1.JUC 架构体系

下图为 java 并发包的基础架构,从中我们能知道他们的层次结构,其中 volatile 和 CAS 位于最底层,他们是上面的技术支撑,通过他们,可以演变出来好用的原子类,非阻塞数据结构,AQS,最上层都是针对不同使用场景来生成出来的实用工具类,下文将从最底层的 CAS 开始讲起 JUC架构体系

2.CAS 原理

2.1.简介

CAS 全称是 compare and swap,是一种用于在多线程环境下实现同步功能的机制。CAS 操作包含三个操作数 -- 内存位置、预期数值和新值。CAS 的实现逻辑是将内存位置处的数值与预期数值想比较,若相等,则将内存位置处的值替换为新值。若不相等,则不做任何操作。 在 Java 中,Java 并没有直接实现 CAS,CAS 相关的实现是通过 C++ 内联汇编的形式实现的。Java 代码需通过 JNI 才能调用。

可以参考如下博客了解具体CAS原理

2.2AS 的优缺点

优点:非阻塞的轻量级的乐观锁,通过 CPU 指令实现,在资源竞争不激烈的情况下性能高,相比 synchronized 重量锁,synchronized 会进行比较复杂的加锁、解锁和唤醒操作。 缺点: 1.ABA 问题: 线程 C、D;线程 D 将 A 修改为 B 后又修改为 A,此时 C 线程以为 A 没有改变过,java 的原子类 AtomicStampedReference,通过控制变量值的版本号来保证 CAS 的正确性。具体解决思路就是在变量前追加上版本号,每次变量更新的时候把版本号加一,那么 A - B - A 就会变成 1A - 2B - 3A。 2.自旋时间过长,消耗 CPU 资源,如果资源竞争激烈,多线程自旋长时间消耗资源

2.3 CAS 原理

当 JNI 调用 CAS 方法的时候,JVM 会根据当前处理器的类型来决定是否为 cmpxchg 指令添加 lock 前缀。如果程序是在多处理器上运行,就为 cmpxchg 指令加上 lock 前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略 lock 前缀(单处理器自身会维护单处理器内的顺序一致性,不需要 lock 前缀提供的内存屏障效果)。可见,CAS 其实在 cpu 也是使用了锁,只是 cpu 层面的锁粒度较 jvm 层面的锁粒度小太多,所以性能会好很多。

3.volicate 原理

之前文章:“Java 内存模型之 volatile 关键字的语义及原理”已经详细介绍了,这里不再赘述

4.JUC 关键核心类 AQS 解析

JUC 包里面,最为核心的类就是 AQS 和原子包里面的类,他们为 java 并发 lock 提供了模板支持,因为 AQS 是一个抽象类,要使用必须写子类,ReentrantLock 就是一个很好的子类

4.1 ReentrantLock 概念:

ReentrantLock 内部包含了一个 AQS 对象,也就是 AbstractQueuedSynchronizer 类型的对象。这个 AQS 对象就是 ReentrantLock 可以实现加锁和释放锁的关键性的核心组件。ReentrantLock 之所以用 Reentrant 打头,意思就是他是一个可重入锁。可重入锁的意思,就是你可以对一个 ReentrantLock 对象多次执行 lock()加锁和 unlock()释放锁,也就是可以对一个锁加多次,叫做可重入加锁。

4.2 Demo:

private static ReentrantLock lock = new ReentrantLock();
	public static void main(String[] args) {
		for (int i = 0; i < 5; i++) {
			Thread thread = new Thread(new Runnable() {
				@Override
				public void run() {
					lock.lock();
					try {
						Thread.sleep(2000);
						System.out.println(Thread.currentThread().getName());
					} catch (InterruptedException e) {
						e.printStackTrace();
					} finally {
						lock.unlock();
					}
				};
			});
			thread.start();
		}
	}

4.3 ReentrantLock 加锁和释放锁的底层原理流程:

ReentrantLock加锁和释放锁的底层原理流程:
ReentrantLock加锁和释放锁的底层原理流程:

流程:

  1. AQS 对象内部有一个核心的变量叫做 state,是 int 类型的,代表了加锁的状态。初始状态下,这个 state 的值是 0,AQS 内部还有一个关键变量,用来记录当前加锁的是哪个线程,初始化状态下,这个变量是 null。
  2. 线程 1 跑过来调用 ReentrantLock 的 lock()方法尝试进行加锁,这个加锁的过程,直接就是用 CAS 操作将 state 值从 0 变为 1,如果之前没人加过锁,那么 state 的值肯定是 0,此时线程 1 就可以加锁成功。
  3. 其实每次线程 1 可重入加锁一次,会判断一下当前加锁线程就是自己,那么他自己就可以可重入多次加锁,每次加锁就是把 state 的值给累加 1,别的没啥变化。.接着,如果线程 1 加锁了之后,线程 2 跑过来一下看到,哎呀!state 的值不是 0 啊?所以 CAS 操作将 state 从 0 变为 1 的过程会失败,因为 state 的值当前为 1,说明已经有人加锁了!
  4. 接着线程 2 会看一下,是不是自己之前加的锁啊?当然不是了,“加锁线程”这个变量明确记录了是线程 1 占用了这个锁,所以线程 2 此时就是加锁失败。
  5. 接着,线程 2 会将自己放入 AQS 中的一个等待队列,因为自己尝试加锁失败了,此时就要将自己放入队列中来等待,等待线程 1 释放锁之后,自己就可以重新尝试加锁了
  6. 接着,线程 1 在执行完自己的业务逻辑代码之后,就会释放锁!他释放锁的过程非常的简单,就是将 AQS 内的 state 变量的值递减 1,如果 state 值为 0,则彻底释放锁,会将“加锁线程”变量也设置为 null!
  7. 接下来,会从等待队列的队头唤醒线程 2 重新尝试加锁。线程 2 现在就重新尝试加锁,这时还是用 CAS 操作将 state 从 0 变为 1,此时就会成功,成功之后代表加锁成功,就会将 state 设置为 1。此外,还要把“加锁线程”设置为线程 2 自己,同时线程 2 自己就从等待队列中出队了。

5.JUC 组件使用实战

java 并发包有很多已经提供的 lock,下面列举几个的使用场景和 demo 演示(Condition、CountDownLatch、CyclicBarrier、Semaphore、ReentrantReadWriteLock)

5.1.Condition

Condition 是在 java 1.5 中才出现的,它用来替代传统的 Object 的 wait()、notify()实现线程间的协作,相比使用 Object 的 wait()、notify(),使用 Condition 的 await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。

Condition 是个接口,基本的方法就是 await()和 signal()方法;

Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()

调用 Condition 的 await()和 signal()方法,都必须在 lock 保护之内,就是说必须在 lock.lock()和 lock.unlock 之间才可以使用

Demo:

public class ConditionTest2 {
	private static BoundedBuffer bb = new BoundedBuffer();
	public static void main(String[] args) {
		// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
		// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
		for (int i = 0; i < 10; i++) {
			new PutThread("p" + i, i).start();
			new TakeThread("t" + i).start();
		}
	}

	static class PutThread extends Thread {
		private int num;

		public PutThread(String name, int num) {
			super(name);
			this.num = num;
		}

		public void run() {
			try {
				Thread.sleep(1); // 线程休眠1ms
				bb.put(num); // 向BoundedBuffer中写入数据
			} catch (InterruptedException e) {
			}
		}
	}

	static class TakeThread extends Thread {
		public TakeThread(String name) {
			super(name);
		}

		public void run() {
			try {
				Thread.sleep(10); // 线程休眠1ms
				Integer num = (Integer) bb.take(); // 从BoundedBuffer中取出数据
			} catch (InterruptedException e) {
			}
		}
	}
}

class BoundedBuffer {
	final Lock lock = new ReentrantLock();
	final Condition notFull = lock.newCondition();
	final Condition notEmpty = lock.newCondition();

	final Object[] items = new Object[5];
	int putptr, takeptr, count;

	public void put(Object x) throws InterruptedException {
		lock.lock(); // 获取锁
		try {
			// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
			while (count == items.length)
				notFull.await();
			// 将x添加到缓冲中
			items[putptr] = x;
			// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
			if (++putptr == items.length)
				putptr = 0;
			// 将“缓冲”数量+1
			++count;
			// 唤醒take线程,因为take线程通过notEmpty.await()等待
			notEmpty.signal();
			// 打印写入的数据
			System.out.println(Thread.currentThread().getName() + " put  " + (Integer) x);
		} finally {
			lock.unlock(); // 释放锁
		}
	}

	public Object take() throws InterruptedException {
		lock.lock(); // 获取锁
		try {
			// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
			while (count == 0)
				notEmpty.await();
			// 将x从缓冲中取出
			Object x = items[takeptr];
			// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
			if (++takeptr == items.length)
				takeptr = 0;
			// 将“缓冲”数量-1
			--count;
			// 唤醒put线程,因为put线程通过notFull.await()等待
			notFull.signal();
			// 打印取出的数据
			System.out.println(Thread.currentThread().getName() + " take " + (Integer) x);
			return x;
		} finally {
			lock.unlock(); // 释放锁
		}
	}
}

例子展示了一个有界缓存的实现,当缓存中无数据时,阻塞 take 线程,直到 put 线程通知,类似和可替代 Object 类的 notify()和 wait();

5.2CountDownLatch

CountDownLatch(倒计时器)是 java.util.concurrent 包中一个类,CountDownLatch 只要提供的机制是多个(具体数量等于初始化 CountDownLatch 时 count 的值)线程都达到了预期状态或者完成了预期工作时触发事件,其他线程可以等待这个事件来触发自己后续的工作。等待的线程可以是多个,即 CountDownLatch 可以唤醒多个等待的线程。到达自己预期状态的线程会调用 CountDownLatch 的 countDown 方法,而等待的线程会调用 CountDownLatch 的 await 方法。

CoundDownLatch 这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CoundDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减 1。当计数器值到达 0 时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

Demo:

public class CountDownLatchTest {
	private static int LATCH_SIZE = 5;
	private static CountDownLatch doneSignal;
	public static void main(String[] args) {
		try {
			doneSignal = new CountDownLatch(LATCH_SIZE);
			// 新建5个任务
			for (int i = 0; i < LATCH_SIZE; i++){
				new InnerThread().start();
			}
			System.out.println("main await begin.");
			// "主线程"等待线程池中5个任务的完成
			doneSignal.await();
			System.out.println("main await finished.");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	static class InnerThread extends Thread {
		public void run() {
			try {
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
				// 将CountDownLatch的数值减1
				doneSignal.countDown();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

代码中 main 线程在 doneSignal.await()的时候阻塞了,直到设定的次数 doneSignal.countDown()才会通知 main 线程继续执行

5.3 CyclicBarrier

CyclicBarrier(循环栅栏): CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

Demo:

public class CyclicBarrierTest {
	public static void main(String[] args) {
		CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
			public void run() {
				System.out.println("CyclicBarrier arrived." );
			}
		});
		for (int i = 0; i < barrier.getParties(); i++) {
			new Thread(new MyRunnable(barrier), "队友" + i).start();
		}
		System.out.println("main function is finished.");
	}

	private static class MyRunnable implements Runnable {
		private CyclicBarrier barrier;

		public MyRunnable(CyclicBarrier barrier) {
			this.barrier = barrier;
		}

		@Override
		public void run() {
			for (int i = 0; i < 3; i++) {
				try {
					Random rand = new Random();
					int randomNum = rand.nextInt((3000 - 1000) + 1) + 1000;// 产生1000到3000之间的随机整数
					Thread.sleep(randomNum);
					System.out.println(Thread.currentThread().getName() + ", 通过了第" + i + "个障碍物, 使用了 " + ((double) randomNum / 1000) + "s");
					this.barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

上面代码模拟的是 假设有一家公司要全体员工进行团建活动,活动内容为翻越三个障碍物,每一个人翻越障碍物所用的时间是不一样的。但是公司要求所有人在翻越当前障碍物之后再开始翻越下一个障碍物,也就是所有人翻越第一个障碍物之后,才开始翻越第二个,以此类推。类比地,每一个员工都是一个“其他线程”。当所有人都翻越的所有的障碍物之后,程序才结束。而主线程可能早就结束了,这里我们不用管主线程。

总结:

CountDownLatch 和 CyclicBarrier 都有让多个线程等待同步然后再开始下一步动作的意思,但是 CountDownLatch 的下一步的动作实施者是主线程,具有不可重复性;而 CyclicBarrier 的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。

5.4 Semaphore

Semaphore(信号量)-允许多个线程同时访问: synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

Demo:

public class SemaphoreTest1 {
	private static final int SEM_MAX = 10;

	public static void main(String[] args) {
		Semaphore sem = new Semaphore(SEM_MAX);
		// 创建线程池
		ExecutorService threadPool = Executors.newFixedThreadPool(3);
		// 在线程池中执行任务
		threadPool.execute(new MyThread(sem, 5));
		threadPool.execute(new MyThread(sem, 4));
		threadPool.execute(new MyThread(sem, 7));
		// 关闭池
		threadPool.shutdown();
	}
}

class MyThread extends Thread {
	private volatile Semaphore sem; // 信号量
	private int count; // 申请信号量的大小

	MyThread(Semaphore sem, int count) {
		this.sem = sem;
		this.count = count;
	}

	public void run() {
		try {
			// 从信号量中获取count个许可
			sem.acquire(count);
			Thread.sleep(2000);
			System.out.println(Thread.currentThread().getName() + " acquire count=" + count);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			// 释放给定数目的许可,将其返回到信号量。
			sem.release(count);
			System.out.println(Thread.currentThread().getName() + " release " + count + "");
		}
	}
}

上面代码中,信号量 sem 的许可总数是 10 个;共 3 个线程,分别需要获取的信号量许可数是 5,4,7。前面两个线程获取到信号量的许可后,sem 中剩余的可用的许可数是 1;因此,最后一个线程必须等前两个线程释放了它们所持有的信号量许可之后,才能获取到 7 个信号量许可。该思想有点类似限流算法里面的令牌桶算法

5.5 ReentrantReadWriteLock

ReentrantReadWriteLock 是 Lock 的另一种实现方式,我们已经知道了 ReentrantLock 是一个排他锁,同一时间只允许一个线程访问,而 ReentrantReadWriteLock 允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时 ReentrantReadWriteLock 能够提供比排他锁更好的并发性和吞吐量。

读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock 实现都必须保证 writeLock 操作的内存同步效果也要保持与相关 readLock 的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

ReentrantReadWriteLock 支持以下功能:

支持公平和非公平的获取锁的方式;

  • 支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
  • 还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
  • 读取锁和写入锁都支持锁获取期间的中断; Condition 支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。 Demo:
public class ReadWriteLockTest {

	public static void main(String[] args) {
		ReadWriteLock lock = new ReentrantReadWriteLock();
		final Lock readLock = lock.readLock();
		final Lock writeLock = lock.writeLock();
		final Resource resource = new Resource();
		final Random random = new Random();
		for (int i = 0; i < 20; ++i) {// 写线程
			new Thread() {
				public void run() {
					writeLock.lock();
					try {
						resource.setValue(resource.getValue() + 1);
						System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + " - " + Thread.currentThread() + "获取了写锁,修正数据为:" + resource.getValue());
						Thread.sleep(random.nextInt(1000));// 随机休眠
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						writeLock.unlock();
					}
				};
			}.start();
		}
		for (int i = 0; i < 20; ++i) {// 读线程
			new Thread() {
				public void run() {
					readLock.lock();
					try {
						System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()) + " - " + Thread.currentThread() + "获取了读锁,读取的数据为:" + resource.getValue());
						Thread.sleep(random.nextInt(800));// 随机休眠
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						readLock.unlock();
					}
				};
			}.start();
		}
	}

}

// 资源
class Resource {
	private int value;
	public void setValue(int value) {
		this.value = value;
	}
	public int getValue() {
		return value;
	}
}

6.原子变量类

原子类来自于 java.util.concurrent 包,而 java.util.concurrent 包完全建立在 CAS 和 volatile 之上的,原子变量类都是非阻塞的数据结构,相比 synchronized 重量级锁做了更多的优化。

6.1 AtomicInteger 简介

AtomicInteger 是一个 int 类型的原子类,了解 AtomicInteger,只需要两点:

  • Volatile 原语保证了线程之间数据的可见性;
  • CAS 指令,compareAndSet 利用 JNI 来完成 CPU 指令的操作,保证对比和赋值两个操作的的原子性

6.2 源码解析:

//定义变量int类型
private volatile int value;

//获取值
public final int get() {
    return value;
}

//改变值
public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

在这里采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行 CAS 操作,如果成功就返回结果,否则重试直到成功为止。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

整体的过程就是这样子的,利用 CPU 的 CAS 指令,同时借助 JNI 来完成 Java 的非阻塞算法。其它原子操作都是利用类似的特性完成的。 其中 compareAndSwapInt 方法如下:

unsafe.compareAndSwapInt(this, valueOffset, expect, update);
类似:if (this == expect) {
  this = update
 return true;
} else {return false;
}
正文到此结束