PostgreSQL 技術筆記: 跟疾管署沒有關係的CDC

Dcard Tech
Dcard Tech Blog
Published in
15 min readOct 26, 2022

--

繼上次 Dcard Tech Sharing 活動之後,在此整理了一些有關分享中所提到的細節內容,讓大家能夠更瞭解 Dcard 中使用PostgreSQL CDC 的運作原理。

在 Dcard 多(ㄈㄨˋ)元(ㄗㄚˊ)的系統架構當中,有許多服務元件是使用了 PostgreSQL 來進行資料的儲存。而在日常的維運當中,有許多分析查詢或問題查找的情況。若直接使用 production 的 DB 來提供查詢使用,不僅僅是會影響到效能,也相對的不怎麼安全。而同時 Dcard 是採用了 Microservices 的架構,為了考量 Resource Isolation 的原因,對於不同的服務是分別獨立部署各自的 PostgreSQL DB 來提供服務使用。

因此為了方便管理與查詢, Dcard 的開發團隊於是研究與發展了一個 OfflineDB Proxy 服務,讓分析與問題查找的時候,可以透過一個特別 on demand 的服務來存取與管理 PostgreSQL 複本 DB 。為了讓分析查詢與問題查找的便利, 因此又在此 OfflineDB Proxy 服務的基礎上,開始發展了PostgreSQL CDC (Change Data Capture) 的技術,使複本 DB 上可以有近乎即時的資料可以提供。隨後又因 CDC 的服務發展與加入,讓許多其他的服務元件能夠透過 Data Stream 的方式來取得 DB 相關的異動資料,使服務間的流程設計上可以保證程序的完整性,從而有更可靠與穩定的 Dcard 服務能夠提供給卡友們使用。接下來就一一介紹各個模組間的設計概念。

OfflineDB Proxy

這是一個專門提供複本DB使用的服務,OfflineDB proxy會在收到 postgres db connection後(如下圖),從一個既有的 disk snapshot 建立新的複本DB作為查詢使用,同時也會透過不同的 startup script 執行,針對一些敏感的資料進行額外的保護與處理,使既有的 production 服務資源不受影響,而要建置這樣的系統服務,主要有三個部分需要完成。

Snapshot Rotate Job

針對所需提供服務的 DB,定期的將 production DB 建立 disk snapshot (像是使用 Cloud Service 的 disk snapshot 功能),並且自動淘汰舊的 snapshot,來供給 OfflineDB Proxy 建立新的複本 DB 使用。

Postgres Protocol Handle

在client 連線至 OfflineDB Proxy 後,server 要取得 connection 中所使用的 DB 名稱,要取得相對應的參數,server 會透過 Postgres protocol 中的 Start-up message 來取得參數,再透過此參數作為複本 disk snapshot 的選取,然後建立複本 DB,最後等候建立完成後,再進行 connection forward 的處理。

Resource Controller

當 OfflineDB Proxy 開始提供複本 DB 查詢後,還需要有個方式來對複本 DB 進行管理,因此 Resource Controller 是一個類似 k8s controller 的概念,會定期針對已建立的複本 DB 進行操作與狀態取得,而在連線建立時,也可透過 Controller 取得已存在的複本 DB 來提供查詢服務,若不存在對應的複本DB時,再透過 snapshot 建立新的複本 DB 作為使用,在連線結束之後,對於閒置一段時間的複本DB,也透過 Resource Controller 來自動的進行結束處理,避免資源使用上的浪費。

而在有了這樣的服務之後,在實際的分析查詢資料上,因為在成本和實作上的考量,snapshot 無法過於密集的產生,即時性會有些不足,導致無法在 OfflineDB Proxy 上查詢到最新的資料。為了解決這問題,我們參考了 Netflix 的 dblog 架構,開始了 PostgreSQL CDC 的服務開發,而要怎麼進行這樣的服務建置呢?

取得 PostgreSQL 的 Change Data

在資料庫上的操作,可分為對資料表與資料列的操作,包含建立、修改、刪除,而在 PostgreSQL 資料表上的操作是透過 Data Definition Language(DDL) 來進行執行,而在資料列上則是透過 Data Manipulation Language (DML) 進行執行,為了可以滿足各種的使用情況,需要取得 DDL 與 DML 中執行的任何有關異動資料,且是要即時並完整的取得,而要取得這樣的資料,方法各有不同,我們則是採用了 PostgreSQL Replication 的機制(如下圖),來取得線上 DB 運作時有關的異動資料。

PostgreSQL Replication

PostgreSQL 服務中若要建立一個 Replica Server 會先需要在一個 Primary Server 建立一個 Replication Slot,這是一個用於確保 Replica Server 在與 Primary Server 建立 Replication connection 後,可以讓每個資料異動操作所產的 Write-Ahead Log (WAL) 可以透過 Streaming Replication Protocol 來進行資料同步,在連線中 Replica Server 上會收到按 Log Sequence Number (LSN) 順序的 WAL 來進行資料同步,而在資料同步之後, Replica Server 也能夠透過 Streaming Replication Protocol 對 Primary Server 對已收到或已處理完成的 LSN 進行狀態更新。

在 WAL 資訊中,會紀錄了每個 Transaction 所異動的相關資料,因此透過此方式,就可取得 PostgreSQL 中所異動的相關資訊,因此也能夠將相關的 Change Data 用來進行轉存,並且能夠即時的發送到所需要的各種服務上。

儲存 Change Data 的 Pulsar

對於提供 Change Data 的方式,可以採用各種類型的 Message Queue 服務來提供,而我們則是選用了Apache Pulsar 來進行使用,有下列三項主要原因:

Strong Ordering

在一些特別的應用情境時,會特別需要在意資料異動的先後順序,像是優惠卷的限量取得,就需要考慮到先取先得的功能性,而也因此針對這樣的請求,能不能夠依序的接收與處理,就會是相當重要的考量。

Message Deduplication

由於production的服務架構中,會需要考慮到 High Availability (HA) 的功能設計,而在這樣的情況中,會面臨到若在有多個相同的異動資料取得時,需要保證相同的資料不會被重複的提供到 Data Stream上,避免相同的資料被重複提供的狀況發生時,因此在Change Data取得後,若有 Message Deduplication 的功能,就可簡單的使用 LSN 作為 message key,來避免相同的Change Data,被多筆的進行儲存與轉發。

Tiered Storage

針對 Dcard 百萬級的用戶看板服務,每天會產生的 Change Data 是相當巨量可觀的,因此在資料的儲存成本與空間限制上,就需要特別的考量與選擇,而 Pulsar 對於訊息的儲存上,有提供各樣的 offload service 選擇來進行儲存,可彈性的使用 GCS 或 AWS S3 服務進行存放與使用,因此在空間儲存的用量限制,就可不用特別擔心,且長期儲存與維護成本上也相對於自建服務便宜。

複本 DB 的 Data Sync

pg2pulsar

透過以上的概念,我們需要實際的進行 PostgreSQL CDC 程序來處理(如下圖),因此我們開發了一個 pg2pulsar 程序,作為PostgreSQL CDC 的 處理,當中是使用了 logical replication 的方式來進行,透過 2ndQuadrant 所開源的 pglogical2 extension 使用與剖析 WAL 的方式,來取得所需要的完整 Change Data (不透過 extension 的話,WAL 像是 update 操作可能會只有 row 異動部份的資料,但有些應用場景會希望有完整 row 的資料),主要會進行下列幾個步驟進行:

  1. 建立 Pulsar connection
  2. 取得最後已儲存的 LSN 資訊
  3. 建立 PostgreSQL replication connection
  4. 開始 replication,接受來自 PostgreSQL 的 WAL
  5. 取得 WAL 後剖析資料,取出需的 Change Data
  6. 將 Change Data 按 LSN 順序,透過 message 發送至 Pulsar
  7. 完成 message 發送後,對 PostgreSQL 進行狀態更新

因此有了這樣的程序後,就可取得 production DB 上的 Change Data 並且發送到 Pulsar 上來進行轉發與儲存。

pulsar2pg

而對於 OfflineDB 的要有另一個程序來進行 data sync,因此 pulsar2pg 是作為用來接收來自 Pulsar 所提供的 Change Data 來進行資料更新執行的程序,會按下列幾個步驟進行

  1. 對複本 DB 建立 connection
  2. 取得複本 DB 目前 LSN
  3. 建立 Pulsar connection
  4. 透過目前 LSN 取得 Change Data
  5. 將 Change Data 對複本 DB 進行 data sync

在 pg2pulsar 與 pulsar2pg 的這兩個程式的運作下,就可將 production 上 DB 即時的異動資料,交付給複本 DB 來進行同步更新,使複本 DB 也有最新的資料可以做即時分析使用。

CDC Stream Gateway

在有了這樣的 CDC 服務建置後,除了給 OfflineDB 提供即時性之外,為了提供給各樣即時應用的功能處理,Dcard 服務內,還另外發展了一個 CDC Stream Gateway (如下圖),簡化 Application 到 Message Queue 間的處理,建立一個 abstract layer,來提供給各種不同的 Stream Application 作為使用,透過 gRPC stream server 服務,可主動的轉發訊息,提供 CDC Stream 給不同的 application 進行使用,在需要使用 Change Data 進行連動處理的服務,只要需透過 gRPC stream client 連線的方式,指定所需取得的 DB 與 table,就可收到即時的 Change Data 來進行相關的非同步操作處理。比起像是在處理業務邏輯自行訊息發送 message queue,CDC Stream Gateway 以 database 做為 ground truth,可以確保異動一定會被通知到。

例如 Dcard 上的文章是透過 PostgresSQL 來存放時, 若想針對文章在進行編輯內容後, 能夠立即更新 Elasticsearch 搜尋的索引內容時,就可簡單的透過 CDC Stream Gateway 取得 Change Data 進行更新(如下 Golang 範例),使用一個 CDC consumer 指定所需接收的 DB 與 table ,再透過一個 Handler 進行處理即可,就可根據資料變更的種類, 分別對於 Elasticsearch 進行不同的操作處理即可。若處理過程中有例外發生, 像是操作 Elasticsearch 的請求發生網路異常錯誤,在 Handler return error 後,對 message 進行 nack 處理,Message Queue 將會在一定時間後重新進行 message delivery,就可確保 Change Data 會再次被處理到,保證程序上的完整性。

grpcConn, err := grpc.Dial("cdc-stream-gateway")
...
// create a CDC consumer
consumer := pgcapture.NewConsumer(ctx, grpcConn, pgcapture.ConsumerOption{
URI: "postdb",
TableRegex: "posts",
})
...// register a handler
err = consumer.Consume(map[pgcapture.Model]pgcapture.ModelHandlerFunc{
PostModel: func(change pgcapture.Change) error {
var post *cdc.Post
switch change.Op {
case change.OpInsert, change.OpUpdate:
post = change.New.(*cdc.Post)
// when a post is inserted or updated, call Elasticsearch to save a post
if err := esAPI.SavePost(post); err != nil {
return err
}
case change.OpDelete:
post = change.Old.(*cdc.Post)
// when a posted is deleted, call Elasticsearch API to delete a post
if err := esAPI.DeletePost(post.ID); err != nil {
return err
}
}
},
}

而這樣的服務建置,也帶來了維護上的便利,除了可以透過 Protocol Buffers 的制定,明確訊息結構外,也能夠在 Message Queue 需要進行版本升級或者替換掉 Pulsar 時,只需要對 CDC Stream Gateway 的服務進行更新,而其他已存在 的Stream Application 仍可兼容運作甚至完全無感。若當有需要擴充額外功能時(例如要加上特殊的 filter),也可透過 CDC Stream Gateway 的修改與調整,就可達到額外擴充功能的實現。

三大模塊 OfflineDB Proxy、CDC、Stream Gateway 整個搭建起來之後就如下圖顯示的架構與流動方向。透過 OfflineDB proxy 可以 on demand 透過 snapshot 去產生 database,而 database 即時性不足的地方就透過 CDC + pg2pulsar + pulsar2pg 即時補上異動資料。而應用程式也以透過 gRPC stream gateway 取得異動資料做出各種不同的功能。

結語

最後最後,介紹了那麼多有關 PostgreSQL 的相關技術後,當然要跟大家分享一下 Dcard 有關的服務概況,目前 Dcard 的服務中,已有數十來個相關的 CDC Stream 應用,正在 Dcard 的線上服務中每天正常的運行著,而整個 CDC 服務可優化調整的地方也都還在持續探索著,而對於一些其他類型 DB 的 CDC 服務,像是 MongoDB 未來也都還在規劃發展進行中,期待之後有機會再跟各位分享相關的內容。

參考文獻

--

--