AtomicLong與LongAdder性能對比

AtomicLong與LongAdder性能對比

來自專欄計算機技術1 人贊了文章

Hystrix需要根據過去一段時間內失敗的請求次數來判斷是否打開熔斷開關,所以它會維護一個時間窗口,並不斷向該窗口中累加失敗請求次數,在多線程環境下一般會使用AtomicLong,但是Hystrix中使用的是LongAdder。查了一下,發現在Hystrix,Guava,JDK8中都有這個類,應該是Java8中才加到標準庫中,其他庫要兼容老版本只能自己複製一份了。Hystrix和Java8中的LongAdder具體實現有細微差別,不過整體思路是一樣的,下面的分析都是以jdk為準的。

為什麼Hystrix使用LongAdder而不是AtomicLong呢?在LongAdder的Java doc中有

This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

在存在高度競爭的條件下,LongAdder的性能會遠遠好於AtomicLong,不過會消耗更多空間。高度競爭當然是指在多線程條件下。

我們知道AtomicLong是通過cas來更新值的,按理說是很快的,LongAdder為什麼會比它更快,是還有其他什麼更快的手段嗎?先不管這些,直接實驗一下,看是不是真的更快。

1. 性能對比

public class TestAtomic { private static final int TASK_NUM = 1000; private static final int INCREMENT_PER_TASK = 10000; private static final int REPEAT = 10; private static long l = 0; public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); } public static void testAtomicLong() { AtomicLong al = new AtomicLong(0); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> al.incrementAndGet())); } public static void testLong() { l = 0; execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testLongAdder() { LongAdder adder = new LongAdder(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> adder.add(1))); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s
"
, Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void execute(int n, Runnable task) { try { CountDownLatch latch = new CountDownLatch(n); ExecutorService service = Executors.newFixedThreadPool(100); Runnable taskWrapper = () -> { task.run(); latch.countDown(); }; service.invokeAll(cloneTask(n, taskWrapper)); latch.await(); service.shutdown(); } catch (Exception e) {} } private static Collection<Callable<Void>> cloneTask(int n, Runnable task) { return ntimes(n).mapToObj(x -> new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }).collect(Collectors.toList()); } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); } }

上面是用1000個並發任務,每個任務對數據累加10000次,每個實驗測試10次。

輸出:

total: 1939, [258, 196, 200, 174, 186, 178, 204, 189, 185, 169]

total: 613, [57, 45, 47, 53, 69, 61, 80, 67, 64, 70]

total: 1131, [85, 67, 77, 81, 280, 174, 108, 67, 99, 93]

從上往下依次是AtomicLong, LongAdder, long。

從結果能看到LongAdder確實性能高於AtomicLong,不過還有一個讓我非常吃驚的結果,就是LongAdder竟然比直接累加long還快(當然直接累加long最終得到的結果是錯誤的,因為沒有同步),這個有些反常識了,其實這裡涉及到了一些隱藏的問題,就是cache的false sharing,因為平時編程時不太會關注cache這些,所以碰到這個結果會出乎預料,詳細的解釋在後面的第三節。

2. LongAdder源碼分析

先來分析一下LongAdder為什麼會比AtomicLong快,是不是用到了什麼比cas還快的東西。

LongAdder的父類Striped64的注釋中已經將整個類的設計講的很清楚的了,類中主要維護兩個值,一個long型的base屬性,一個Cell數組,它們值的和才是真正的結果。Cell是對long的一個包裝,為什麼將long包裝起來,猜測有兩個原因:1)可以在類中添加padding數據,避免false sharing,2)包裝起來才好使用cas。

LongAdder.add的流程簡單描述就是,先嘗試通過cas修改base,成功則返回,失敗則根據當前線程hash值從Cell數組中選擇一個Cell,然後向Cell中add數據。Cell數組是動態增長的,並且是用時才初始化的,這是為了避免佔用過多空間。

看到注釋大概能猜到為什麼快了,LongAdder仍然用的cas,快是因為在高度競爭的條件下,對一個值進行修改,衝突的概率很高,需要不斷cas,導致時間浪費在循環上,如果將一個值拆分為多個值,分散壓力,那麼性能就會有所提高。

下面來看源碼,進入LongAdder的add方法:

public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); }}

上面先對base進行cas操作,然後判斷Cell數組是否為空,不為空則根據當前線程probe值(類似hash值)選擇Cell並進行cas,都不成功進入longAccumulate方法。

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // (1) if ((a = as[(n - 1) & h]) == null) { // (1.1) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (1.2) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // (1.3) try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // (2) boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // (3) break; // Fall back on using base }}

Cell數組不為空時進入分支(1),如果根據當前線程hash獲得的Cell為null,則進入(1.1)開始實例化該Cell,否則進入(1.2)對Cell進行cas,不成功的話表示衝突比較多,開始進入(1.3)對Cell數組擴容了,cellsBusy是用cas實現的一個spinlock;

Cell數組為空且獲取到cellsBusy時進入分支(2),開始初始化Cell數組;

分支(1)和(2)都進不去,沒辦法,只能再次對base進行cas。

上面只是對源碼做了粗略的分析,詳細的每個分支的含義我也不知道,不過這些我們都不需要較真去弄的非常清楚,畢竟世界上只有一個Doug Lea,我們只需要知道LongAdder是怎麼比AtomicLong快的就行,實際就是用多個long來分擔壓力,一群人到十個盤子里夾菜當然比到一個盤子里夾菜衝突小。

2.1 實現一個簡化版的LongAdder

知道了原理,那我們就自己來實現一個簡陋的LongAdder。

public class MyLong { private static final int LEN = 2 << 5; private AtomicLong[] atomicLongs = new AtomicLong[LEN]; public MyLong() { for (int i = 0; i < LEN; ++i) { atomicLongs[i] = new AtomicLong(0); } } public void add(long l) { atomicLongs[hash(Thread.currentThread()) & (LEN - 1)].addAndGet(l); } public void increment() { add(1); } public long get() { return Arrays.stream(atomicLongs).mapToLong(al -> al.get()).sum(); } // 從HashMap里抄過來的 private static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }}

在最上面的TestAtomic類中加上方法:

public static void main(String[] args) throws Exception { repeatWithStatics(REPEAT, () -> testAtomicLong()); repeatWithStatics(REPEAT, () -> testLongAdder()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testMyLong());} public static void testMyLong() { MyLong myLong = new MyLong(); execute(TASK_NUM, () -> repeat(INCREMENT_PER_TASK, () -> myLong.increment()));}

輸出:

total: 1907, [176, 211, 192, 182, 195, 173, 199, 229, 184, 166]total: 641, [67, 50, 45, 53, 73, 58, 80, 63, 69, 83]total: 947, [90, 82, 70, 72, 87, 78, 136, 107, 77, 148]total: 670, [81, 80, 73, 67, 57, 94, 62, 49, 57, 50]

可以看到性能比AtomicLong好多了。

3. 為什麼LongAdder比直接累加long還快

前面解釋了LongAdder比AtomicLong快,但是為什麼它還會比long快?解答這個問題之前要先介紹偽共享的概念。

3.1 偽共享(false sharing)

在計算機中定址是以位元組為單位,但是cache從內存中複製數據是以行為單位的,一個行會包含多個位元組,一般為64位元組,每個CPU有自己的L1、L2 cache,現在有兩個變數x、y在同一行中,如果CPU1修改x,緩存一致性要求數據修改需要馬上反應到其他對應副本上,CPU2 cache對應行重新刷新,然後CPU2才能訪問y,如果CPU1一直修改x,CPU2一直訪問y,那麼CPU2得一直等到cache刷新後才能訪問y,帶來性能下降,產生這個問題的原因有兩方面:1)x、y位於同一行,2)兩個CPU會頻繁的訪問這兩個數據,如果這兩個條件其中一個不成立,那就不會產生問題。更多關於偽共享的概念參考偽共享(False Sharing)和(false sharing(wiki)。

3.1.1 解決辦法

既然這個問題出現了,那肯定是有解決辦法的。一般就是添加padding數據,來將x、y隔開,讓它們不會位於同一行中。

Java中的話,在Java7之前需要手動添加padding數據,後來JEP 142提案提出應該為程序員提供某種方式來標明哪些欄位是會存在緩存競爭的,並且虛擬機能夠根據這些標識來避免這些欄位位於同一行中,程序員不用再手動填充padding數據。

@Contended就是應JEP 142而生的,在欄位或類上標準該註解,就表示編譯器或虛擬機需要在這些數據周圍添加padding數據。Java8的偽共享和緩存行填充--@Contended注釋中詳細解釋了@Contended註解的使用方法,在百度或者谷歌上搜索 jep 142 site:mail.openjdk.java.net 能找到很多@Contended相關資料。

下面實驗一下來觀察false sharing:

public class TestContended { private static int NCPU = Runtime.getRuntime().availableProcessors(); private static ForkJoinPool POOL = new ForkJoinPool(NCPU); private static int INCREMENT_PER_TASK = 1000000; private static final int REPEAT = 10; private static long l = 0; private static long l1 = 0; private static long l2 = 0; private static long cl1 = 0; private static volatile long q0, q1, q2, q3, q4, q5, q6; private static long cl2 = 0; public static void main(String[] args) { repeatWithStatics(REPEAT, () -> testLongWithSingleThread()); repeatWithStatics(REPEAT, () -> testLong()); repeatWithStatics(REPEAT, () -> testTwoLong()); repeatWithStatics(REPEAT, () -> testTwoContendedLong()); } public static void testLongWithSingleThread() { repeat(2 * INCREMENT_PER_TASK, () -> l++); } public static void testLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l++), () -> repeat(INCREMENT_PER_TASK, () -> l++)); } public static void testTwoLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> l1++), () -> repeat(INCREMENT_PER_TASK, () -> l2++)); } public static void testTwoContendedLong() { asyncExecute2Task(() -> repeat(INCREMENT_PER_TASK, () -> cl1++), () -> repeat(INCREMENT_PER_TASK, () -> cl2++)); } public static void repeatWithStatics(int n, Runnable runnable) { long[] elapseds = new long[n]; ntimes(n).forEach(x -> { long start = System.currentTimeMillis(); runnable.run(); long end = System.currentTimeMillis(); elapseds[x] = end - start; }); System.out.printf("total: %d, %s
"
, Arrays.stream(elapseds).sum(), Arrays.toString(elapseds)); } private static void asyncExecute2Task(Runnable task1, Runnable task2) { try { CompletableFuture.runAsync(task1, POOL) .thenCombine(CompletableFuture.runAsync(task2, POOL), (r1, r2) -> 0).get(); } catch (Exception e) {} } private static void repeat(int n, Runnable runnable) { ntimes(n).forEach(x -> runnable.run()); } private static IntStream ntimes(int n) { return IntStream.range(0, n); }}

不知道為什麼我用不了@Contended註解,即使啟動參數加上-XX:-RestrictContended也不行,所以只能手工添加padding數據。目前緩存行大小一般為64位元組(也可以通過CPUZ來查看),也就是填充7個long就可以將兩個long型數據隔離在兩個緩存行中了。

輸出:

total: 16, [9, 5, 1, 0, 0, 0, 0, 1, 0, 0]total: 232, [35, 35, 33, 24, 25, 23, 13, 15, 15, 14]total: 148, [17, 15, 14, 16, 14, 15, 13, 17, 12, 15]total: 94, [8, 8, 8, 8, 15, 9, 10, 11, 8, 9]

從上往下依次為:1)單線程累加一個long;2)兩個線程累加一個long;3)兩個線程累加兩個long,這兩個long位於同一緩存行中;4)兩個線程累加兩個long,且它們位於不同緩存行中。

從上面的結果看,padding還是很有效的。結果2相比於1,不僅會有線程切換代價還會有false sharing問題,對於純計算型任務線程個數不要超過CPU個數。不過有一點想不通的是,結果2和3為什麼差距這麼大。

以上轉自公司同事「yuanzhongcheng」的分享


推薦閱讀:

TAG:計算機 | 計算機科學 | 編程 |