原子性
原子操作即在对变量的值进行操作的时候,以一种不可中断的方式一步到位的处理,从而操作了立刻生效,其他线程在处理时已经是最新的值,而不需要锁以及其他同步机制处理。一般而言,对于只涉及单个变量的情况,原子操作为其他同步机制提供一种方便和高效的提单方案。
原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执
行其中的一部分(不可中断性)。将整个操作视作一个整体, 资源在该次操作中保持一致,这是原子性的核心特征。
原子操作是通过使用一些类以及方法完成的,例如AtomicInteger、AtomicLong、AtomicBoolean、AtomicIntegerArray、AtomicLongArray、AtomicRefrenceArray、DoubleAccumulator、DoubleAdder、LongAccumulator和LongAddr。
CAS机制
CAS(Compare and swap)属于硬件同步原语,处理器提供的内存操作指令,保证原子性。CAS操作需要两个参数,一个旧值和一个目标值,修改前先比较旧值是否改变,如果变了,就将新值赋给变量,否则不做改变。
在Java中,实现原子性需要使用sun.misc.Unsafe类提供了CAS机制。
CAS存在的问题
- 仅针对单个变量的操作,不能用于多个变量来实现原子操作
- 循环+CAS,自旋的实现让所有线程都处于高频运行,争抢CPU执行时间的状态 。如果操作长时间不成功,会带来很大的CPU资源消耗
- ABA问题。(无法体现出数据的变动)。
ABA问题
如果在算法中采用自己的方式来管理节点对象的内存,那么可能出现ABA问题。在这种情况下,即使链表的头结点仍然只想之前观察到的节点,那么也不足以说明链表的内容没有发生变化。如果通过垃圾回收器来管理链表节点仍然无法避免ABA问题,那么还有一个相对简单的解决方法:不是只是更新某个引用的值,而是更新两个值,包含一个引用和一个版本号。
那么在JAVA中怎么操作可能会现ABA问题呢?
假如有一个Queue(先别管它怎么实现),那么在以下情况下会出现ABA问题:
- 1.我们在CAS中比较对节点的引用 & 我们复用节点。假如queue初始的状态是A -> E。在变化后的状态是A -> X -> E。那么我们在CAS中比较对A的引用时,就无法看出状态的变化。“复用”,就是像这个例子一样,把同一个节点再次加个队列。
- 2.我们在CAS中比较对节点的引用 & 某个new出来的节点A2的地址恰好和A1的地址相同。
第一种情况不管GC环境还是非GC环境,都会造成ABA问题。所以GC只是可能会避免ABA问题,就像《JAVA并发编程实战》中说的一样。
GC环境和无GC的环境(如C++)的不同在于第二种情况。即,在JAVA中第,第二种情况是不可能发生的。原因在于,在我们用CAS比较A1和A2这两个引用时,暗含着的事实是——这两个引用存在,所以它们所引用的对象都是GC root可达的。那么在A1引用的对象还未被GC时,一个新new的对象是不可能和A1的对象有相同的地址的。所以A1 != A2。
所以,在JAVA的GC环境中,如果两个引用在CAS中被判断为相等,它们引用的肯定是同一个对象。但是,这种描述对于非GC环境不成立。
例如,在C++中,我们对指针(类比于JAVA中的引用)采用CAS操作,那么即使两个指针相同,他们也未必引用同一个对象,有可能是第一个指针所指向的内存被释放后,第二个对象恰好分配在相同地址的内存。
由于ABA问题带来的隐患,各种乐观锁的实现中通常都会用版本戳version来对记录或对象标记,避免并发操作带来的问题,在Java中,AtomicStampedReference也实现了这个作用,它通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicStampedReference; import java.util.concurrent.locks.LockSupport;
class Node { public final String value; public Node next;
public Node(String value) { this.value = value; }
@Override public String toString() { return "value=" + value; } }
class Stack { AtomicReference<Node> top = new AtomicReference<Node>();
public void push(Node node) { Node oldTop; do { oldTop = top.get(); node.next = oldTop; } while (!top.compareAndSet(oldTop, node)); System.out.println(Thread.currentThread().getName() + " push " + node); }
public Node pop(int time) {
Node newTop; Node oldTop; do { oldTop = top.get(); if (oldTop == null) { return null; } newTop = oldTop.next; if (time != 0) { LockSupport.parkNanos(1000 * 1000 * time); } } while (!top.compareAndSet(oldTop, newTop)); System.out.println(Thread.currentThread().getName() +" pop "+ oldTop.toString()); return oldTop; } }
class ConcurrentStack { AtomicStampedReference<Node> top = new AtomicStampedReference<>(null, 0);
public void push(Node node) { Node oldTop; int v; do { v = top.getStamp(); oldTop = top.getReference(); node.next = oldTop; } while (!top.compareAndSet(oldTop, node, v, v+1)); System.out.println(Thread.currentThread().getName() + " push " + node); }
public Node pop(int time) { Node newTop; Node oldTop; int v;
do { v = top.getStamp(); oldTop = top.getReference(); if (oldTop == null) { return null; } newTop = oldTop.next; if (time != 0) { LockSupport.parkNanos(1000 * 1000 * time); } } while (!top.compareAndSet(oldTop, newTop, v, v+1)); System.out.println(Thread.currentThread().getName() + " pop " + oldTop); return oldTop; } }
public class LearnABA { public static void main(String[] args) {
Thread.currentThread().setName("Thread0");
Stack stack = new Stack();
stack.push(new Node("B")); stack.push(new Node("A"));
Thread thread1 = new Thread(() -> { Node node = stack.pop(800);
System.out.println(Thread.currentThread().getName() + " done..."); }, "Thread1");
thread1.start();
Thread thread2 = new Thread(() -> { LockSupport.parkNanos(1000 * 1000 * 300L);
Node nodeA = stack.pop(0); Node nodeB = stack.pop(0);
stack.push(new Node("D")); stack.push(new Node("C")); stack.push(nodeA);
System.out.println(Thread.currentThread().getName() + " done..."); }, "Thread2");
thread2.start();
LockSupport.parkNanos(1000 * 1000 * 1000 * 2L);
System.out.println("开始遍历Stack:"); Node node = null; while ((node = stack.pop(0))!=null){ } } }
|
在上面测试代码中,使用普通CAS的栈,在第二次入栈之前的A节点的时候,因为引用判断通过了,因为Java的GC环境,入栈了第一个版本的A(只带有B节点的A),而不是我们所希望的第二个版本的A节点(B已经移除),对于这种情况,我们需要使用带版本的CAS锁。
JUC包的原子类
类 |
描述 |
AtomicBoolean |
原子更新布尔类型 |
AtomicInteger |
原子更新整型 |
AtomicLong |
原子更新长整型 |
AtomicIntegerArray |
原子更新整型数组里的元素。 |
AtomicLongArray |
原子更新长整型数组里的元素 |
AtomicRefrenceArray |
原子更新引用类型数组里的元素 |
AtomicIntegerFieldUpdater |
原子更新整型的字段的更新器 |
AtomicLongFieldUpdater |
原子更新长整型字段的更新器。 |
AtomicRefrenceFieldUpdater |
原子更新引用类型里的字段 |
AtomicRefrence |
原子更新引用类型 |
AtomicStampedRefrence |
原子更新带有版本号的引用类型 |
AtomicMarkableRefrence |
原子更新带有标记位的引用类型。 |
DoubleAdder |
Double计数器增强版 |
LongAdder |
Long计数器增强版 |
DoubleAccumulator |
Double计数器增强版,可自定义累加规则 |
LongAccumulator |
Long计数器增强版,可自定义累加规则 |
AtomicInteger
方法 |
描述 |
public final int getAndSet(int newValue) |
给AtomicInteger设置newValue并返回加oldValue |
public final boolean compareAndSet(int expect, int update) |
如果输入的值和期望值相等就set并返回true/false |
public final int getAndIncrement() |
对AtomicInteger原子的加1并返回当前自增前的value |
public final int getAndDecrement() |
对AtomicInteger原子的减1并返回自减之前的的value |
public final int getAndAdd(int delta) |
对AtomicInteger原子的加上delta值并返加之前的value |
public final int incrementAndGet() |
对AtomicInteger原子的加1并返回加1后的值 |
public final int decrementAndGet() |
对AtomicInteger原子的减1并返回减1后的值 |
public final int addAndGet(int delta) |
给AtomicInteger原子的加上指定的delta值并返回加后的值 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import java.util.concurrent.atomic.AtomicInteger;
class Shared { static AtomicInteger ai = new AtomicInteger(0); }
class AtomThread implements Runnable { String name; AtomThread(String n) { name = n; new Thread(this).start(); } public void run() { System.out.println("Starting " + name); for(int i = 1; i <= 3; i ++) { System.out.println(name + " got: " + Shared.ai.incrementAndGet()); } } }
public class LearnAtomic1 { public static void main(String args[]) { new AtomThread("A"); new AtomThread("B"); new AtomThread("C"); } }
|