標籤:

SQLAlchemy入門和進階

目錄:

  1. SQLAlchemy 簡介

  2. 橫向對比

  3. 核心概念與入門

    1. 模型定義

    2. 複雜查詢

    3. 基礎性能
  4. 擴展與進階

    1. 事件

    2. 反射

    3. Python3.x asyncio擴展

    4. 分片Session

    5. 自定義的列類型

    6. 混合(hybrid)屬性

    7. 序列化Query

    8. Baked Query

    9. 多態與關係

(知乎沒有自動目錄和側邊欄懸浮呢。。惆悵)

在新團隊里做的技術分享,過了一段時間,整理下來並且有了新的想法。似乎入門級的教程在知乎更受歡迎?

SQLAlchemy 簡介

SQLAlchemy 是一個功能強大的Python ORM 工具包,口碑不錯,社區活躍也較為開放

提供 全功能的SQL和ORM操作 本次附贈的文件(這裡放不上來,也懶得放gayhub了,總之很簡單的,單元測試多一些,一下午搞定):

connect.py :底層的資料庫連接

orm.py :模型定義的樣例

example_test.py :單元測試,實質上可以對應業務的具體使用

python3_test.py :展示Python3 asyncio下的SQLAlchemy

分別建立python2/3的虛擬環境,然後安裝對應的requirements.txt即可

無論什麼語言,無論什麼庫,做一個ORM實現,至少應當實現完全語義化的資料庫操作,使得操作資料庫表就像在操作對象。

完整的ORM應當可以完全避免SQL拼接

為什麼需要ORM

當時分享完畢之後,也確實很多同事表示還是喜歡裸SQL,我後來也又在工作中看到了很多遺留代碼的問題。我也正好趁浴室迷思 想了一下,為什麼我需要ORM呢?

第一條來自一個定理:

一切由人直接來保證安全性的系統,就一定會出錯

拼接SQL、把SQL做成模板、開始使用ORM、封裝出DAO層,幾乎是每個項目的共識吧?

過往的項目中,由我第一手寫的,都會第一時間加入ORM,畢竟也只是兩三個小文件,一百行以內的事情(後續由於封裝的增多,可能會到達數百行)

這段時間在寫舊系統的小規模重構(定理2:一個好程序猿應當友好地幫前人擦好屁股,而不是靠重新製造一個新屁股實現),拼接字元串並沒有帶來任何優點,反而引入了非常簡單的注入漏洞,簡單的設想這樣一個列表API的場景:

  1. 根據請求參數控制對應的:過濾條件、排序方法、翻頁
  2. 根據需要預取關聯的表,JOIN並把對一對多的關係化為一個list

第一條,剛一上手,就發現滿地的string format,翻頁用了:

order_sql = "ORDER BY {} {}".format(order_by,direction)

毫無疑問的order_by=id%3Bselect+1%3B-- 就直接注入了

要解決這些在SQL拼接的問題,除了表單驗證,毫無疑問需要做一個SQL字元轉義,另外在能用SQL參數的地方,需要用參數(然後也得注意拼接時候參數的個數,是的,這裡我們的介面有另一個BUG,參數數量沒數對)

第二個功能點,想像一下在需要的地方額外加一句LEFT JOIN,然後對結果再做額外的解析

還有一些附屬功能:單元測試如何建表?代碼里遍地的硬編碼表名如何解決?

自己不是不能實現,但自己來實現這些,就走上了發明ORM的老路,用一個成熟的、文檔豐富的ORM,豈不美哉?

橫向對比

簡單的挑了三個

(知乎的表格似乎智障,不插入表格了)

SQLAlchemy、Peewee、Django ORM

Django ORM一直就不是一個全功能的ORM,會發現你想寫的SQL幾乎無法通過ORM寫出來,當然raw屬於tan90,使用裸SQL不在我們的考慮範圍。Django 1.12後提供了一些subquery等各類豐富SQL操作,但這麼新,估計還極少項目在這麼新的版本

Peewee如果有興趣可以後續繼續使用來感受一下,Peewee也是一個功能全面的ORM,star很多但開發沒有SQLAlchemy活躍

核心概念與入門

官方文檔

我總是在想為什麼團隊里很多人會覺得SQLAlchemy入門門檻高,我曾經也被困擾過,但回頭一看會發現的概念實質比較簡單。

官方文檔的脈絡不太清晰,要掃過一遍並且學以致用才能感受得到。example很友好的!

回過頭來看它的從教程到API的文檔,會發現它的文檔非常詳細,學會它,除了學會了Python操作SQL的一個庫,同樣也可以學到從代碼組織、各類Pythonic技巧到思想的很多東西

總的感受是:上手還算容易,精通要花很多功夫,但確實還挺有趣的

先放一個表,待會我們會繼續講

(再次損失一個表)

概念很少,並且很清晰,理解這些概念之後的後續使用時,基本可以感受到:你能直覺想到的操作,還確實都有(比如subquery、複雜查詢的構造)

模型定義

我們來看看他如何完成模型定義:

# coding=utf-8from __future__ import unicode_literals, absolute_importfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, DateTimeModelBase = declarative_base() #<-元類class User(ModelBase): __tablename__ = "auth_user" id = Column(Integer, primary_key=True) date_joined = Column(DateTime) username = Column(String(length=30)) password = Column(String(length=128))

從這裡可以看到,模型定義甚至與資料庫是無關的,所以允許不同的資料庫後端,不同類型擁有不同的表現形式和建表語句

這裡我們可以看到它實現了 ORM與資料庫連接的解耦,一些資料庫後端不支持的數據類型,例如Numeric類型,在sqlite中不支持,不過SQLAlchemy也能做一些兼容使用普通浮點

Model 等同於資料庫的一張表

Column 顯然就是這張表的一列

PS: SQLAlchemy 1.2之後才支持comment注釋,以在ddl產生建表SQL時寫上comment屬性,1.2還在beta版里,所以還不能用。。。我倒很好奇為毛這個feature這麼不重要

with get_session() as session: session.add(User(username="asd", password="asd")) session.add(User(username="qwe", password="qwe")) session.commit()

session(會話)的概念,可以看成一個管理資料庫持久連接的對象,在此下面是完全透明的連接池和事務等東西

get_session底下configure可以控制auto_commit參數,= False時寫操作默認都不放在事務里,SQLAlchemy默認為True

session.add函數將會把Model加入當前的持久空間(可以從session.dirty看到),直到commit時更新

with get_session() as session: # <class sqlalchemy.orm.query.Query> session.query(User)

最簡單的這個查詢返回了一個Query對象

需要注意的是,這裡只構造Query,事實上並沒有發送至資料庫進行查詢,只會在Query.get()、Query.all()、Query.one()以及Query.__iter__等具有「執行」語義的函數,才會真的去獲取

Query :本質上是數據表的若干行

  1. 在查詢情況的下,等同於SQL 中的 SELECT Syntax
  2. 在update函數的操作時,可以根據參數選擇等同於直接UPDATE users SET xxx WHERE name=xxx或者先用SELECT 選出ID,再循環用UPDATE xxx WHERE id=xxx
  3. delete同上

以SQLAlchemy為代表的ORM基本都支持鏈式操作。

形如:

with get_session() as session: # <class sqlalchemy.orm.query.Query> query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") #上面兩個都是添加where .join(Addreess)#使用ForeignKey .join(Addreess,Addreess.user_id==User.id)#使用顯式聲明 .limit(10) .offset(0) )

所有Query支持的詳情見Query API文檔

上面也涉及到一個特別有意思的filter函數:User.username == "asd" ,實際上是SQLAlchemy重載了Column上的各種運算符 __eq__、__ge__,返回了一個BinaryExpression對象,看起來就更加符合直覺上的語義

複雜查詢

基於Query的subquery

with get_session() as session: # <class sqlalchemy.orm.query.Query> query = (session .query(User.id) .filter(User.username == "asd") .filter_by(username="asd") .limit(10) ) subquery = query.subquery() query2 = session.query(User).filter( User.id.in_(subquery) ) print query2#<-列印展開成的SQL,此處沒有SQL查詢

理解了Query、Column的概念,也很容易自行構造出這樣的SQL

所有在Column級別上的使用 詳見Column API文檔

上面我們提到了直接對Query進行的刪除:

with get_session() as session: query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") .join(Addreess) .join(Addreess,Addreess.user_id=User.id) .limit(10) .delete()#<-這裡 )

另外,因為Model也可以被放進session里,然後刪除的,和插入是個反向操作:

with get_session() as session: instance = session.query(User).get(1) session.delete(instance) #下一句執行:DELETE FROM auth_user WHERE auth_user.id = ? session.commit()

改首先是上述Query中所說的update方法:

with get_session() as session: # get by id query = (session .query(User) .filter_by(id=1) .update({"username": User.username + "a"}, synchronize_session=False) )

然後是在Model級別的方法:

with get_session() as session: # get by id user = (session .query(User) .get(1) ) user.password = "zxcv" # UPDATE auth_user SET password=? # WHERE auth_user.id = ? session.commit()

在對Model的屬性進行修改的時候,session會得到修改對應的內容,下次commit即會提交SQL

這裡留個思考題:如果對1、同一對象的同一屬性進行修改,2、同一對象的不同屬性進行修改 ,最終會有幾個SQL被發出? 如果你來實現這樣的功能,你會從哪裡下手?

基礎性能

SQLAlchemy性能

比較了十萬條記錄插入的性能

另外不要覺得比sqlite 裸SQL慢三倍很慢,注意這個量級,實際項目中會發現慢查詢、不規範操作(例如for循環里放查詢)的危害比引入ORM的這點開銷打多了

總結

到這再貼上面那個概念表,應該就能比較好的理解了

在用裸SQL可以解決的場景下,上述的SQLAlchemy入門部分就足以掌控場景,完成所有的增刪查改API需求(甚至自動生成代碼的需求),自動生成真是偷懶無止境。。不過發明新的DSL嘛,能不做就不做。。

擴展與進階

從過往的經驗來看,SQLAlchemy以優雅的直覺實現了諸多介面,並保留了良好的可擴展性,這裡拋磚引玉一些有趣的特性

事件

應用層的觸發器(trigger),支持:

  1. ConnectionEvents 包括Connection和Engine(連接後進行一些自檢操作)
  2. DDLEvents 模型增刪查改事件
  3. DialectEvents 不同種類的資料庫的事件
  4. PoolEvents 連接池事件,連接的檢出和回收等

上面的性能測試里就使用了兩種事件

from sqlalchemy import eventfrom sqlalchemy.engine import Engineimport timeimport logginglogging.basicConfig()logger = logging.getLogger("myapp.sqltime")logger.setLevel(logging.DEBUG)@event.listens_for(Engine, "before_cursor_execute")def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): conn.info.setdefault(query_start_time, []).append(time.time()) logger.debug("Start Query: %s", statement)@event.listens_for(Engine, "after_cursor_execute")def after_cursor_execute(conn, cursor, statement, parameters, context, executemany): total = time.time() - conn.info[query_start_time].pop(-1) logger.debug("Query Complete!") logger.debug("Total Time: %f", total)

反射

現有項目或者別人的代碼里如果已經用其他的方式寫好了表定義,不想再定義Model了,想用SQLAlchemy直接使用對應的資料庫表

查文檔關鍵字:Automap

from sqlalchemy.ext.automap import automap_basefrom sqlalchemy.orm import Sessionfrom sqlalchemy import create_engineBase = automap_base()# engine, suppose it has two tables user and address set upengine = create_engine("sqlite:///mydatabase.db")# reflect the tablesBase.prepare(engine, reflect=True)tables = Base.classes#<-load tablesUser = Base.classes.userAddress = Base.classes.address# rudimentary relationships are producedsession.add(Address(email_address="foo@bar.com", user=User(name="foo")))session.commit()# collection-based relationships are by default named# "<classname>_collection"print (u1.address_collection)

擴展閱讀:DeferredReflection

我之前在一些OLAP應用 用來做數據分析時用到過。。

Python3.x asyncio擴展

16年12月 Python3.6進入穩定期,同時也標誌著Python3.4和3.5中的asyncio模塊進入穩定期

SQLAlchemy對asyncio的支持在於,它實質上可以在engine層進行擴展,同時擴展Engine、Connection、Transaction、Context 代碼量約400行

Strategies for creating new instances of Engine types. These are semi-private implementation classes which provide the underlying behavior for the "strategy" keyword argument available on :func:~sqlalchemy.engine.create_engine. Current available options are plain, threadlocal, and mock. New strategies can be added via new EngineStrategy classes. """

形如:

from sqlalchemy.engine.strategies import DefaultEngineStrategyfrom .engine import AsyncioEngineASYNCIO_STRATEGY = _asyncioclass AsyncioEngineStrategy(DefaultEngineStrategy): name = ASYNCIO_STRATEGY engine_cls = AsyncioEngineAsyncioEngineStrategy() async def main(): engine = create_engine( # In-memory sqlite database cannot be accessed from different # threads, use file. sqlite:///test.db, strategy=ASYNCIO_STRATEGY ) metadata = MetaData() users = Table( users, metadata, Column(id, Integer, primary_key=True), Column(name, Text), ) # Create the table await engine.execute(CreateTable(users)) conn = await engine.connect()

另外提一嘴的是:asyncio不是銀彈,會導致應用層壓力直接傳給DB,會掩蓋應用的SQL寫的爛的問題

分片Session

讀寫分離是當資料庫壓力到達一定階段時,由應用層進行的拆分資料庫壓力的措施

實現一種主從分離的Session:

  1. 最簡單的方案是直接擴展Session類get_bind方法

get_bind(mapper=None, clause=None)

Return a 「bind」 to which this Session is bound. Note that the 「mapper」 argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush(), call, etc.

  1. 也可以使用sqlalchemy.ext.horizontal_shard模塊中已經實現好的ShardedSession

Parameters:

  • shard_chooser – A callable which, passed a Mapper, a mapped instance, and possibly a SQL clause, returns a shard ID. This id may be based off of the attributes present within the object, or on some round-robin scheme.
  • id_chooser – A callable, passed a query and a tuple of identity values, which should return a list of shard ids where the ID might reside.
  • query_chooser – For a given Query, returns the list of shard_ids where the query should be issued.
  • shards – A dictionary of string shard names to Engine objects.

允許根據model或者SQL條件、ID選擇具體的資料庫連接。一個未經驗證的腦洞:因為shards是Engine的dict,那麼是否允許在異構資料庫之間使用Shard?這樣會帶來什麼樣的優缺點?

自定義的列類型

很久很久以前做的功能了,想像一個這樣的場景:

  • Postgresql支持IP/CIDR的存儲,本質上是使用4*8bit=32bit的int存儲
  • Mysql此時並沒有這樣簡單的IP存儲 如何對其進行擴展?

自定義實現的列類型實質上需要:

  1. 指定在某種資料庫方言下的存儲類型,例如Mysql下使用int
  2. 實現兩個方法:從資料庫中取出來一個python對象和把Python對象放入資料庫
  3. 按需需要實現:支持一些操作符(例如==,in_)

from sqlalchemy import typesclass MyIPType(types.TypeDecorator): impl = types.Integer def process_bind_param(self, value, dialect): #from python to database if dialect=="mysql": pass return #.... def process_result_value(self, value, dialect): #from database to python object return #...

我們也可以在awesome-sqlalchemy中找到一些有趣的類型擴展

混合(hybrid)屬性

我們常見使用Python的property修飾器來構造一個複雜屬性,SQLAlchemy中,這個混合屬性的作用也類似,不僅可以用於獲得對應的值,也可以用於Query時的鏈式操作

定義一個Model後,可以在各類增刪查改中用到這個混合屬性。混合屬性 混合在:既是一個Python屬性,也是一個可以放入資料庫查詢的屬性

class Interval(Base): __tablename__ = interval id = Column(Integer, primary_key=True) start = Column(Integer, nullable=False) end = Column(Integer, nullable=False) def __init__(self, start, end): self.start = start self.end = end @hybrid_property def length(self): return self.end - self.start #下面這個寫著玩的。。 @length.setter def length(self, value): self._value = value>>> i1 = Interval(5, 10)>>> i1.length5>>> print Session().query(Interval).filter_by(length=5)SELECT interval.id AS interval_id, interval.start AS interval_start,interval."end" AS interval_endFROM intervalWHERE interval."end" - interval.start = :param_1

上述還有一個寫著玩兒的setter,hybrid_property支持:

  1. comparator 擴展Interval.length在各種比較符(><=)的行為
  2. deleter/setter 顧名思義
  3. expression 可以擴展最後展開的SQL表達式,例如展開成SUM(xxx):

from sqlalchemy.orm import func #下面這個寫著玩的。。 @length.expression def length(self, expr): return func.sum(self.end, expr)

序列化Query

提供一個介面,以序列化和反序列化Query,用於跨系統、微服務的場景

from sqlalchemy.ext.serializer import loads, dumpsmetadata = MetaData(bind=some_engine)Session = scoped_session(sessionmaker())# ... define mappersquery = Session.query(User). filter(User.somedata==foo).order_by(User.sortkey)# pickle the queryserialized = dumps(query)# unpickle. Pass in metadata + scoped_session # 上面提到過的 query和Session實際上是密不可分的query2 = loads(serialized, metadata, Session)print query2.all()

這個做起來其實就非常帶感了,微服務之間的必要條件就是各種dump,結合一下celery,實現一個去中心的HTTP服務也是不在話下

Baked Query

緩存從Query生成的SQL,以減少生成時間,實際上是個應用層面的存儲過程、View

from sqlalchemy.ext import bakedbakery = baked.bakery()#<-創建了一個LRUfrom sqlalchemy import bindparamdef search_for_user(session, username, email=None): baked_query = bakery(lambda session: session.query(User)) baked_query += lambda q: q.filter(User.name == bindparam(username)) baked_query += lambda q: q.order_by(User.id) if email: baked_query += lambda q: q.filter(User.email == bindparam(email)) result = baked_query(session).params(username=username, email=email).all() return result

上面說到了SQLAlchemy展開成SQL的性能問題,真的特別擔憂的話,再來一個緩存綁定參數如何?

多態和關係

使用多個模型,但實際上只是操作一張資料庫表 此處基本略,之前寫過一篇文章了:這兒

class Employee(Base): __tablename__ = employee id = Column(Integer, primary_key=True) name = Column(String(50)) type = Column(String(50)) __mapper_args__ = { polymorphic_identity:employee, polymorphic_on:type }

這裡定義了僱員Employee 模型,指定type欄位為多態所在欄位,並且對於這個模型,當type欄位為employee時,即為一個僱員

一對一、一對多、多對多的關係和自動收集成collection,這裡不會細說,relationship函數的各種參數留待大家遊玩。

關係間的收集有多種lazy方式,可以選擇在父類讀取時直接JOIN或者Subquery,也可以在需要的時候使用Query.option設置。說起來的篇幅會更長,我投個懶,大家去讀文檔吧~hfgl


推薦閱讀:

Python 在電氣工程及其自動化上有哪些應用?
為什麼 x in range(1000000000000001) 的執行速度這麼快
python多線程下載,進度條顯示問題如何解決?
scrapy 知乎用戶信息爬蟲
Python 中列表和元組有哪些區別?

TAG:Python |