Amazon SageMaker Pipelines 包含一些功能,可以幫助你簡化和自動化機器學習 (ML) 工作流程。這讓科學家和模型開發者可以專注於模型開發和快速實驗,而不是基礎設施管理。
Pipelines 提供了一個簡單的 Python SDK,可以協調複雜的 ML 工作流程,並通過 SageMaker Studio 可視化這些工作流程。這有助於數據準備、特徵工程、模型訓練和部署自動化。Pipelines 還與 Amazon SageMaker 自動模型調整集成,能自動找到最佳的超參數值,以達到最佳性能模型,根據你選擇的指標來判斷。
集成模型在 ML 社群中變得越來越受歡迎。它們通過結合多個模型的預測來生成更準確的預測。Pipelines 可以快速用於創建集成模型的端到端 ML 管道。這使開發者能夠在保持效率和可重複性的同時,構建高準確度的模型。
在這篇文章中,我們提供了一個使用 Pipelines 訓練和部署的集成模型範例。
使用案例概述
銷售代表在 Salesforce 中生成新線索並創建機會來追蹤它們。以下應用是一種使用無監督學習的 ML 方法,自動根據各種文本信息(如名稱、描述、細節和產品服務組)識別每個機會中的使用案例。
初步分析顯示,不同行業的使用案例各不相同,不同的使用案例在年化收入的分佈上也有很大差異,這有助於進行細分。因此,使用案例是一個重要的預測特徵,可以優化分析並改善銷售推薦模型。
我們可以將使用案例識別視為主題識別問題,並探索不同的主題識別模型,如潛在語義分析 (LSA)、潛在狄利克雷分配 (LDA) 和 BERTopic。在 LSA 和 LDA 中,每個文檔僅被視為一組單詞,單詞的順序或語法角色並不重要,這可能會導致在確定主題時一些信息的損失。此外,它們需要預先確定的主題數量,而這在我們的數據集中很難確定。由於 BERTopic 克服了上述問題,因此我們使用它來識別使用案例。
這種方法使用三個連續的 BERTopic 模型以層級方式生成最終聚類。
每個 BERTopic 模型由四個部分組成:
嵌入 – 在 BERTopic 中可以使用不同的嵌入方法。在這種情況下,輸入數據來自不同的領域,通常是手動輸入。因此,我們使用句子嵌入來確保可擴展性和快速處理。
維度縮減 – 我們使用均勻流形近似和投影 (UMAP),這是一種無監督和非線性的維度縮減方法,用於減少高維文本向量。
聚類 – 我們使用平衡迭代減少和層次聚類 (BIRCH) 方法來形成不同的使用案例聚類。
關鍵字識別 – 我們使用基於類別的 TF-IDF 從每個聚類中提取最具代表性的單詞。
序列集成模型
沒有預先確定的主題數量,因此我們設置聚類的輸入數量為 15 到 25 個主題。觀察後發現,有些主題是廣泛和一般的。因此,對它們單獨應用另一層 BERTopic 模型。在第二層模型中結合所有新識別的主題以及來自第一層結果的原始主題後,手動進行後處理以最終確定主題識別。最後,對某些聚類使用第三層來創建子主題。
為了使第二層和第三層模型有效運作,你需要一個映射文件,將先前模型的結果映射到特定的單詞或短語。這有助於確保聚類的準確性和相關性。
我們使用貝葉斯優化進行超參數調整和交叉驗證,以減少過擬合。數據集包含機會名稱、機會細節、需求、相關產品名稱、產品細節和產品組等特徵。模型使用自定義損失函數進行評估,並選擇最佳的嵌入模型。
挑戰與考量
以下是這個解決方案的一些挑戰和考量:
管道的數據預處理能力對提升模型性能至關重要。通過在訓練之前預處理進來的數據,我們可以確保模型接收到高質量的數據。一些預處理和數據清理步驟包括將所有文本列轉換為小寫、移除模板元素、縮寫、網址、電子郵件等,移除不相關的 NER 標籤,以及對合併文本進行詞形還原。結果是更準確和可靠的預測。
我們需要一個高度可擴展的計算環境,以便輕鬆處理和訓練數百萬行數據。這使我們能夠輕鬆執行大規模數據處理和建模任務,並減少開發時間和成本。
由於每個 ML 工作流程的每個步驟都需要不同的資源需求,因此靈活和可調整的管道對於有效的資源分配至關重要。我們可以通過優化每個步驟的資源使用來減少整體處理時間,從而加快模型開發和部署。
執行自定義腳本進行數據處理和模型訓練需要可用的框架和依賴項。
協調多個模型的訓練可能會很具挑戰性,特別是當每個後續模型依賴於前一個模型的輸出時。這些模型之間的工作流程協調過程可能會複雜且耗時。
在每個訓練層之後,有必要修訂一個映射,反映模型生成的主題,並將其用作後續模型層的輸入。
解決方案概述
在這個解決方案中,入口點是 Amazon SageMaker Studio,這是一個由 AWS 提供的基於網頁的集成開發環境 (IDE),使數據科學家和 ML 開發者能夠以協作和高效的方式構建、訓練和部署 ML 模型。
以下圖示說明了解決方案的高級架構。
作為架構的一部分,我們使用以下 SageMaker 管道步驟:
SageMaker 處理 – 此步驟允許你在訓練之前預處理和轉換數據。這一步驟的一個好處是能夠使用內建算法進行常見數據轉換和資源的自動擴展。你還可以使用自定義代碼進行複雜的數據預處理,並允許使用自定義容器映像。
SageMaker 訓練 – 此步驟允許你使用 SageMaker 內建算法或自定義代碼訓練 ML 模型。你可以使用分佈式訓練來加速模型訓練。
SageMaker 回調 – 此步驟允許你在 ML 工作流程中運行自定義代碼,例如發送通知或觸發額外的處理步驟。你可以在此步驟中運行外部過程,並在完成後恢復管道工作流程。
SageMaker 模型 – 此步驟允許你創建或註冊模型到 Amazon SageMaker。
實施步驟
首先,我們設置 SageMaker 管道:
import boto3
import sagemaker
# 創建一個自定義區域的 Session(例如 us-east-1),如果未指定則為 None
region = “<your-region-name>”
# 分配默認的 S3 存儲桶給 SageMaker 會話,如果未指定則為 None
default_bucket = “<your-s3-bucket>”
boto_session = boto3.Session(region_name=region)
sagemaker_client = boto_session.client(“sagemaker”)
初始化 SageMaker 會話
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sagemaker_client, default_bucket= default_bucket,)
為會話設置 SageMaker 執行角色
role = sagemaker.session.get_execution_role(sagemaker_session)
在管道上下文中管理交互
pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession(boto_session=boto_session, sagemaker_client=sagemaker_client, default_bucket=default_bucket,)
定義運行腳本的基礎映像
account_id = role.split(“:”)[4]
# 創建一個基礎映像來處理依賴項
ecr_repository_name = “<your-base-image-to-run-script>”
tag = “latest”
container_image_uri = “{0}.dkr.ecr.{1}.amazonaws.com/{2}:{3}”.format(account_id, region, ecr_repository_name, tag)
以下是工作流程步驟的詳細說明:
預處理數據 – 這涉及清理和準備數據以進行特徵工程,並將數據拆分為訓練、測試和驗證集。
import os
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import (
ProcessingInput,
ProcessingOutput,
ScriptProcessor,
)
processing_instance_type = ParameterString(
name=”ProcessingInstanceType”,
# 選擇適合工作的實例類型
default_value=”ml.m5.4xlarge”
)
script_processor = ScriptProcessor(
image_uri=container_image_uri,
command=[“python”],
instance_type=processing_instance_type,
instance_count=1,
role=role,
)
# 定義數據預處理工作
step_preprocess = ProcessingStep(
name=”DataPreprocessing”,
processor=script_processor,
inputs=[
ProcessingInput(source=BASE_DIR, destination=”/opt/ml/processing/input/code/”)
],
outputs=[
ProcessingOutput(output_name=”data_train”, source=”/opt/ml/processing/data_train”), # 輸出數據和字典等以供後續步驟使用
],
code=os.path.join(BASE_DIR, “preprocess.py”),
)
訓練第一層 BERTopic 模型 – 使用 SageMaker 訓練步驟來訓練 BERTopic 模型的第一層,使用 Amazon Elastic Container Registry (Amazon ECR) 映像和自定義訓練腳本。
base_job_prefix=”OppUseCase”
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
training_instance_type = ParameterString(
name=”TrainingInstanceType”,
default_value=”ml.m5.4xlarge”
)
# 創建一個訓練工作的估算器
estimator_first_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path= f”s3://{default_bucket}/{base_job_prefix}/train_first_layer”, # S3 存儲桶,訓練輸出將存儲在此
role=role,
entry_point = “train_first_layer.py”
)
# 根據數據預處理步驟的輸入創建估算器的訓練工作
step_train_first_layer = TrainingStep(
name=”TrainFirstLayerModel”,
estimator = estimator_first_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[ “data_train” ].S3Output.S3Uri,
),
},
)
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum
first_sqs_queue_to_use = ParameterString(
name=”FirstSQSQueue”,
default_value= <first_queue_url>, # 添加隊列網址
)
first_callback_output = CallbackOutput(output_name=”s3_mapping_first_update”, output_type=CallbackOutputTypeEnum.String)
step_first_mapping_update = CallbackStep(
name=”FirstMappingUpdate”,
sqs_queue_url= first_sqs_queue_to_use,
# 在 SQS 消息中提供的輸入參數
inputs={
“input_location”: f”s3://{default_bucket}/{base_job_prefix}/mapping”,
“output_location”: f”s3://{default_bucket}/{base_job_prefix}/mapping_first_update “
},
outputs=[
first_callback_output,
],
)
step_first_mapping_update.add_depends_on([step_train_first_layer]) # 回調在 step_train_first_layer 之後運行
訓練第二層 BERTopic 模型 – 使用另一個 SageMaker TrainingStep 訓練 BERTopic 模型的第二層,使用 ECR 映像和自定義訓練腳本。
estimator_second_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type, # 與第一層訓練相同的類型
instance_count=1,
output_path=f”s3://{bucket}/{base_job_prefix}/train_second_layer”, # S3 存儲桶,訓練輸出將存儲在此
role=role,
entry_point = “train_second_layer.py”
)
# 根據預處理的輸入、前一個回調步驟的輸出和第一層訓練步驟的輸出創建估算器的訓練工作
step_train_second_layer = TrainingStep(
name=”TrainSecondLayerModel”,
estimator = estimator_second_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[ “data_train”].S3Output.S3Uri,
),
TrainingInput(
# 前一個回調步驟的輸出
s3_data= step_first_mapping_update.properties.Outputs[“s3_mapping_first_update”],
),
TrainingInput(
s3_data=f”s3://{bucket}/{base_job_prefix}/train_first_layer”
),
}
)
使用回調步驟 – 與步驟 3 類似,這涉及向 SQS 隊列發送消息,觸發 Lambda 函數。Lambda 函數在 Amazon S3 中更新映射文件,並將成功令牌發送回管道以恢復其運行。
second_sqs_queue_to_use = ParameterString(
name=”SecondSQSQueue”,
default_value= <second_queue_url>, # 添加隊列網址
)
second_callback_output = CallbackOutput(output_name=”s3_mapping_second_update”, output_type=CallbackOutputTypeEnum.String)
step_second_mapping_update = CallbackStep(
name=”SecondMappingUpdate”,
sqs_queue_url= second_sqs_queue_to_use,
# 在 SQS 消息中提供的輸入參數
inputs={
“input_location”: f”s3://{default_bucket}/{base_job_prefix}/mapping_first_update “,
“output_location”: f”s3://{default_bucket}/{base_job_prefix}/mapping_second_update “
},
outputs=[
second_callback_output,
],
)
step_second_mapping_update.add_depends_on([step_train_second_layer]) # 回調在 step_train_second_layer 之後運行
訓練第三層 BERTopic 模型 – 這涉及從 Amazon S3 獲取映射文件,並使用 ECR 映像和自定義訓練腳本訓練 BERTopic 模型的第三層。
estimator_third_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type, # 與前兩層訓練相同的類型
instance_count=1,
output_path=f”s3://{default_bucket}/{base_job_prefix}/train_third_layer”, # S3 存儲桶,訓練輸出將存儲在此
role=role,
entry_point = “train_third_layer.py”
)
# 根據預處理步驟、第二個回調步驟和前兩層的輸出創建估算器的訓練工作
step_train_third_layer = TrainingStep(
name=”TrainThirdLayerModel”,
estimator = estimator_third_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs[“data_train”].S3Output.S3Uri,
),
TrainingInput(
# s3_data = 前一個回調步驟的輸出
s3_data= step_second_mapping_update.properties.Outputs[‘s3_mapping_second_update’],
),
TrainingInput(
s3_data=f”s3://{default_bucket}/{base_job_prefix}/train_first_layer”
),
TrainingInput(
s3_data=f”s3://{default_bucket}/{base_job_prefix}/train_second_layer”
),
}
)
註冊模型 – 使用 SageMaker 模型步驟將模型註冊到 SageMaker 模型註冊表中。當模型被註冊後,你可以通過 SageMaker 推理管道使用該模型。
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
model = Model(
image_uri=container_image_uri,
model_data=step_train_third_layer.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=sagemaker_session,
role=role,
)
register_args = model.register(
content_types=[“text/csv”],
response_types=[“text/csv”],
inference_instances=[“ml.c5.9xlarge”, “ml.m5.xlarge”],
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
)
step_register = ModelStep(name=”OppUseCaseRegisterModel”, step_args=register_args)
為了有效訓練 BERTopic 模型以及 BIRCH 和 UMAP 方法,你需要一個自定義訓練映像,該映像可以提供運行算法所需的其他依賴項和框架。要查看自定義 Docker 映像的工作範例,請參考為 SageMaker 創建自定義 Docker 容器映像。
結論
在這篇文章中,我們解釋了如何使用 SageMaker Pipelines 提供的廣泛步驟和自定義映像來訓練集成模型。欲了解如何使用現有的機器學習運營 (MLOps) 模板開始使用 Pipelines 的更多信息,請參考使用 Amazon SageMaker Pipelines 構建、自動化、管理和擴展 ML 工作流程。
關於作者
Bikramjeet Singh 是 AWS 銷售洞察、分析和數據科學 (SIADS) 團隊的應用科學家,負責為 SIADS 內的 ML 科學家構建 GenAI 平台和 AI/ML 基礎設施解決方案。在擔任應用科學家之前,Bikram 曾在 SIADS 和 Alexa AI 擔任軟體開發工程師。
Rahul Sharma 是 AWS 的高級專家解決方案架構師,幫助 AWS 客戶構建 ML 和生成 AI 解決方案。在加入 AWS 之前,Rahul 在金融和保險行業工作了幾年,幫助客戶構建數據和分析平台。
Sachin Mishra 是一位經驗豐富的專業人士,擁有 16 年的技術諮詢和軟體領導角色的行業經驗。Sachin 在 AWS 負責銷售策略科學和工程功能。在這個角色中,他負責擴大銷售策略的認知分析,利用先進的 AI/ML 技術推動洞察並優化業務成果。
Nada Abdalla 是 AWS 的研究科學家。她的工作和專業知識涵蓋統計和 ML 的多個科學領域,包括文本分析、推薦系統、貝葉斯建模和預測。她曾在學術界工作,並在加州大學洛杉磯分校 (UCLA) 獲得生物統計學碩士和博士學位。通過她在學術界和行業的工作,她在著名的統計期刊和應用 ML 會議上發表了多篇論文。在空閒時間,她喜歡跑步和與家人共度時光。
新聞來源
本文由 AI 台灣 使用 AI 編撰,內容僅供參考,請自行進行事實查核。加入 AI TAIWAN Google News,隨時掌握最新 AI 資訊!