Java并发
概念
并发编程三要素:
- 原子性:要么成功要么失败
- 可见性:线程A修改完变量后,线程B能否感知
- 有序性:指令执行顺序
Java内存模型
即Java虚拟机的内存规范,用于屏蔽各操作系统和平台间的差异
具体内容:
- 变量在主存中(内存)
- 线程有自己的工作内存(类似高速缓存)
- 线程对变量操作只能在自身工作内存中执行,不能直接操作主存
内存模型关于三要素的保证:
- 原子性
- 简单类型的赋值才是原子操作(x = 10 是,x = y 不是)
- 可见性
- 不保证,需要使用volatile关键字
- 有序性
- 允许使用指令重排序,对单线程无影响,对多线程会造成影响
- 能够保证的有:
- 保证单线程内指令重排序对最终结果无影响(也即实际执行顺序可能会变,但结果不变)
- 若 unlock 写在前面,lock写在后面,不会重排序造成lock在前
- volatile变量,对它的先写后读操作,不会重排序成先读后写
- 线程4个顺序,不重要
指令重排序
操作系统为了性能优化,实际执行指令的顺序可能与书写顺序不一致
这种优化不会改变单线程的实际执行结果,但对多线程有影响
举例:
x = y;
z = 1;
z++;
x = x + z
对应cpu行为,x = y,去内存中取y的值,较慢
z = 1;z++;
可能对应设置寄存器为1,寄存器进行加1操作。
显然,等待 x = y的这段时间,先做后面两行代码操作是没问题的
Volatile
作用
- 禁止指令重排序
- 当程序执行到volatile变量的读或写时,保证前面的读写已发生且已可见,后面的读写未发生
- 书写顺序在volatile变量操作前面的指令不会排到后面,反之一样
- 只保证可见性,不保证原子性
实现原理
内存屏障,估计是cpu提供的功能:
- 禁止指令重排序,同上
- 写操作立即刷新进主内存
- 线程的读volatile变量操作,不会使用高速缓存,而是会去主存再读一遍
场景举例
//线程1
volatile boolean stop = false;
while(!stop){
doSomething();
}
//线程2
stop = true;
锁
概念
- 可重入锁
- 内部可以再次进入
- 假如用变量 x 来表示,进入锁即加1,超过1即不能再进入锁。那么已经获得锁的方法,递归调用自己,是不能获得锁的,即为不可重入锁。反之为可重入锁。
- 公平锁:先等锁的先获得锁
- 非公平锁:等待队列上的线程,不分先后顺序,每次有机会时都随机指定一个
- 自旋锁:拿不到锁时不是线程阻塞,而是空白死循环去一直拿
- 乐观锁:默认无冲突,先操作再检测冲突
- 悲观锁:默认有冲突,先加锁再操作
- 独占锁:ReentrantLock/Synchronized
- 共享锁:ReentrantReadWriteLock,读锁为共享锁,写锁为独占锁
Synchronized
特性
- 可重入
- 抛出异常会自动释放锁
版本差异
版本1.6及之前
调用操作系统原生方法进行同步控制
版本1.6之后
- 分为无状态锁、偏向锁、轻量级锁、重量级锁
- 默认用开销少的锁,竞争大的时候再升级,少的时候降级。因而可以优化性能
- 锁粗化,即合并加锁
- 锁消除,判断代码有没有线程外的变量,来判断是否需要加锁
- 适应性自旋,即成功过一下一次循环就久一点,否则就短一点
单例模式
public class Singleton {
private volatile static Singleton uniqueInstance;
private Singleton() {
}
public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
//类对象加锁
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}
volatile阻止的不是singleton = newSingleton()这句话内部[分配空间、初始化、赋值]的指令重排,而是保证了在一个写操作([1-2-3])完成之前,不会调用读操作(if (instance == null))
Lock
ReadWriteLock
读写锁
ReentrantLock - 排它锁
支持公平与非公平
// 构造参数:isFair
Lock NonFairlock = new ReentrantLock(false);
Lock fairLock = new ReentrantLock(true);
生产者消费者示例
// 馒头生产消费示例
package cn.chenrenyi.start;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author chenrenyi
**/
public class ProductAndConsumeQueue {
public List<Integer> queue;
private int maxQueueSize;
private Lock lock;
private Condition notFull;
private Condition notEmpty;
public ProductAndConsumeQueue(int maxQueueSize) {
this.queue = new ArrayList<>();
this.maxQueueSize = maxQueueSize;
this.lock = new ReentrantLock(true);
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
public void push() {
try {
lock.lock();
while (queue.size() == maxQueueSize) {
try {
// wait until not full
System.out.println("队列已满,生产者休眠中,等待队列不满再生产");
notFull.await();
System.out.println("生产者被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// start to push because of not empty
queue.add(new Random().nextInt());
System.out.println("生产者生产了馒头");
// signal to public that queue is not empty now.
notEmpty.signal();
} finally {
lock.unlock();
}
}
public void pop() {
try {
lock.lock();
while (queue.size() == 0) {
try {
// wait until not empty
System.out.println("队列已空,消费者进入休眠,等待队列不空时再消费");
notEmpty.await();
System.out.println("消费者被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// start to push because of not empty
queue.remove(0);
System.out.println("消费了馒头");
// signal to public that queue is not full now.
notFull.signal();
} finally {
lock.unlock();
}
}
public static void test() {
ProductAndConsumeQueue queue = new ProductAndConsumeQueue(4);
Runnable productor = new Runnable() {
@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.push();
}
}
};
Runnable consumer = new Runnable() {
@Override
public void run() {
while(true) {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.pop();
}
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(productor);
executorService.execute(consumer);
executorService.shutdown();
}
}
相关类
Atomic
包:java.util.concurrent.atomic
示例:AtomicInteger.incrementAndGet()
实现原理
自旋加CAS+volatile:while(cas(old, new)){}
CAS
compare and swap,cpu提供的指令,比较并替换。上述类的原理。
ABA问题
线程1将变量替换2,线程2又替换为1,线程3以为没变其实已经变过一轮了
解决方法:
- AtomicStampedReference:版本号解决,对原数加1时,对版本号也加1
并发工具包
- Semaphore(信号量)-允许多个线程同时访问
- CountDownLatch(倒计时器)
- 一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行
- CyclicBarrier(循环栅栏)
- 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
线程
线程池
构造函数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull ThreadFactory threadFactory,
@NotNull RejectedExecutionHandler handler
失败处理策略
AbortPolicy:A handler for rejected tasks that throws a RejectedExecutionException
DiscardPolicy:A handler for rejected tasks that silently discards the rejected task
DiscardOldestPolicy:A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.
CallerRunsPolicy:A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.
线程池种类
newSingleThreadExecutor:单线程,无限队列,不回收
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadPool:固定线程数,无限队列,不回收
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newScheduledThreadPool:延迟或周期执行任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newCachedThreadPool
// 无限队列,动态创建,60秒空闲即回收。使用同步队列 SynchronousQueue,提交任务操作会阻塞直到线程创建好并取出任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newWorkStealingPool:ForkJoinPool,1.7之后新增
ThreadLocal
线程本地变量,线程内可见,线程外不可见
// 创建
ThreadLocal<String> mStringThreadLocal = new ThreadLocal<>();
ThreadLocal<String> mThreadLocal = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return Thread.currentThread().getName();
}
};
// 使用
mStringThreadLocal.set("chenrenyi");
mStringThreadLocal.get();
Set 创建过程:
- 首先获取当前线程的成员变量:threadLocals,类型:ThreadLocalMap
- 如果上述ThreadLocalMap对象不为空,则设置值,否则创建这个ThreadLocalMap对象并设置值
- 以当前 ThreadLocal 对象为 key,set 的参数值为值
即创建一个 ThreadLocalMap ,再赋值给 Thread.threadLocals 属性上。map 的 key 为创建的 ThreadLocal 对象,值为值。其中 key 弱引用,避免内存泄漏
为何只在线程内可见:
因为 threadLocalMap 是 thread 的一个成员变量,而每个 thread 的成员变量自然不一样,所以只在线程内可见
对象存放在哪里:
堆上
发布于 2020/08/22
浏览
次