為什麼要掃描昨天的數據,當你可以增量更新今天的數據?
當我們使用 SQL 聚合函數處理大型數據集時,計算可能會變得非常耗費資源。隨著數據集的增長,重複計算整個數據集的指標變得不再高效。為了解決這個問題,我們通常會使用增量聚合的方法,這種方法涉及保持先前的狀態並用新進來的數據更新它。雖然對於像 COUNT 或 SUM 這樣的聚合來說,這種方法很簡單,但問題是:如何將其應用於更複雜的指標,如標準差呢?
標準差是一種統計指標,用來測量變量的值相對於其平均值的變化程度或分散程度。它是通過計算方差的平方根得出的。計算樣本方差的公式如下:
計算標準差可能很複雜,因為它需要更新所有數據點的平均值和平方差的總和。然而,通過代數運算,我們可以推導出一個增量計算的公式,這樣就能夠使用現有數據集並無縫地整合新數據。這種方法避免了每次添加新數據時都要從頭開始計算,使過程變得更加高效(詳細推導可在我的 GitHub 上找到)。
這個公式基本上分為三個部分:1. 現有集的加權方差 2. 新集的加權方差 3. 平均差方差,考慮到組間方差。
這種方法通過保留現有集的 COUNT (k)、AVG (µk) 和 VAR (Sk),並將它們與新集的 COUNT (n)、AVG (µn) 和 VAR (Sn) 結合,實現了增量方差計算。因此,更新的標準差可以高效計算,而無需重新掃描整個數據集。
現在我們已經理解了增量標準差背後的數學(或者至少抓住了要點),讓我們深入了解 dbt SQL 的實現。在接下來的例子中,我們將逐步介紹如何設置增量模型來計算和更新用戶的交易數據統計。
考慮一個名為 stg__transactions 的交易表,該表跟踪用戶的交易(事件)。我們的目標是創建一個靜態時間表 int__user_tx_state,該表聚合用戶交易的「狀態」。兩個表的列詳細信息如下圖所示。
為了提高效率,我們旨在通過將新進來的交易數據與現有的聚合數據(即當前用戶狀態)結合,增量更新狀態表。這種方法使我們能夠在不掃描所有歷史數據的情況下計算更新的用戶狀態。
下面的代碼假設你了解一些 dbt 的概念,如果你不熟悉,仍然可以理解這段代碼,但我強烈建議你查看 dbt 的增量指南或閱讀這篇很棒的文章。
我們將逐步構建完整的 dbt SQL,旨在高效計算增量聚合,而不必重複掃描整個表。過程開始時,將模型定義為增量,並使用 unique_key 更新現有行,而不是插入新行。
— depends_on: {{ ref(‘stg__transactions’) }}{{ config(materialized=’incremental’, unique_key=[‘USER_ID’], incremental_strategy=’merge’) }}
接下來,我們從 stg__transactions 表中獲取記錄。is_incremental 區塊過濾時間戳晚於最新用戶更新的交易,有效地只包括「新交易」。
WITH NEW_USER_TX_DATA AS (SELECT USER_ID, TX_ID, TX_TIMESTAMP, TX_VALUE FROM {{ ref(‘stg__transactions’) }}{% if is_incremental() %} WHERE TX_TIMESTAMP > COALESCE((select max(UPDATED_AT) from {{ this }}), 0::TIMESTAMP_NTZ){% endif %})
在檢索到新交易記錄後,我們按用戶進行聚合,這樣就能在接下來的 CTE 中增量更新每個用戶的狀態。
INCREMENTAL_USER_TX_DATA AS (SELECT USER_ID, MAX(TX_TIMESTAMP) AS UPDATED_AT, COUNT(TX_VALUE) AS INCREMENTAL_COUNT, AVG(TX_VALUE) AS INCREMENTAL_AVG, SUM(TX_VALUE) AS INCREMENTAL_SUM, COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV FROM NEW_USER_TX_DATA GROUP BY USER_ID)
現在我們進入重點部分,需要實際計算聚合。當我們不處於增量模式(即還沒有任何「狀態」行)時,我們只需選擇新的聚合。
NEW_USER_CULMULATIVE_DATA AS (SELECT NEW_DATA.USER_ID, {% if not is_incremental() %} NEW_DATA.UPDATED_AT AS UPDATED_AT, NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX, NEW_DATA.INCREMENTAL_AVG AS AVG_TX, NEW_DATA.INCREMENTAL_SUM AS SUM_TX, NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX {% else %}…
但當我們處於增量模式時,我們需要根據上述公式將過去數據與在 INCREMENTAL_USER_TX_DATA CTE 中創建的新數據進行聯接。我們首先計算新的 SUM、COUNT 和 AVG:
…{% else %} COALESCE(EXISTING
新聞來源
本文由 AI 台灣 運用 AI 技術編撰,內容僅供參考,請自行核實相關資訊。
歡迎加入我們的 AI TAIWAN 台灣人工智慧中心 FB 社團,
隨時掌握最新 AI 動態與實用資訊!