PG(PostgreSQL)在一捅到底架構中的使用

本想隨便說說一捅到底架構,沒想到內容還挺多的,要寫三篇文章才能蓋住全部內容……

傳送門:

一捅到底的架構

用Vert.x實現Serverless架構

先說一下傳統JEE裡面,信息流在不同階段的格式。

一般是這樣的:

  1. 首先瀏覽器用Form表單提交數據,數據是Post請求的Body,格式大概是key1=value1&key2=value2&key3=value3...這種形式提交到伺服器;
  2. 軟體伺服器比如Tomcat收到之後,將其解析,並封裝成一個POJO吧,也就是一個普通的Java對象,其每一個屬性名,對應著表單裡面的每一個key,值就是屬性值,隨後將這個POJO轉交至Service組件(Component),如果這裡有EJB就更麻煩,需要做一層RMI調用;
  3. Service組件拿到這個POJO之後,就去找DAO,或者EJB裡面的Entity Bean(JPA的前身),不管是哪一個,我們統一稱之為Persistence吧,Persistence拿到之後呢,根據各種配置文件,比如XML或者DAO裡面的Annotation,做映射,轉換成JDBC的Prepared Statement,底層就是SQL,然後轉交給資料庫執行。

看這個流程,數據的格式經過了多次轉換,Form -> POJO -> ORM -> Table,沒有一個相同的,甚至在Service內部,還會有更多層次,每個層次甚至可能會有自己的DTO之類的定義,會做更多層的轉換,這些轉換非常繁瑣,我們經常在一些JEE項目中可以看到,一個Table有幾十個甚至上百個欄位,可想而知這個轉換起來的效率以及軟體的維護成本有多高了,最後的情況就是開發人員大多數時間都浪費在對這些欄位上,而軟體運行的大部分時間也都用在了這些數據格式的轉換上。

從本質上說,從面向對象的角度出發思考,無論是Form還是POJO還是Table裡面的每一條記錄,難道不都是對象嗎?

既然都是對象,我們是不是有辦法統一起來呢?最先的改變發生在前端,自從Restful Web Service提出之後,瀏覽器和軟體伺服器之間的通信,可以藉助Post等Http請求的Body,直接將JSON數據格式予以提交,這樣我們就在Form表單以外多了一個JSON數據格式的選擇。

其次在Service這一層,各個語言也開始出現對於JSON數據格式的支持,這些支持可能是語言層面的,也有可能是類庫,但是不管是哪一種,在Service開發上,也能夠支持JSON數據格式鳥。

最後是持久層,新興的NoSQL資料庫很多都將JSON作為其存儲格式,但是我們並不想完全放棄傳統的RDBMS,因為我們曾在資料庫上做過不少投資,員工在大學期間學過資料庫的課程,工作期間也有過資料庫的使用經驗,所以如果革命性地將存儲轉換到不怎麼支持join和事務的NoSQL產品上,會面臨著比較大的阻力,尤其是涉及到金錢交易,賬戶信息這些敏感數據,精度要求高,不使用事務等特性會有比較大的風險,所以還是寄希望於傳統資料庫RDBMS自身的改進,最好是在原有的基礎之上,新增JSON數據格式的支持。

那答案就是:PG!

PG在9.4版本之後,新增了JSONB數據類型的支持,所謂的JSONB其實就是JSON,只不過之前已經有過一個JSON數據格式了,但是該格式並不支持對JSON做相應的操作,比如建索引等,JSONB支持資料庫對其中的Field建索引,可使用SQL語句查詢JSON中具體的數據,同時PG支持gin索引,就是Inverted Index Table,針對結構混亂的數據格式,例如JSON,TEXT可用該演算法提升查找效率,同時PG還提供了非同步介面,Vert.x用戶可使用以下非同步客戶端以提升執行效率:

Vert.x MySQL / PostgreSQL client

mauricio/postgresql-async

當瀏覽器,軟體伺服器以及資料庫都齊刷刷地支持JSON之後,我們就能輕鬆地將之前混亂的數據格式統一起來,統一用JSON通信,這樣每次操作就可以從瀏覽器一直捅到資料庫,再結合我們之前說過的基於服務的架構,一些簡單的服務,用一個Verticle就可以完成全部的CRUD 操作了,是不是很方便呢?

PG對比MySQL和MongoDB在性能上也有明顯的優勢,有興趣的可自行測試,懶得做了……

最後給出Future寫法的AsyncClientVerticle的例子,從代碼風格上說,跟Node.js的Promises以及Java自身的CompletableFuture類似,供參考:

import io.vertx.core.AbstractVerticle;nimport io.vertx.core.Future;nimport io.vertx.core.json.JsonArray;nimport io.vertx.core.json.JsonObject;nimport io.vertx.ext.asyncsql.AsyncSQLClient;nimport io.vertx.ext.asyncsql.PostgreSQLClient;nimport io.vertx.ext.sql.ResultSet;nimport io.vertx.ext.sql.SQLConnection;nimport io.vertx.ext.sql.UpdateResult;nnpublic class AsyncClientVerticle extends AbstractVerticle {nn AsyncSQLClient postgreSQLClient;nn public static final int SUCCESS = 1;n public static final int FAILURE = 0;n public static final int EXISTED = -1;n public static final int NOT_EXISTED = 2;nn public void start(){n postgreSQLClient = PostgreSQLClient.createShared(vertx, config());nn vertx.eventBus().<JsonObject>consumer(getClass().getName(), msg ->{n String method = msg.body().getString("method");n Future<SQLConnection> connectionFuture = Future.future();n Future<UpdateResult> updateResultFuture = Future.future();n Future<ResultSet> resultSetFuture = Future.future();nn postgreSQLClient.getConnection(connectionFuture);n connectionFuture.setHandler(asyncResult ->{n if(asyncResult.succeeded()&&(method==null||method.equals("create"))){n JsonArray params = new JsonArray().add(msg.body().getString("account"))n .add(msg.body().getString("password"));n asyncResult.result().updateWithParams("INSERT INTO ACCOUNT(ACCOUNT,PASSWORD) VALUES (?,?)", params,updateResultFuture);n }else if(asyncResult.succeeded()&&method.equals("read")){n JsonArray params = new JsonArray().add(msg.body().getString("account"))n .add(msg.body().getString("password"));n asyncResult.result().queryWithParams("SELECT * FROM ACCOUNT WHERE ACCOUNT = ? AND PASSWORD = ?", params,resultSetFuture);n }else{n msg.reply(FAILURE);n }n });nn updateResultFuture.setHandler(asyncResult->{n if(asyncResult.succeeded()){n msg.reply(SUCCESS);n }else if(asyncResult.cause().getMessage().contains("duplicate key")){n msg.reply(EXISTED);n }else{n msg.reply(FAILURE);n }n connectionFuture.result().close();n });nn resultSetFuture.setHandler(asyncResult ->{n if(asyncResult.succeeded()&&asyncResult.result().getNumRows()>0){n msg.reply(SUCCESS);n }else{n msg.reply(FAILURE);n }n connectionFuture.result().close();n });n });n }nn}n

UnitTest,在測試資料庫的時候,將連接池設置為1,可有助於發現未釋放的連接等常見錯誤:

import io.vertx.core.CompositeFuture;nimport io.vertx.core.DeploymentOptions;nimport io.vertx.core.Future;nimport io.vertx.core.Vertx;nimport io.vertx.core.json.JsonObject;nimport io.vertx.ext.unit.Async;nimport io.vertx.ext.unit.TestContext;nimport io.vertx.ext.unit.junit.VertxUnitRunner;nimport org.junit.*;nimport org.junit.runner.RunWith;nn@RunWith(VertxUnitRunner.class)npublic class AsyncClientVerticleTest {n private static Vertx vertx;nn @BeforeClassn public static void setUp(TestContext context) {n vertx = Vertx.vertx();n DeploymentOptions asyncClientVerticleOptions = new DeploymentOptions()n .setConfig(new JsonObject().put("host", "127.0.0.1")n .put("username","myname")n .put("password","123456")n .put("database", "dbname")n .put("maxPoolSize",1));n vertx.deployVerticle(AsyncClientVerticle.class.getName(),asyncClientVerticleOptions,context.asyncAssertSuccess());n }nn @AfterClassn public static void tearDown(TestContext context) {n vertx.close(context.asyncAssertSuccess());n }nn @Testn public void testRegister(TestContext context) throws Exception {n final Async async = context.async();n Future<String> future1 = Future.future();n Future<String> future2 = Future.future();n vertx.eventBus().<Integer>send(AsyncClientVerticle.class.getName(),new JsonObject().put("account","chen"+Math.random()).put("password","123456"), ar ->{n if(ar.succeeded()){n context.assertEquals(ar.result().body(),AsyncClientVerticle.SUCCESS);n future1.complete();n }else{n future1.fail(ar.cause());n }n });n vertx.eventBus().<Integer>send(AsyncClientVerticle.class.getName(),new JsonObject().put("account","chen").put("password","123456"), ar ->{n if(ar.succeeded()){n context.assertEquals(ar.result().body(),AsyncClientVerticle.EXISTED);n future2.complete();n }else{n future2.fail(ar.cause());n }n });n CompositeFuture.join(future1,future2).setHandler(ar ->{n async.complete();n });n }nn @Testn public void testLogin(TestContext context) throws Exception {n final Async async = context.async();n Future<String> future1 = Future.future();n Future<String> future2 = Future.future();n vertx.eventBus().<Integer>send(AsyncClientVerticle.class.getName(),new JsonObject()n .put("method","read").put("account","chen").put("password","123456"), ar ->{n if(ar.succeeded()){n context.assertEquals(ar.result().body(),AsyncClientVerticle.SUCCESS);n future1.complete();n }else{n future1.fail(ar.cause());n }n });n vertx.eventBus().<Integer>send(AsyncClientVerticle.class.getName(),new JsonObject()n .put("method","read").put("account","chen").put("password","654321"), ar ->{n if(ar.succeeded()){n context.assertEquals(ar.result().body(),AsyncClientVerticle.FAILURE);n future2.complete();n }else{n future2.fail(ar.cause());n }n });n CompositeFuture.join(future1,future2).setHandler(ar ->{n async.complete();n });n }n}n

推薦閱讀:

PostgreSQL 在國內公司應用的多嗎?

TAG:PostgreSQL | 数据库 | jackson编程 |