阿里雲的分散式事務中間件是什麼實現的?

阿里雲最近新推出了個 分散式事務中間件

TXC 簡介_TXC for EDAS_二方服務_企業級分散式應用服務 EDAS-阿里雲

,客戶端jar混淆了,看了半天,沒什麼進展,向各位大神提問,希望有好的思路,後續我也會貼出我個人的一些想法,希望集大成,自己做一個開源。


首先聲明,本人不接受任何因本回答的追責。如您不同意本條款,請當沒看見下面的內容。

由於txc-client-2.0.2.3.jar 使用的混淆技術,所以打開就是這個樣子。

通過點開 com.a.a.a.b.a.a 類可以發現

基本上可以斷定 ,com.a.a.a xxx 包下是一些sql解析的代碼。

由於他提供的dome 中有個 tx_undo_log 表的建表語句 所以,通過搜索

發現了

這個類處理後 信息為

[
["0001", "diamond.getconfig 執行錯誤", "檢查與diamond的連接"],
["0002", "在diamond 中獲取的資料庫key為空", "檢查diamond上的數據"],
["0003", "在diamond 中取出取出的group信息為空", "檢查diamond上的數據"],
["0004", "netty線程池滿", "請在配置文件中調整線程數, corePoolSize 的值調大一些"],
["0005", "mt模式資源管理器未定義", ""],
["0006", "bean name為%s的實例未在spring上下文中定義", ""],
["0007", "diamond 寫入錯誤,mt服務中止", ""],
["0101", "無法連接伺服器", "請檢查txc server是否啟動,到txc server的網路連接是否正常"],
["0102", "register client app name failed", "請檢查txc server是否啟動,到txc server的網路連接是否正常"],
["0103", "txcConnection closed", "網路斷開,請檢查到對端(client 或txc server)的網路連接"],
["0104", "dispatch 錯誤", "網路處理錯誤,請檢查到對端(client 或txc server)的網路連接 "],
["0105", "on message 錯誤", "網路處理錯誤,請檢查到對端(client 或txc server)的網路連接 "],
["0201", "表 txc_undo_log 不存在", "檢查txc server連接資料庫的表結構"],
["0202", "sql 類型錯誤,應為 delete update insert 之一", "sql語句不支持,請修改"],
["0203", "資料庫表不存在", "請檢查資料庫表"],
["0204", "同步真實表錯誤", "請查看錶結構是否修改"],
["0205", "獲取表Meta失敗", "請檢查資料庫表"],
["0206", "DRDS全局提交失敗", "請檢查資料庫表"],
["0207", "DRDS全局回滾失敗", "請檢查資料庫表"],
["0301", "sleep 時出錯", ""],
["0302", "創建隨機數失敗", ""],
["0303", "client 未註冊appname", ""],
["0304", "註冊RM失敗", ""],
["0305", "sql cache 出錯", ""],
["0306", "事物檢查錯誤", ""],
["0307", "開啟rt分支錯誤", ""],
["0308", "處理分支回滾錯誤", ""],
["0309", "處理分支提交錯誤", ""],
["0310", "刪除分支日誌錯誤", ""],
["0311", "插入分支日誌錯誤", ""],
["0312", "更新分支日誌錯誤", ""],
["0313", "獲取分支日誌錯誤", ""],
["0314", "分支回滾邏輯錯誤", ""],
["0315", "更新全局日誌錯誤", ""],
["0316", "同步全局回滾錯誤", ""],
["0317", "刪除全局日誌錯誤", ""],
["0318", "異常", ""],
["0319", "server 發送請求失敗", ""],
["0320", "server發送響應錯誤", ""],
["0321", "插入全局日誌錯誤", ""],
["0322", "轉儲TXC事務日誌錯誤", ""],
["0323", "主備節點同步錯誤", ""],
["0324", "報警日誌錯誤", ""],
["0325", "診斷性錯誤", ""],
["0326", "查詢讀鎖錯誤", ""],
["0327", "保存高警日誌錯誤", ""],
["0328", "寫TXC日誌錯誤", "請排查SQL"],
["0401", "Txc切換分組失敗", "請檢查diamond"],
["0501", "Txc Insert 解析主鍵錯誤", "請檢查SQL"],
["9999", "未預期異常", ""],
["10000", "unknown error", ""]
]

//涉及到的表 TXC_MT_JOURNEL txc_undo_log

所以認定 在事務回滾過程中 有用到txc_undo_log

然後再 com.taobao.txc.resourcemanager.a.e 類中 發現了對於 txc_undo_log 的方法,雖然

表名被當成參數傳人 ,在靜態塊中初始化 ,此處有疑問,為什麼這麼做呢,是考慮到以後會改表名么? 果然 阿里就是叼

static {
a = LoggerInit.logger;
e.b = "txc_undo_log";
e.c = 0;
}

com.taobao.txc.resourcemanager.b.b 類中也有類似的代碼

猜測txc_undo_log 應該是在 參與事務的所有庫中都有這張表

然後我在 com.taobao.txc.common.c.F 類下發現了如下代碼

感覺非常像平常寫 rpc調用時 用於反序列化的,的類映射,所以就,一個個點開來看,處理到了如下信息, 感謝寫代碼的童鞋,重載了toString 方法啊 ,有兩個類沒重載我就 沒法子了

//com.taobao.txc.common.c.F 包下內容

1 : "BeginMessage timeout:" + "this.a" + " appname:" +" this.b "+ " txcInst:" +" this.c";
2 : "BeginResultMessage result:" +" this.a "+ " xid:" + "this.c";
3 : "BranchCommitMessage " +" sb.append(",size:").append(this.b.size()).append(" DBname:").append(this.e).append(",appName:").append(this.d).append(",commitMode:").append(this.g).append(",udata:").append(this.h).append(",rtsql:").append(this.f).toString();"
4 : "BranchCommitResultMessage" + ;
5 : "BranchRollbackMessage" + "DBname:" + this.e + ",appName:" + this.d + ",commitMode:" + this.f + ",isDelLock:" + this.g + ",udata:" + this.h;
6 : "BranchRollbackResultMessage" + "result:" + this.a;
7 : "GlobalCommitMessage tranId:" + "this.a";
8 : "GlobalCommitResultMessage tranId:" + this.c + ",result:" + this.a + ",msg:" + this.b();
9 : "GlobalRollbackMessage tranId:" + this.a;;
10 : "GlobalRollbackResultMessage tranId:" + this.c + ",result:" + this.a + ",msg:" + this.b();;
11 : "RegisterMessage key:" + this.b + " tranId:" + this.a + " Commit mode:" + this.d + " business key:" + this.c;;
12 : "RegisterResultMessage result:" + this.a + ",tranId:" + this.c + ",branchId:" + this.d;
13 : "ReportStatusMessage this.a + :" + this.b + " ReportStatusMessage:" + this.c + ",key:" + this.d;
14 : "ReportStatusResultMessage branchId:" + this.c + ",result:" + this.a;
15 : "BeginRetryBranchMessage dbName:" + this.b + ",Commit mode:" + this.c + ",effectiveTime:" + this.a + ",sql:" + this.d;
16 : "BeginRetryBranchResultMessage result:" + this.a + ",xid:" + this.c + ",branchId:" + this.d
17 : "ReportUdataMessage " +this.a + ":" + this.b + " ReportUdataMessage udata:" + this.d;
18 : "ReportUdataResultMessage result:" + this.a;
19 : "TxcMergeMessage "
20 : "TxcMergeResultMessage ";
21 : "QueryLockMessage tranId:" + this.a + ",key:" + this.b + ",business key:" + this.c;
22 : "QueryLockResultMessage result:" + this.a + ",message:" + this.b();
101 : "com.taobao.txc.rpc.impl.RegisterClientAppNameMessage");
102 : "com.taobao.txc.rpc.impl.RegisterClientAppNameResultMessage");
103 : "com.taobao.txc.rpc.impl.RegisterRmMessage");
104 : "com.taobao.txc.rpc.impl.RegisterRmResultMessage");
105 : "com.taobao.txc.cluster.message.RegisterClusterNodeMessage");
106 : "com.taobao.txc.cluster.message.RegisterClusterNodeResultMessage");
107 : "com.taobao.txc.cluster.message.ClusterBranchMessage");
108 : "com.taobao.txc.cluster.message.ClusterBranchResultMessage");
109 : "com.taobao.txc.cluster.message.ClusterGlobalMessage");
110 : "com.taobao.txc.cluster.message.ClusterGlobalResultMessage");
111 : "com.taobao.txc.cluster.message.ClusterSyncMessage");
112 : "com.taobao.txc.cluster.message.ClusterSyncResultMessage");
113 : "com.taobao.txc.message.ClusterDumpMessage");
114 : "com.taobao.txc.message.ClusterDumpResultMessage");
115 : "com.taobao.txc.cluster.message.ClusterMergeMessage");
116 : "com.taobao.txc.cluster.message.ClusterMergeResultMessage");
117 : "com.taobao.txc.cluster.message.ClusterQueryLockMessage");
118 : "com.taobao.txc.cluster.message.ClusterQueryLockResultMessage");
119 : "com.taobao.txc.cluster.message.ClusterAlarmMessage");
120 : "com.taobao.txc.cluster.message.ClusterAlarmResultMessage");
121 : com.taobao.txc.common.c.t;
122 : com.taobao.txc.common.c.u;
123 : "com.taobao.txc.cluster.message.ClusterBkupMessage");
124 : "com.taobao.txc.cluster.message.ClusterBkupResultMessage");

由此然後 通過調用其中一些類的構造方法,發現是在 netty 的hander 裡面進行的創建,所以我覺得 這裡應該是實現了一個小型 netty的 rpc調用 服務

有如下代碼

package com.taobao.txc.client;

import java.util.*;
import java.util.concurrent.*;
import com.taobao.txc.a.b.*;
import com.taobao.txc.common.a.*;
import com.taobao.txc.a.a.*;
import com.taobao.txc.resourcemanager.a.*;
import com.taobao.txc.resourcemanager.a.a.*;
import com.taobao.txc.resourcemanager.*;
import com.taobao.txc.resourcemanager.mt.*;
import com.taobao.txc.common.*;

public class a
{
private static final LoggerWrap b;
private static a c;
private String d;
private String e;
private com.taobao.txc.client.a.a f;
public final ThreadPoolExecutor a;

public static a a(final String s, final String s2, final int n, final Set& set, final String s3, final String s4, final int n2) {
if (a.c == null) {
synchronized (a.class) {
if (a.c == null) {
a.c = new a(s, s2, n, set, s3, s4, n2);
}
}
}
return a.c;
}

private a(String property, String property2, int int1, final Set& set, final String d, final String e, final int n) {
this.f = null;
this.a = new ThreadPoolExecutor(32, 200, 500L, TimeUnit.SECONDS, new LinkedBlockingQueue&(20000), new ThreadPoolExecutor.CallerRunsPolicy());
if (System.getProperty("txc.appname") != null) {
property = System.getProperty("txc.appname");
}
if (System.getProperty("txc.servergroup") != null) {
property2 = System.getProperty("txc.servergroup");
}
if (System.getProperty("txc.mode") != null) {
int1 = Integer.parseInt(System.getProperty("txc.mode"));
}
this.d = d;
this.e = e;
this.a(property, property2, int1, set, n);
}

public void a(final String s, final String s2, final int n, final Set& set, final int n2) {
System.out.println("client mode:" + n + " [1:AT 2:MT 3:ATMT 4:RT 5:ATRT 6:MTRT 7:ATMTRT]");
System.out.println("txcAppName:" + s);
System.out.println("txcServerGroup:" + s2);
final g b = g.b(this.a);
b.g(s2);
b.a(s);
b.a(n2);
b.a(set);
b.e(this.a());
b.f(this.b());
b.b();
o.a(s2);
com.taobao.txc.client.a.b.info("RpcClient inited. client mode:" + n + " txcAppName:" + s + " txcServerGroup:" + s2);
final com.taobao.txc.client.a.a.a c = com.taobao.txc.client.a.a.a.c();
c.a(b);
c.d();
this.f = c;
com.taobao.txc.client.a.b.info("TxcTransactionManager inited");
i i = null;
if ((0x1 n) &> 0) {
final c a = com.taobao.txc.resourcemanager.c.a(this.a);
a.c(s);
a.b();
com.taobao.txc.client.a.b.info("RmRpcClient inited");
final j c2 = j.c();
c2.a(a);
c2.a(new e());
c2.d();
com.taobao.txc.client.a.b.info("TxcResourceManager inited");
i = new i();
i.a(a);
i.a(c2);
i.a();
com.taobao.txc.client.a.b.info("AT TxcRMMessageListener inited");
}
if ((0x2 n) &> 0) {
final com.taobao.txc.resourcemanager.mt.c b2 = com.taobao.txc.resourcemanager.mt.c.b(this.a);
b2.c(s);
b2.b();
com.taobao.txc.client.a.b.info("MtRmRpcClient inited");
final MtResourceManager txcResourceManager = MtResourceManager.getTxcResourceManager();
txcResourceManager.a(b2);
txcResourceManager.c();
com.taobao.txc.client.a.b.info("MtResourceManager inited");
if (i == null) {
i = new i();
}
i.b(b2);
i.a(txcResourceManager);
i.a();
com.taobao.txc.client.a.b.info("MT TxcRMMessageListener inited");
}
if ((0x4 n) &> 0) {
final com.taobao.txc.resourcemanager.d.b a2 = com.taobao.txc.resourcemanager.d.b.a(this.a);
a2.g(s2);
a2.a(s);
a2.b();
com.taobao.txc.client.a.b.info("RtRpcClient inited");
final com.taobao.txc.resourcemanager.d.a h = com.taobao.txc.resourcemanager.d.a.h();
h.a(a2);
h.d();
com.taobao.txc.client.a.b.info("RtResourceManager inited");
if (i == null) {
i = new i();
}
i.a(a2);
i.a(h);
i.a();
com.taobao.txc.client.a.b.info("RT TxcRMMessageListener inited");
}
}

public String a() {
if (this.d == null || this.d.isEmpty()) {
this.d = System.getProperty("txc.accesskey");
}
return this.d;
}

public String b() {
if (this.e == null || this.e.isEmpty()) {
this.e = System.getProperty("txc.secretkey");
}
return this.e;
}

static {
b = LoggerInit.logger;
a.c = null;
}
}

com.taobao.txc.common.TxcContext 類中是存儲了一些事務上下文的東西

TxcTable 感覺是存儲了些 ddl 語句的影響行數據。用於做 提交不一致的回滾

package com.taobao.txc.resourcemanager.a.d;

import com.taobao.txc.resourcemanager.b.a.*;
import com.taobao.txc.parser.a.b.a.*;
import java.sql.*;
import com.taobao.txc.parser.struct.*;

public class c& extends a&
{
public c(final com.taobao.txc.resourcemanager.b.a.a a, final d d, final com.taobao.txc.resourcemanager.b.b.a.a& a2, final com.taobao.txc.parser.a.b.a.b b) {
super(a, d, a2, b);
com.taobao.txc.resourcemanager.a.b.a(b, a, d);
}

@Override
public T b(final Object... array) {
final com.taobao.txc.parser.a.b.a.b c = this.c();
final com.taobao.txc.resourcemanager.b.a.a a = this.a();
final d b = this.b();
final com.taobao.txc.resourcemanager.b.b.a.a& d = this.d();
T t = d.b(0);
final Statement c2 = b.c();
final TxcRuntimeContext a2 = a.a((String)null);
a.setAutoCommit(false);
try {
final String string = c.d() + c.a(c2) + " FOR UPDATE";
c.b(c.a(b, string));
t = d.b(c2, array);
c.c(c.b(b, string));
a2.a(c);
a.commit();
}
catch (Throwable t2) {
a.rollback();
throw new SQLException(t2);
}
finally {
a.setAutoCommit(true);
}
return t;
}

@Override
public T c(final Object... array) {
final com.taobao.txc.parser.a.b.a.b c = this.c();
final com.taobao.txc.resourcemanager.b.a.a a = this.a();
final d b = this.b();
final com.taobao.txc.resourcemanager.b.b.a.a& d = this.d();
d.b(0);
final Statement c2 = b.c();
final TxcRuntimeContext c3 = a.c();
T b2;
try {
final String string = c.d() + c.a(c2) + " FOR UPDATE";
c.b(c.a(b, string));
b2 = d.b(c2, array);
c.c(c.b(b, string));
c3.a(c);
}
catch (Throwable t) {
throw new SQLException(t);
}
return b2;
}
}

此類 使用了 FOR UPDATE 語句,感覺是作為提交階段,先提交者 鎖住此此事務更改的參與表,防止 被其他事務改寫的情況。

好了,後續我還會回答一個 該事務中間件的實現猜想,希望拋磚引玉,能藉助大家的力量弄懂該組件。

----------------------------------------------------------------

----------------------------------------------下面是我的猜想----------------------------------------------------

所以我的大致猜想是這樣的,如果不對歡迎拍磚

1. TXC 是基於 rocketmq ,Netty 來實現的。

2. 實現了一個 以rocketmq 和 操作庫 的 tx_undo_log 做日誌存儲 ,Netty RPC 做,事務控制的 二階段提交協議。

3. 在事務提交階段,會同時開啟一個事務,鎖住 分散式事務的參與記錄,等待事務合併後釋放。或者有節點提交錯誤時回滾。

一個事務的處理流程我認為可能是這樣的

1. 啟動之後 啟動RPC 連接上事務控制服務

2. 包裝了 DataSource ,和 jdbc 的 Connection ,在進行 DML 操作之前,先使用 解析到對應表的 ,使用 SELECT xxx from xx for update 語句 構造 undo 語句。

操作完了後,發送一個消息,通知事務控制服務,可以本事務可以提交了。

3. 判斷自己外部 是否還有事務(即是自己是不是最外層事務)。如果自己不是最外層事務,則返回(不阻塞線程), 如果自己是最外層事務,則 等待事務 提交(當前線程阻塞).

4. (另外的線程)Rpc 接受到 事務控制器 提交事務命令後。獲取合併事務。如果成功(柱塞住參與事務的表記錄),回復成功消息,失敗回復失敗消息(回滾,不再鎖住表記錄)。

5. 根據事務提交情況,控制器會 下發,完成事務,或者回滾事務的命令,由RPC線程處理, 如果成功則釋放,鎖住的表記錄,事務完成, 最外層業務線程的阻塞返回結束,

如果失敗,則執行 uodo 日誌裡面的sql ,回滾操作,釋放表記錄, 最外層業務線程阻塞返回拋出 事務異常。

現在沒想明白的問題:

1. 在事務提交階段,提交後表記錄,的鎖已經釋放,怎麼樣才能保證該行數據能被後續的鎖操作鎖住,業務在大量事務操作的時候,可能記錄先被其他的事務搶到了。

我把我的猜想跟我同事說了後他覺得這個 做法有點像

ebay經典的BASE (basically available, soft state, eventually consistent)方案

希望各位大神能提出看法

附上 客戶端jar包

下載地址

http://txc-console.oss-cn-beijing.aliyuncs.com/sdk/txc-client-2.0.18.jar?spm=5176.doc43161.2.2.mIkVrsfile=txc-client-2.0.18.jar


ACID包括很多內容。

上面說的原子性的部分,簡單說就是要麼全部成功,要麼全部失敗。

不過ACID還有其他約束。比如一致讀。在事務提交之前,其他session不能讀到未提交的數據。猜測內部有個排隊機,在所有的讀之前,查看自己的session處於版本的哪個位置,然後嘗試去讀取過去版本的數據,也許就是讀取undo部分的數據,不過oracle的undo本身是不開放的訪問……

不管是mysql,還是oracle都有基於mvcc的多版本鎖,過去版本的數據只在內部的undo才能找到,應該不是有一張臨時表,然後讀取記錄臨時表中的數據就可以做到的。

目前看到的資料很少,不知道實現的acid到什麼地步,也沒有多版本機制。只有測試下才能確定。


我對txc sql以及 架構上做了一些分析,可以參考

https://pan.baidu.com/s/1hr8t97u 歡迎大家指正


是否是遵循了TCC協議?


題主 小心被請去吃月餅^O^


推薦閱讀:

TAG:分散式系統 | 分散式事務 |