SQLAlchemy入門和進階
- SQLAlchemy 簡介
- 橫向對比
- 核心概念與入門
- 模型定義
- 增
- 查
- 複雜查詢
- 刪
- 改
- 基礎性能
- 擴展與進階
- 事件
- 反射
- Python3.x asyncio擴展
- 分片Session
- 自定義的列類型
- 混合(hybrid)屬性
- 序列化Query
- Baked Query
- 多態與關係
在新團隊里做的技術分享,過了一段時間,整理下來並且有了新的想法。似乎入門級的教程在知乎更受歡迎?
SQLAlchemy 簡介
SQLAlchemy 是一個功能強大的Python ORM 工具包,口碑不錯,社區活躍也較為開放
提供 全功能的SQL和ORM操作 本次附贈的文件(這裡放不上來,也懶得放gayhub了,總之很簡單的,單元測試多一些,一下午搞定):connect.py :底層的資料庫連接orm.py :模型定義的樣例
example_test.py :單元測試,實質上可以對應業務的具體使用python3_test.py :展示Python3 asyncio下的SQLAlchemy分別建立python2/3的虛擬環境,然後安裝對應的requirements.txt即可
無論什麼語言,無論什麼庫,做一個ORM實現,至少應當實現完全語義化的資料庫操作,使得操作資料庫表就像在操作對象。
完整的ORM應當可以完全避免SQL拼接為什麼需要ORM
當時分享完畢之後,也確實很多同事表示還是喜歡裸SQL,我後來也又在工作中看到了很多遺留代碼的問題。我也正好趁浴室迷思 想了一下,為什麼我需要ORM呢?
第一條來自一個定理:
一切由人直接來保證安全性的系統,就一定會出錯
拼接SQL、把SQL做成模板、開始使用ORM、封裝出DAO層,幾乎是每個項目的共識吧?
過往的項目中,由我第一手寫的,都會第一時間加入ORM,畢竟也只是兩三個小文件,一百行以內的事情(後續由於封裝的增多,可能會到達數百行)
這段時間在寫舊系統的小規模重構(定理2:一個好程序猿應當友好地幫前人擦好屁股,而不是靠重新製造一個新屁股實現),拼接字元串並沒有帶來任何優點,反而引入了非常簡單的注入漏洞,簡單的設想這樣一個列表API的場景:
- 根據請求參數控制對應的:過濾條件、排序方法、翻頁
- 根據需要預取關聯的表,JOIN並把對一對多的關係化為一個list
第一條,剛一上手,就發現滿地的string format,翻頁用了:
order_sql = "ORDER BY {} {}".format(order_by,direction)
毫無疑問的order_by=id%3Bselect+1%3B-- 就直接注入了
要解決這些在SQL拼接的問題,除了表單驗證,毫無疑問需要做一個SQL字元轉義,另外在能用SQL參數的地方,需要用參數(然後也得注意拼接時候參數的個數,是的,這裡我們的介面有另一個BUG,參數數量沒數對)
第二個功能點,想像一下在需要的地方額外加一句LEFT JOIN,然後對結果再做額外的解析
還有一些附屬功能:單元測試如何建表?代碼里遍地的硬編碼表名如何解決?
自己不是不能實現,但自己來實現這些,就走上了發明ORM的老路,用一個成熟的、文檔豐富的ORM,豈不美哉?橫向對比
簡單的挑了三個
(知乎的表格似乎智障,不插入表格了)
SQLAlchemy、Peewee、Django ORM
Django ORM一直就不是一個全功能的ORM,會發現你想寫的SQL幾乎無法通過ORM寫出來,當然raw屬於tan90,使用裸SQL不在我們的考慮範圍。Django 1.12後提供了一些subquery等各類豐富SQL操作,但這麼新,估計還極少項目在這麼新的版本
Peewee如果有興趣可以後續繼續使用來感受一下,Peewee也是一個功能全面的ORM,star很多但開發沒有SQLAlchemy活躍
核心概念與入門
官方文檔
我總是在想為什麼團隊里很多人會覺得SQLAlchemy入門門檻高,我曾經也被困擾過,但回頭一看會發現的概念實質比較簡單。官方文檔的脈絡不太清晰,要掃過一遍並且學以致用才能感受得到。example很友好的!回過頭來看它的從教程到API的文檔,會發現它的文檔非常詳細,學會它,除了學會了Python操作SQL的一個庫,同樣也可以學到從代碼組織、各類Pythonic技巧到思想的很多東西總的感受是:上手還算容易,精通要花很多功夫,但確實還挺有趣的先放一個表,待會我們會繼續講
(再次損失一個表)
概念很少,並且很清晰,理解這些概念之後的後續使用時,基本可以感受到:你能直覺想到的操作,還確實都有(比如subquery、複雜查詢的構造)模型定義
我們來看看他如何完成模型定義:
# coding=utf-8from __future__ import unicode_literals, absolute_importfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Integer, String, DateTimeModelBase = declarative_base() #<-元類class User(ModelBase): __tablename__ = "auth_user" id = Column(Integer, primary_key=True) date_joined = Column(DateTime) username = Column(String(length=30)) password = Column(String(length=128))
從這裡可以看到,模型定義甚至與資料庫是無關的,所以允許不同的資料庫後端,不同類型擁有不同的表現形式和建表語句
這裡我們可以看到它實現了 ORM與資料庫連接的解耦,一些資料庫後端不支持的數據類型,例如Numeric類型,在sqlite中不支持,不過SQLAlchemy也能做一些兼容使用普通浮點
Model 等同於資料庫的一張表
Column 顯然就是這張表的一列PS: SQLAlchemy 1.2之後才支持comment注釋,以在ddl產生建表SQL時寫上comment屬性,1.2還在beta版里,所以還不能用。。。我倒很好奇為毛這個feature這麼不重要
增
with get_session() as session: session.add(User(username="asd", password="asd")) session.add(User(username="qwe", password="qwe")) session.commit()
session(會話)的概念,可以看成一個管理資料庫持久連接的對象,在此下面是完全透明的連接池和事務等東西
get_session底下configure可以控制auto_commit參數,= False時寫操作默認都不放在事務里,SQLAlchemy默認為True
session.add函數將會把Model加入當前的持久空間(可以從session.dirty看到),直到commit時更新
查
with get_session() as session: # <class sqlalchemy.orm.query.Query> session.query(User)
最簡單的這個查詢返回了一個Query對象
需要注意的是,這裡只構造Query,事實上並沒有發送至資料庫進行查詢,只會在Query.get()、Query.all()、Query.one()以及Query.__iter__等具有「執行」語義的函數,才會真的去獲取Query :本質上是數據表的若干行
- 在查詢情況的下,等同於SQL 中的 SELECT Syntax
- 在update函數的操作時,可以根據參數選擇等同於直接UPDATE users SET xxx WHERE name=xxx或者先用SELECT 選出ID,再循環用UPDATE xxx WHERE id=xxx
- delete同上
以SQLAlchemy為代表的ORM基本都支持鏈式操作。
形如:with get_session() as session: # <class sqlalchemy.orm.query.Query> query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") #上面兩個都是添加where .join(Addreess)#使用ForeignKey .join(Addreess,Addreess.user_id==User.id)#使用顯式聲明 .limit(10) .offset(0) )
所有Query支持的詳情見Query API文檔
上面也涉及到一個特別有意思的filter函數:User.username == "asd" ,實際上是SQLAlchemy重載了Column上的各種運算符 __eq__、__ge__,返回了一個BinaryExpression對象,看起來就更加符合直覺上的語義
複雜查詢
基於Query的subquery
with get_session() as session: # <class sqlalchemy.orm.query.Query> query = (session .query(User.id) .filter(User.username == "asd") .filter_by(username="asd") .limit(10) ) subquery = query.subquery() query2 = session.query(User).filter( User.id.in_(subquery) ) print query2#<-列印展開成的SQL,此處沒有SQL查詢
理解了Query、Column的概念,也很容易自行構造出這樣的SQL
所有在Column級別上的使用 詳見Column API文檔
刪
上面我們提到了直接對Query進行的刪除:
with get_session() as session: query = (session .query(User) .filter(User.username == "asd") .filter_by(username="asd") .join(Addreess) .join(Addreess,Addreess.user_id=User.id) .limit(10) .delete()#<-這裡 )
另外,因為Model也可以被放進session里,然後刪除的,和插入是個反向操作:
with get_session() as session: instance = session.query(User).get(1) session.delete(instance) #下一句執行:DELETE FROM auth_user WHERE auth_user.id = ? session.commit()
改
改首先是上述Query中所說的update方法:
with get_session() as session: # get by id query = (session .query(User) .filter_by(id=1) .update({"username": User.username + "a"}, synchronize_session=False) )
然後是在Model級別的方法:
with get_session() as session: # get by id user = (session .query(User) .get(1) ) user.password = "zxcv" # UPDATE auth_user SET password=? # WHERE auth_user.id = ? session.commit()
在對Model的屬性進行修改的時候,session會得到修改對應的內容,下次commit即會提交SQL
這裡留個思考題:如果對1、同一對象的同一屬性進行修改,2、同一對象的不同屬性進行修改 ,最終會有幾個SQL被發出? 如果你來實現這樣的功能,你會從哪裡下手?基礎性能
SQLAlchemy性能
比較了十萬條記錄插入的性能
另外不要覺得比sqlite 裸SQL慢三倍很慢,注意這個量級,實際項目中會發現慢查詢、不規範操作(例如for循環里放查詢)的危害比引入ORM的這點開銷打多了
總結
到這再貼上面那個概念表,應該就能比較好的理解了
在用裸SQL可以解決的場景下,上述的SQLAlchemy入門部分就足以掌控場景,完成所有的增刪查改API需求(甚至自動生成代碼的需求),自動生成真是偷懶無止境。。不過發明新的DSL嘛,能不做就不做。。
擴展與進階
從過往的經驗來看,SQLAlchemy以優雅的直覺實現了諸多介面,並保留了良好的可擴展性,這裡拋磚引玉一些有趣的特性
事件
應用層的觸發器(trigger),支持:
- ConnectionEvents 包括Connection和Engine(連接後進行一些自檢操作)
- DDLEvents 模型增刪查改事件
- DialectEvents 不同種類的資料庫的事件
- PoolEvents 連接池事件,連接的檢出和回收等
上面的性能測試里就使用了兩種事件
from sqlalchemy import eventfrom sqlalchemy.engine import Engineimport timeimport logginglogging.basicConfig()logger = logging.getLogger("myapp.sqltime")logger.setLevel(logging.DEBUG)@event.listens_for(Engine, "before_cursor_execute")def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): conn.info.setdefault(query_start_time, []).append(time.time()) logger.debug("Start Query: %s", statement)@event.listens_for(Engine, "after_cursor_execute")def after_cursor_execute(conn, cursor, statement, parameters, context, executemany): total = time.time() - conn.info[query_start_time].pop(-1) logger.debug("Query Complete!") logger.debug("Total Time: %f", total)
反射
現有項目或者別人的代碼里如果已經用其他的方式寫好了表定義,不想再定義Model了,想用SQLAlchemy直接使用對應的資料庫表
查文檔關鍵字:Automapfrom sqlalchemy.ext.automap import automap_basefrom sqlalchemy.orm import Sessionfrom sqlalchemy import create_engineBase = automap_base()# engine, suppose it has two tables user and address set upengine = create_engine("sqlite:///mydatabase.db")# reflect the tablesBase.prepare(engine, reflect=True)tables = Base.classes#<-load tablesUser = Base.classes.userAddress = Base.classes.address# rudimentary relationships are producedsession.add(Address(email_address="foo@bar.com", user=User(name="foo")))session.commit()# collection-based relationships are by default named# "<classname>_collection"print (u1.address_collection)
擴展閱讀:DeferredReflection
我之前在一些OLAP應用 用來做數據分析時用到過。。
Python3.x asyncio擴展
16年12月 Python3.6進入穩定期,同時也標誌著Python3.4和3.5中的asyncio模塊進入穩定期
SQLAlchemy對asyncio的支持在於,它實質上可以在engine層進行擴展,同時擴展Engine、Connection、Transaction、Context 代碼量約400行
Strategies for creating new instances of Engine types. These are semi-private implementation classes which provide the underlying behavior for the "strategy" keyword argument available on :func:~sqlalchemy.engine.create_engine. Current available options are plain, threadlocal, and mock. New strategies can be added via new EngineStrategy classes. """
形如:
from sqlalchemy.engine.strategies import DefaultEngineStrategyfrom .engine import AsyncioEngineASYNCIO_STRATEGY = _asyncioclass AsyncioEngineStrategy(DefaultEngineStrategy): name = ASYNCIO_STRATEGY engine_cls = AsyncioEngineAsyncioEngineStrategy() async def main(): engine = create_engine( # In-memory sqlite database cannot be accessed from different # threads, use file. sqlite:///test.db, strategy=ASYNCIO_STRATEGY ) metadata = MetaData() users = Table( users, metadata, Column(id, Integer, primary_key=True), Column(name, Text), ) # Create the table await engine.execute(CreateTable(users)) conn = await engine.connect()
另外提一嘴的是:asyncio不是銀彈,會導致應用層壓力直接傳給DB,會掩蓋應用的SQL寫的爛的問題
分片Session
讀寫分離是當資料庫壓力到達一定階段時,由應用層進行的拆分資料庫壓力的措施
實現一種主從分離的Session:- 最簡單的方案是直接擴展Session類get_bind方法
get_bind(mapper=None, clause=None)
Return a 「bind」 to which this Session is bound. Note that the 「mapper」 argument is usually present when Session.get_bind() is called via an ORM operation such as a Session.query(), each individual INSERT/UPDATE/DELETE operation within a Session.flush(), call, etc.
- 也可以使用sqlalchemy.ext.horizontal_shard模塊中已經實現好的ShardedSession
Parameters:
- shard_chooser – A callable which, passed a Mapper, a mapped instance, and possibly a SQL clause, returns a shard ID. This id may be based off of the attributes present within the object, or on some round-robin scheme.
- id_chooser – A callable, passed a query and a tuple of identity values, which should return a list of shard ids where the ID might reside.
- query_chooser – For a given Query, returns the list of shard_ids where the query should be issued.
- shards – A dictionary of string shard names to Engine objects.
允許根據model或者SQL條件、ID選擇具體的資料庫連接。一個未經驗證的腦洞:因為shards是Engine的dict,那麼是否允許在異構資料庫之間使用Shard?這樣會帶來什麼樣的優缺點?
自定義的列類型
很久很久以前做的功能了,想像一個這樣的場景:
- Postgresql支持IP/CIDR的存儲,本質上是使用4*8bit=32bit的int存儲
- Mysql此時並沒有這樣簡單的IP存儲 如何對其進行擴展?
自定義實現的列類型實質上需要:
- 指定在某種資料庫方言下的存儲類型,例如Mysql下使用int
- 實現兩個方法:從資料庫中取出來一個python對象和把Python對象放入資料庫
- 按需需要實現:支持一些操作符(例如==,in_)
from sqlalchemy import typesclass MyIPType(types.TypeDecorator): impl = types.Integer def process_bind_param(self, value, dialect): #from python to database if dialect=="mysql": pass return #.... def process_result_value(self, value, dialect): #from database to python object return #...
我們也可以在awesome-sqlalchemy中找到一些有趣的類型擴展
混合(hybrid)屬性
我們常見使用Python的property修飾器來構造一個複雜屬性,SQLAlchemy中,這個混合屬性的作用也類似,不僅可以用於獲得對應的值,也可以用於Query時的鏈式操作
定義一個Model後,可以在各類增刪查改中用到這個混合屬性。混合屬性 混合在:既是一個Python屬性,也是一個可以放入資料庫查詢的屬性
class Interval(Base): __tablename__ = interval id = Column(Integer, primary_key=True) start = Column(Integer, nullable=False) end = Column(Integer, nullable=False) def __init__(self, start, end): self.start = start self.end = end @hybrid_property def length(self): return self.end - self.start #下面這個寫著玩的。。 @length.setter def length(self, value): self._value = value>>> i1 = Interval(5, 10)>>> i1.length5>>> print Session().query(Interval).filter_by(length=5)SELECT interval.id AS interval_id, interval.start AS interval_start,interval."end" AS interval_endFROM intervalWHERE interval."end" - interval.start = :param_1
上述還有一個寫著玩兒的setter,hybrid_property支持:
- comparator 擴展Interval.length在各種比較符(><=)的行為
- deleter/setter 顧名思義
- expression 可以擴展最後展開的SQL表達式,例如展開成SUM(xxx):
from sqlalchemy.orm import func #下面這個寫著玩的。。 @length.expression def length(self, expr): return func.sum(self.end, expr)
序列化Query
提供一個介面,以序列化和反序列化Query,用於跨系統、微服務的場景
from sqlalchemy.ext.serializer import loads, dumpsmetadata = MetaData(bind=some_engine)Session = scoped_session(sessionmaker())# ... define mappersquery = Session.query(User). filter(User.somedata==foo).order_by(User.sortkey)# pickle the queryserialized = dumps(query)# unpickle. Pass in metadata + scoped_session # 上面提到過的 query和Session實際上是密不可分的query2 = loads(serialized, metadata, Session)print query2.all()
這個做起來其實就非常帶感了,微服務之間的必要條件就是各種dump,結合一下celery,實現一個去中心的HTTP服務也是不在話下
Baked Query
緩存從Query生成的SQL,以減少生成時間,實際上是個應用層面的存儲過程、View
from sqlalchemy.ext import bakedbakery = baked.bakery()#<-創建了一個LRUfrom sqlalchemy import bindparamdef search_for_user(session, username, email=None): baked_query = bakery(lambda session: session.query(User)) baked_query += lambda q: q.filter(User.name == bindparam(username)) baked_query += lambda q: q.order_by(User.id) if email: baked_query += lambda q: q.filter(User.email == bindparam(email)) result = baked_query(session).params(username=username, email=email).all() return result
上面說到了SQLAlchemy展開成SQL的性能問題,真的特別擔憂的話,再來一個緩存綁定參數如何?
多態和關係
使用多個模型,但實際上只是操作一張資料庫表 此處基本略,之前寫過一篇文章了:這兒
class Employee(Base): __tablename__ = employee id = Column(Integer, primary_key=True) name = Column(String(50)) type = Column(String(50)) __mapper_args__ = { polymorphic_identity:employee, polymorphic_on:type }
這裡定義了僱員Employee 模型,指定type欄位為多態所在欄位,並且對於這個模型,當type欄位為employee時,即為一個僱員
一對一、一對多、多對多的關係和自動收集成collection,這裡不會細說,relationship函數的各種參數留待大家遊玩。
關係間的收集有多種lazy方式,可以選擇在父類讀取時直接JOIN或者Subquery,也可以在需要的時候使用Query.option設置。說起來的篇幅會更長,我投個懶,大家去讀文檔吧~hfgl
推薦閱讀:
※Python 在電氣工程及其自動化上有哪些應用?
※為什麼 x in range(1000000000000001) 的執行速度這麼快
※python多線程下載,進度條顯示問題如何解決?
※scrapy 知乎用戶信息爬蟲
※Python 中列表和元組有哪些區別?
TAG:Python |