博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
比AtomicLong更高效的并发计数类LongAdder
阅读量:2441 次
发布时间:2019-05-10

本文共 8466 字,大约阅读时间需要 28 分钟。

最近在看(轻量级的流量控制、熔断降级 Java 库)源码的时候,看到在统计数量的时候使用了LongAdder。这个LongAdder是jdk1.8新增的,出自Doug Lea之手,伟大的Java并发大师的鼻祖。在没有接触到LongAdder之前,AtomicLong这个类在并发计数上无论性能还是准确性已经做得极好了。在阿里流控框架中使用这样一个LongAdder类,必然有其过人之处。

回顾AtomicLong

AtomicLong是通过CAS(即Compare And Swap)原理来完成原子递增递减操作,在并发情况下不会出现线程不安全结果。AtomicLong中的value是使用volatile修饰,并发下各个线程对value最新值均可见。我们以incrementAndGet()方法来深入。

public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; }

这里是调用了unsafe的方法

public final long getAndAddLong(Object var1, long var2, long var4) {
long var6; do {
var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; }

方法中this.compareAndSwapLong()有4个参数,var1是需要修改的类对象,var2是需要修改的字段的内存地址,var6是修改前字段的值,var6+var4是修改后字段的值。compareAndSwapLong只有该字段实际值和var6值相当的时候,才可以成功设置其为var6+var4。

multiple-thread-cas-show1

再继续往深一层去看

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

这里Unsafe.compareAndSwapLong是native方法,底层通过JNI(Java Native Interface)来完成调用,实际就是借助C来调用CPU指令来完成。

实现中使用了do-while循环,如果CAS失败,则会继续重试,直到成功为止。并发特别高的时候,虽然这里可能会有很多次循环,但是还是可以保证线程安全的。不过如果自旋CAS操作长时间不成功,竞争较大,会带CPU带来极大的开销,占用更多的执行资源,可能会影响其他主业务的计算等。

LongAdder怎么优化AtomicLong

Doug Lea在jdk1.5的时候就针对HashMap进行了线程安全和并发性能的优化,推出了分段锁实现的ConcurrentHashMap。一般Java面试,基本上离不开ConcurrentHashMap这个网红问题。另外在ForkJoinPool中,Doug Lea在其工作窃取算法上对WorkQueue使用了细粒度锁来较少并发的竞争,更多细节可参考我的原创文章。如果已经对ConcurrentHashMap有了较为深刻的理解,那么现在来看LongAdder的实现就会相对简单了。

来看LongAdder的increase()方法实现,

public void add(long x) {
Cell[] as; long b, v; int m; Cell a; //第一个if进行了两个判断,(1)如果cells不为空,则直接进入第二个if语句中。(2)同样会先使用cas指令来尝试add,如果成功则直接返回。如果失败则说明存在竞争,需要重新add if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }

这里用到了Cell类对象,Cell对象是LongAdder高并发实现的关键。在casBase冲突严重的时候,就会去创建Cell对象并添加到cells中,下面会详细分析。

@sun.misc.Contended static final class Cell {
volatile long value; Cell(long x) {
value = x; } //提供CAS方法修改当前Cell对象上的value final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) {
throw new Error(e); } } }

而这一句a = as[getProbe() & m]其实就是通过getProbe()拿到当前Thread的threadLocalRandomProbe的probe Hash值。这个值其实是一个随机值,这个随机值由当前线程ThreadLocalRandom.current()产生。不用Rondom的原因是因为这里已经是高并发了,多线程情况下Rondom会极大可能得到同一个随机值。因此这里使用threadLocalRandomProbe在高并发时会更加随机,减少冲突。更多ThreadLocalRandom信息想要深入了解可关注这篇文章。拿到as数组中当前线程的Cell对象,然后再进行CAS的更新操作,我们在源码上进行分析。longAccumulate()是在父类Striped64.java中。

multiple-thread-cas-show2.png

final void longAccumulate(long x, LongBinaryOperator fn,                              boolean wasUncontended) {
int h; if ((h = getProbe()) == 0) {
//如果当前线程的随机数为0,则初始化随机数 ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) {
Cell[] as; Cell a; int n; long v; //如果当前cells数组不为空 if ((as = cells) != null && (n = as.length) > 0) {
//如果线程随机数对应的cells对应数组下标的Cell元素不为空, if ((a = as[(n - 1) & h]) == null) {
//当使用到LongAdder的Cell数组相关的操作时,需要先获取全局的cellsBusy的锁,才可以进行相关操作。如果当前有其他线程的使用,则放弃这一步,继续for循环重试。 if (cellsBusy == 0) {
// Try to attach new Cell //Cell的初始值是x,创建完毕则说明已经加上 Cell r = new Cell(x); // Optimistically create //casCellsBusy获取锁,cellsBusy通过CAS方式获取锁,当成功设置cellsBusy为1时,则获取到锁。 if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false; try {
// Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
rs[j] = r; created = true; } } finally {
//finally里面释放锁 cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //如果a不为空,则对a进行cas增x操作,成功则返回 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //cells的长度n已经大于CPU数量,则继续扩容没有意义,因此直接标记为不冲突 else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; //到这一步则说明a不为空但是a上进行CAS操作也有多个线程在竞争,因此需要扩容cells数组,其长度为原长度的2倍 else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
// Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally {
cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //继续使用新的随机数,避免在同一个Cell上竞争 h = advanceProbe(h); } //如果cells为空,则需要先创建Cell数组。初始长度为2.(个人理解这个if放在前面会比较好一点,哈哈) else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false; try {
// Initialize table if (cells == as) {
Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally {
cellsBusy = 0; } if (init) break; } //如果在a上竞争失败,且扩容竞争也失败了,则在casBase上尝试增加数量 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }

最后是求LongAdder的总数,这一步就非常简单了,把base的值和所有cells上的value值加起来就是总数了。

public long sum() {        Cell[] as = cells; Cell a;        long sum = base;        if (as != null) {            for (int i = 0; i < as.length; ++i) {                if ((a = as[i]) != null)                    sum += a.value;            }        }        return sum;    }

思考

源码中Cell数组的会控制在不超过NCPU的两倍,原因是LongAdder其实在底层是依赖CPU的CAS指令来操作,如果多出太多,即使在代码层面没有竞争,在底层CPU的竞争会更多,所以这里会有一个数量的限制。所以在LongAdder的设计中,由于使用到CAS指令操作,瓶颈在于CPU上。

YY一下,那么有没有方式可以突破这个瓶颈呢?我个人觉得是有的,但是有前提条件,应用场景极其有限。基于ThreadLocal的设计,假设统计只在一个固定的线程池中进行,假设线程池中的线程不会销毁(异常补充线程的就暂时不管了),则可以认为线程数量是固定且不变的,那么统计则可以依赖于只在当前线程中进行,那么即使是高并发,就转化为ThreadLocal这种单线程操作了,完全可以摆脱CAS的CPU指令操作的限制,那么性能将极大提升。

总结

在并发处理上,AtomicLong和LongAdder均具有各自优势,需要怎么使用还是得看使用场景。看完这篇文章,其实并不意味着LongAdder就一定比AtomicLong好使,个人认为在QPS统计等统计操作上,LongAdder会更加适合,而AtomicLong在自增控制方面是LongAdder无法代替的。在多数地并发和少数高并发情况下,AtomicLong和LongAdder性能上差异并不是很大,只有在并发极高的时候,才能真正体现LongAdder的优势。

转载地址:http://fhnqb.baihongyu.com/

你可能感兴趣的文章
wordpress当发布器_当我们与专家讨论WordPress时发生了什么
查看>>
css后代选择器与子选择器_后代选择器(CSS选择器)
查看>>
ppt修复演示文稿_使用WImpress建立惊人的演示文稿
查看>>
如何以正确的方式在您的WordPress网站上安装jQuery Mobile
查看>>
css选择器除了第一个_:第一个孩子(CSS选择器)
查看>>
html中的href属性_href(HTML属性)
查看>>
css3超链接文本样式_CSS3:文本样式和其他基础
查看>>
wordpress入门主题_WordPress儿童主题入门
查看>>
WordPress 3.5的新功能
查看>>
wordpress插件开发_使用免费WordPress插件的开发人员指南
查看>>
wordpress模板_如何在15分钟内定制WordPress模板
查看>>
wordpress插件开发_WordPress插件开发人员的10个必知技能
查看>>
joomla数据库表结构_WordPress v Joomla:简介和内容结构
查看>>
wordpress插件_在五分钟或更短的时间内设计一个WordPress插件
查看>>
wordpress配置_如何在WordPress中配置自动更新
查看>>
在WordPress中添加丰富的代码段支持并控制您的个人品牌
查看>>
SitePoint播客#155:South By Southwest的会议和CodePoet
查看>>
wordpress插件开发_WordPress插件开发
查看>>
PHPMaster:PHP.INI之旅
查看>>
wordpress插件_适用于图形设计师和摄影师的WordPress插件
查看>>