Apache Airflow 是資料領域中最受歡迎的工作流程管理工具之一,為全球的公司提供工作流程支持。然而,任何在生產環境中使用過 Airflow 的人,特別是在複雜的環境中,都知道它有時會出現一些問題和奇怪的錯誤。
在 Airflow 環境中,有許多需要管理的方面,其中一個重要的指標經常被忽視:DAG 解析時間。監控和優化解析時間對於避免性能瓶頸和確保工作流程的正確運行至關重要,這篇文章將深入探討這個主題。
本教程旨在介紹 airflow-parse-bench,這是一個我開發的開源工具,旨在幫助資料工程師監控和優化他們的 Airflow 環境,提供洞見以減少代碼複雜性和解析時間。
關於 Airflow,DAG 解析時間通常是一個被忽視的指標。每當 Airflow 處理你的 Python 文件以動態構建 DAG 時,解析就會發生。
默認情況下,所有的 DAG 每 30 秒解析一次,這個頻率由配置變數 min_file_process_interval 控制。這意味著每 30 秒,位於你的 dags 文件夾中的所有 Python 代碼都會被讀取、導入和處理,以生成包含待排程任務的 DAG 對象。成功處理的文件會被添加到 DAG Bag 中。
有兩個關鍵的 Airflow 組件負責這個過程:
這兩個組件(通常稱為 DAG 處理器)由 Airflow 調度器執行,確保你的 DAG 對象在觸發之前得到更新。然而,出於可擴展性和安全性原因,也可以在集群中將 DAG 處理器作為單獨的組件運行。
如果你的環境中只有幾十個 DAG,解析過程不太可能造成任何問題。然而,生產環境中常常會有數百甚至數千個 DAG。在這種情況下,如果你的解析時間過高,可能會導致:
- DAG 排程延遲。
- 資源使用率增加。
- 環境心跳問題。
- 調度器故障。
- 過度的 CPU 和內存使用,浪費資源。
現在,想像一下擁有數百個 DAG,並且包含不必要複雜的解析邏輯的環境。小的低效能問題可能迅速轉變為重大問題,影響整個 Airflow 系統的穩定性和性能。
在編寫 Airflow DAG 時,有一些重要的最佳實踐需要記住,以創建優化的代碼。雖然你可以找到許多有關如何改進 DAG 的教程,但我將總結一些關鍵原則,這些原則可以顯著提高 DAG 的性能。
限制頂層代碼
高 DAG 解析時間的最常見原因之一是低效或複雜的頂層代碼。在 Airflow DAG 文件中的頂層代碼在每次調度器解析文件時都會執行。如果這段代碼包含資源密集型操作,例如數據庫查詢、API 調用或動態任務生成,則會顯著影響解析性能。
以下代碼顯示了一個未優化的 DAG 範例:
在這種情況下,每次調度器解析文件時,頂層代碼都會執行,進行 API 請求並處理 DataFrame,這會顯著影響解析時間。
另一個導致解析速度緩慢的重要因素是頂層導入。每個在頂層導入的庫在解析期間都會加載到內存中,這可能會耗時。為了避免這種情況,你可以將導入移到函數或任務定義中。
以下代碼顯示了同一 DAG 的更好版本:
避免在頂層代碼中使用 Xcoms 和變數
在這裡仍然談論同一主題,特別有趣的是避免在頂層代碼中使用 Xcoms 和變數。根據 Google 的文檔:
如果你在頂層代碼中使用 Variable.get(),每次解析 .py 文件時,Airflow 都會執行 Variable.get(),這會打開一個與數據庫的會話。這會顯著降低解析速度。
為了解決這個問題,考慮使用 JSON 字典來一次性檢索多個變數,而不是多次調用 Variable.get()。或者,使用 Jinja 模板,因為這樣檢索的變數僅在任務執行期間處理,而不是在 DAG 解析期間。
刪除不必要的 DAG
雖然這似乎很明顯,但始終重要的是定期清理環境中不必要的 DAG 和文件:
- 刪除未使用的 DAG:檢查你的 dags 文件夾,刪除任何不再需要的文件。
- 使用 .airflowignore:指定 Airflow 應該故意忽略的文件,跳過解析。
- 檢查暫停的 DAG:暫停的 DAG 仍然會被調度器解析,消耗資源。如果不再需要,考慮刪除或存檔。
更改 Airflow 配置
最後,你可以更改一些 Airflow 配置,以減少調度器的資源使用:
- min_file_process_interval:此設置控制 Airflow 解析 DAG 文件的頻率(以秒為單位)。將其從默認的 30 秒增加可以減少調度器的負擔,但會導致 DAG 更新速度變慢。
- dag_dir_list_interval:這決定了 Airflow 每隔多少秒掃描一次 dags 目錄以查找新 DAG。如果你不經常部署新 DAG,考慮增加此間隔以減少 CPU 使用。
我們已經討論了創建優化 DAG 以維持健康 Airflow 環境的重要性。但你如何實際測量 DAG 的解析時間呢?幸運的是,根據你的 Airflow 部署或操作系統,有幾種方法可以做到這一點。
例如,如果你有 Cloud Composer 部署,可以通過在 Google CLI 上執行以下命令輕鬆檢索 DAG 解析報告:
gcloud composer environments run $ENVIRONMENT_NAME \–location $LOCATION \dags report
雖然檢索解析指標很簡單,但測量代碼優化的有效性可能就不那麼容易。每次修改代碼時,你需要將更新的 Python 文件重新部署到雲提供商,等待 DAG 被解析,然後提取新的報告——這是一個緩慢且耗時的過程。
另一種可能的方法,如果你在 Linux 或 Mac 上,是運行此命令以在本地測量解析時間:
time python airflow/example_dags/example.py
然而,雖然這很簡單,但這種方法對於系統地測量和比較多個 DAG 的解析時間並不實用。
為了解決這些挑戰,我創建了 airflow-parse-bench,這是一個 Python 庫,可以簡化使用 Airflow 的原生解析方法來測量和比較 DAG 的解析時間。
airflow-parse-bench 工具使得存儲解析時間、比較結果和標準化 DAG 之間的比較變得簡單。
安裝庫
在安裝之前,建議使用 virtualenv 以避免庫衝突。設置完成後,你可以通過運行以下命令來安裝該包:
pip install airflow-parse-bench
注意:此命令僅安裝基本依賴(與 Airflow 和 Airflow 提供者相關)。你必須手動安裝 DAG 依賴的任何其他庫。
例如,如果一個 DAG 使用 boto3 與 AWS 互動,請確保在你的環境中安裝 boto3。否則,你將遇到解析錯誤。
之後,需要初始化你的 Airflow 數據庫。這可以通過執行以下命令來完成:
airflow db init
此外,如果你的 DAG 使用 Airflow 變數,你也必須在本地定義它們。然而,對於解析目的,並不需要在變數中放置真實值:
airflow variables set MY_VARIABLE ‘ANY TEST VALUE’
如果不這樣做,你將遇到類似以下的錯誤:
error: ‘Variable MY_VARIABLE does not exist’
使用工具
安裝庫後,你可以開始測量解析時間。例如,假設你有一個名為 dag_test.py 的 DAG 文件,包含上面示例中使用的未優化 DAG 代碼。
要測量其解析時間,只需運行:
airflow-parse-bench –path dag_test.py
這次執行會產生以下輸出:
如觀察到的,我們的 DAG 的解析時間為 0.61 秒。如果我再次運行該命令,我會看到一些小的差異,因為解析時間可能會因系統和環境因素而略有變化:
為了呈現更簡潔的數字,可以通過指定迭代次數來聚合多次執行:
airflow-parse-bench –path dag_test.py –num-iterations 5
雖然這需要花費更長的時間來完成,但這樣可以計算五次執行的平均解析時間。
現在,為了評估上述優化的影響,我將 mydag_test.py 中的代碼替換為之前分享的優化版本。在執行相同的命令後,我得到了以下結果:
如注意到的,僅僅應用一些良好的實踐就能將 DAG 解析時間減少近 0.5 秒,突顯了我們所做更改的重要性!
還有其他有趣的功能,我認為值得分享。
作為提醒,如果你在使用該工具時有任何疑問或問題,可以訪問 GitHub 上的完整文檔。
此外,要查看庫支持的所有參數,只需運行:
airflow-parse-bench –help
測試多個 DAG
在大多數情況下,你可能有數十個 DAG 需要測試解析時間。為了解決這個用例,我創建了一個名為 dags 的文件夾,並在其中放入四個 Python 文件。
要測量文件夾中所有 DAG 的解析時間,只需在 –path 參數中指定文件夾路徑:
airflow-parse-bench –path my_path/dags
運行此命令會產生一個表格,總結文件夾中所有 DAG 的解析時間:
默認情況下,表格是從最快到最慢的 DAG 排序的。然而,你可以使用 –order 參數反轉順序:
airflow-parse-bench –path my_path/dags –order desc
跳過未更改的 DAG
–skip-unchanged 參數在開發過程中特別有用。顧名思義,這個選項跳過未自上次執行以來未修改的 DAG 的解析執行:
airflow-parse-bench –path my_path/dags –skip-unchanged
如下面所示,當 DAG 保持不變時,輸出反映了解析時間沒有差異:
重置數據庫
所有 DAG 信息,包括指標和歷史記錄,都存儲在本地 SQLite 數據庫中。如果你想清除所有存儲的數據並重新開始,請使用 –reset-db 標誌:
airflow-parse-bench –path my_path/dags –reset-db
此命令重置數據庫,並將 DAG 作為第一次執行進行處理。
解析時間是維持可擴展和高效的 Airflow 環境的重要指標,特別是隨著你的工作流程需求變得越來越複雜。
因此,airflow-parse-bench 庫可以成為幫助資料工程師創建更好 DAG 的重要工具。通過在本地測試 DAG 的解析時間,你可以輕鬆快速地找到代碼瓶頸,使你的 DAG 更快且更具性能。
由於代碼是在本地執行的,因此產生的解析時間不會與你的 Airflow 集群中的時間相同。然而,如果你能在本地機器上減少解析時間,那麼在雲環境中也可能會重現相同的效果。
最後,這個項目是開放的,歡迎合作!如果你有建議、想法或改進,隨時在 GitHub 上貢獻。
本文由 AI 台灣 運用 AI 技術編撰,內容僅供參考,請自行核實相關資訊。
歡迎加入我們的 AI TAIWAN 台灣人工智慧中心 FB 社團,
隨時掌握最新 AI 動態與實用資訊!