機器不會突然壞掉,總是會有一些徵兆。但是,問題是人類不一定能夠察覺到這些徵兆。這就是機器預測性維護的用武之地!這篇指南將帶你進入機器預測性維護的精彩世界,利用 AWS 和 MLOps 確保你的設備保持可靠。
學習目標
- 了解如何設計和實施一個完整的 MLOps 管道,用於預測性維護,包括數據獲取、模型訓練和部署。
- 學會整合 Docker、FastAPI 和 AWS 服務等必要工具,建立一個穩健的、可投入生產的機器學習應用程式。
- 探索使用 GitHub Actions 自動化 CI/CD 工作流程,確保代碼的平滑和可靠整合與部署。
- 設置最佳實踐來監控、追蹤性能和持續改進,以保持你的機器學習模型高效且可維護。
這篇文章是數據科學博客馬拉松的一部分。
問題:意外停機與維護成本
在工業環境中,設備的意外故障會導致停機和財務損失。在我們的專案中,我們使用 MLOps 最佳實踐和機器學習來及早檢測問題,從而能夠及時修理,減少中斷。
在深入實施之前,讓我們先仔細看看專案架構。
必要的前置條件
下面我們將首先查看所需的前置條件:
克隆資料庫:
git clone "https://github.com/karthikponna/Predictive_Maintenance_MLOps.git"
cd Predictive_Maintenance_MLOps
創建並激活虛擬環境:
# 對於 macOS 和 Linux:
python3 -m venv venv
source venv/bin/activate
# 對於 Windows:
python -m venv venv
.\venv\Scripts\activate
安裝所需的依賴項:
pip install -r requirements.txt
設置環境變量:
# 創建一個 `.env` 文件並添加你的 MongoDB 連接字符串:
MONGO_URI=your_mongodb_connection_string
專案結構
專案結構概述了專案的關鍵組件和組織,確保清晰和可維護性。它有助於理解不同模塊如何互動以及整體系統的設計。良好的結構簡化了開發、調試和擴展。
project_root/
│
├── .github/
│ └── workflows/
│ └── main.yml
│
├── data_schema/
│ └── schema.yaml
│
├── final_model/
│ ├── model.pkl
│ └── preprocessor.pkl
│
├── Machine_Predictive_Data/
│ └── predictive_maintenance.csv
│
├── machine_predictive_maintenance/
│ ├── cloud/
│ ├── components/
│ ├── constant/
│ ├── entity/
│ ├── exception/
│ ├── logging/
│ ├── pipeline/
│ ├── utils/
│ └── __init__.py
│
├── my_venv/
│
├── notebooks/
│ ├── EDA.ipynb
│ ├── prediction_output/
│
├── templates/
│ └── table.html
│
├── valid_data/
| └── test.csv
│
├── .env
├── .gitignore
├── app.py
├── Dockerfile
├── main.py
├── push_data.py
├── README.md
├── requirements.txt
├── setup.py
├── test_mongodb.py
數據獲取
在這個專案中,我們使用一個機器預測性維護的 CSV 文件,將其轉換為 JSON 記錄,並插入到 MongoDB 集合中。
數據集鏈接:Kaggle 數據集
以下是將 CSV 轉換為 JSON 記錄並插入 MongoDB 的代碼片段:
class PredictiveDataExtract():
def __init__(self):
"""
初始化 PredictiveDataExtract 類。
"""
try:
pass
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def csv_to_json_convertor(self, file_path):
try:
data = pd.read_csv(file_path)
data.reset_index(drop=True, inplace=True)
records = list(json.loads(data.T.to_json()).values())
return records
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def insert_data_mongodb(self, records, database, collection):
try:
self.records = records
self.database = database
self.collection = collection
self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
self.database = self.mongo_client[self.database]
self.collection = self.database[self.collection]
self.collection.insert_many(self.records)
return(len(self.records))
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
if __name__=="__main__":
FILE_PATH="Machine_Predictive_Data/predictive_maintenance.csv"
DATABASE="Predictive_Maintenance_MLOps"
collection = "Machine_Predictive_Data"
predictive_data_obj=PredictiveDataExtract()
records = predictive_data_obj.csv_to_json_convertor(FILE_PATH)
no_of_records = predictive_data_obj.insert_data_mongodb(records, DATABASE, collection)
print(no_of_records
以下是從 MongoDB 獲取數據、將數據分割為訓練和測試 CSV 文件並將其存儲為數據獲取產物的代碼片段:
class DataIngestion:
def __init__(self, data_ingestion_config:DataIngestionConfig):
"""
用提供的配置初始化 DataIngestion 類。
參數:
data_ingestion_config: DataIngestionConfig
包含數據獲取詳細信息的配置對象。
"""
try:
self.data_ingestion_config=data_ingestion_config
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def export_collection_as_dataframe(self):
try:
database_name= self.data_ingestion_config.database_name
collection_name= self.data_ingestion_config.collection_name
self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
collection = self.mongo_client[database_name][collection_name]
df = pd.DataFrame(list(collection.find()))
if "_id" in df.columns.to_list():
df = df.drop(columns=["_id"], axis=1)
df.replace("na":np.nan,inplace=True)
return df
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def export_data_into_feature_store(self,dataframe: pd.DataFrame):
try:
feature_store_file_path=self.data_ingestion_config.feature_store_file_path
# 創建文件夾
dir_path = os.path.dirname(feature_store_file_path)
os.makedirs(dir_path,exist_ok=True)
dataframe.to_csv(feature_store_file_path,index=False,header=True)
return dataframe
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
def split_data_as_train_test(self, dataframe:pd.DataFrame):
try:
train_set, test_set = train_test_split(
dataframe, test_size=self.data_ingestion_config.train_test_split_ratio
)
logging.info("對數據框進行了訓練測試分割")
logging.info(
"退出 Data_Ingestion 類的 split_data_as_train_test 方法"
)
dir_path = os.path.dirname(self.data_ingestion_config.training_file_path)
os.makedirs(dir_path, exist_ok=True)
logging.info(f"導出訓練和測試文件路徑。")
train_set.to_csv(
self.data_ingestion_config.training_file_path, index=False, header=True
)
test_set.to_csv(
self.data_ingestion_config.testing_file_path, index=False, header=True
)
logging.info(f"導出訓練和測試文件路徑。" )
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def initiate_data_ingestion(self):
try:
dataframe = self.export_collection_as_dataframe()
dataframe = self.export_data_into_feature_store(dataframe)
self.split_data_as_train_test(dataframe)
dataingestionartifact= DataIngestionArtifact(trained_file_path=self.data_ingestion_config.training_file_path,
test_file_path=self.data_ingestion_config.testing_file_path)
return dataingestionartifact
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
數據驗證
在這一步,我們將檢查獲取的數據是否符合預期格式,確保所有必需的列都存在,並使用預定義的模式進行比較訓練和測試數據的差異。然後我們保存乾淨的數據並創建漂移報告,以便只有質量良好的數據用於下一步,即數據轉換。
數據驗證代碼片段:
class DataValidation:
def __init__(self, data_ingestion_artifact:DataIngestionArtifact,
data_validation_config: DataValidationConfig):
try:
self.data_ingestion_artifact= data_ingestion_artifact
self.data_validation_config= data_validation_config
self._schema_config = read_yaml_file(SCHEMA_FILE_PATH)
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
@staticmethod
def read_data(file_path) -> pd.DataFrame:
try:
return pd.read_csv(file_path)
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def validate_number_of_columns(self, dataframe:pd.DataFrame)-> bool:
try:
number_of_columns = len(self._schema_config)
logging.info(f"所需的列數:number_of_columns")
logging.info(f"數據框的列數:len(dataframe.columns)")
if len(dataframe.columns) == number_of_columns:
return True
return False
except Exception as e:
raise MachinePredictiveMaintenanceException (e, sys)
def is_columns_exist(self, df:pd.DataFrame) -> bool:
try:
dataframe_columns = df.columns
missing_numerical_columns = []
missing_categorical_columns = []
for column in self._schema_config["numerical_columns"]:
if column not in dataframe_columns:
missing_numerical_columns.append(column)
if len(missing_numerical_columns) > 0:
logging.info(f"缺少數值列:missing_numerical_columns")
for column in self._schema_config["categorical_columns"]:
if column not in dataframe_columns:
missing_categorical_columns.append(column)
if len(missing_categorical_columns) > 0:
logging.info(f"缺少類別列:missing_categorical_columns")
return False if len(missing_categorical_columns)>0 or len(missing_numerical_columns)>0 else True
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def detect_dataset_drift(self, base_df, current_df, threshold = 0.05) -> bool :
try:
status = True
report =
for column in base_df.columns:
d1 = base_df[column]
d2 = current_df[column]
is_same_dist = ks_2samp(d1, d2)
if threshold <= is_same_dist.pvalue:
is_found = False
else:
is_found = True
status = False
report.update(column:
"p_value": float(is_same_dist.pvalue),
"drift_status": is_found
)
drift_report_file_path = self.data_validation_config.drift_report_file_path
dir_path = os.path.dirname(drift_report_file_path)# 創建目錄
os.makedirs(dir_path, exist_ok=True)
write_yaml_file(file_path=drift_report_file_path, content=report)
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def initiate_data_validation(self)-> DataValidationArtifact:
try:
validation_error_msg = ""
logging.info("開始數據驗證")
train_file_path = self.data_ingestion_artifact.trained_file_path
test_file_path = self.data_ingestion_artifact.test_file_path
# 從訓練和測試中讀取數據
train_dataframe = DataValidation.read_data(train_file_path)
test_dataframe = DataValidation.read_data(test_file_path)
# 驗證列數
status = self.validate_number_of_columns(dataframe=train_dataframe)
logging.info(f"訓練數據框中所有必需的列都存在:status")
if not status:
validation_error_msg += f"訓練數據框不包含所有列。\n"
status = self.validate_number_of_columns(dataframe=test_dataframe)
if not status:
validation_error_msg += f"測試數據框不包含所有列。\n"
status = self.is_columns_exist(df=train_dataframe)
if not status:
validation_error_msg += f"訓練數據框中缺少列。"
status = self.is_columns_exist(df=test_dataframe)
if not status:
validation_error_msg += f"測試數據框中缺少列。"
## 檢查數據漂移
status=self.detect_dataset_drift(base_df=train_dataframe,current_df=test_dataframe)
dir_path=os.path.dirname(self.data_validation_config.valid_train_file_path)
os.makedirs(dir_path,exist_ok=True)
train_dataframe.to_csv(
self.data_validation_config.valid_train_file_path, index=False, header=True
)
test_dataframe.to_csv(
self.data_validation_config.valid_test_file_path, index=False, header=True
)
data_validation_artifact = DataValidationArtifact(
validation_status=status,
valid_train_file_path=self.data_validation_config.valid_train_file_path,
valid_test_file_path=self.data_validation_config.valid_test_file_path,
invalid_train_file_path=None,
invalid_test_file_path=None,
drift_report_file_path=self.data_validation_config.drift_report_file_path,
)
return data_validation_artifact
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
以下是由數據驗證生成的漂移報告。
空氣溫度 [K]:
漂移狀態: true
p_value: 0.016622943467175914
故障類型:
漂移狀態: false
p_value: 1.0
過程溫度 [K]:
漂移狀態: false
p_value: 0.052940072765804994
產品 ID:
漂移狀態: false
p_value: 0.09120557172716418
轉速 [rpm]:
漂移狀態: false
p_value: 0.2673520066245566
目標:
漂移狀態: false
p_value: 0.999999998717466
工具磨損 [min]:
漂移狀態: false
p_value: 0.13090856779628832
扭矩 [Nm]:
漂移狀態: false
p_value: 0.5001773464540389
類型:
漂移狀態: false
p_value: 1.0
UDI:
漂移狀態: true
p_value: 0.022542489133976953
數據轉換
在這裡,我們將清理和轉換驗證過的數據,將溫度特徵從開爾文轉換為攝氏度,刪除不必要的列,並應用使用序數編碼和最小-最大縮放的轉換管道,並使用 SMOTEENN 處理數據不平衡。
轉換後的訓練和測試數據集將作為 .npy 文件保存,並連同序列化的預處理對象(即 Minmax scaler(preprocessing.pkl))一起封裝為產物,以便進一步的模型訓練。
class DataTransformation:
def __init__(self,data_validation_artifact: DataValidationArtifact,
data_transformation_config: DataTransformationConfig):
try:
self.data_validation_artifact: DataValidationArtifact = data_validation_artifact
self.data_transformation_config: DataTransformationConfig = data_transformation_config
self._schema_config = read_yaml_file(file_path=SCHEMA_FILE_PATH)
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
@staticmethod
def read_data(file_path) -> pd.DataFrame:
try:
return pd.read_csv(file_path)
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def get_data_transformer_object(self):
try:
logging.info("從模式配置中獲取數值列")
scaler = MinMaxScaler()
# 從模式配置中獲取序數編碼的類別
ordinal_categories = self._schema_config.get('ordinal_categories', [])
ordinal_encoder = OrdinalEncoder(categories=ordinal_categories)
logging.info("初始化 MinMaxScaler,OrdinalEncoder 和類別")
ordinal_columns = self._schema_config['ordinal_columns']
scaling_features = self._schema_config['scaling_features']
preprocessor = ColumnTransformer(
[
("Ordinal_Encoder", ordinal_encoder, ordinal_columns),
("MinMaxScaling", scaler, scaling_features)
]
)
logging.info("從 ColumnTransformer 創建預處理器對象")
return preprocessor
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def initiate_data_transformation(self) -> DataTransformationArtifact:
try:
logging.info("開始數據轉換")
preprocessor = self.get_data_transformer_object()
train_df = DataTransformation.read_data(self.data_validation_artifact.valid_train_file_path)
test_df = DataTransformation.read_data(self.data_validation_artifact.valid_test_file_path)
input_feature_train_df = train_df.drop(columns=[TARGET_COLUMN], axis=1)
target_feature_train_df = train_df[TARGET_COLUMN]
input_feature_train_df['空氣溫度 [c]'] = input_feature_train_df['空氣溫度 [K]'] - 273.15
input_feature_train_df['過程溫度 [c]'] = input_feature_train_df['過程溫度 [K]'] - 273.15
drop_cols = self._schema_config['drop_columns']
input_feature_train_df = drop_columns(df=input_feature_train_df, cols = drop_cols)
logging.info("完成訓練數據集的列刪除")
input_feature_test_df = test_df.drop(columns=[TARGET_COLUMN], axis=1)
target_feature_test_df = test_df[TARGET_COLUMN]
input_feature_test_df['空氣溫度 [c]'] = input_feature_test_df['空氣溫度 [K]'] - 273.15
input_feature_test_df['過程溫度 [c]'] = input_feature_test_df['過程溫度 [K]'] - 273.15
drop_cols = self._schema_config['drop_columns']
input_feature_test_df = drop_columns(df=input_feature_test_df, cols = drop_cols)
logging.info("完成測試數據集的列刪除")
input_feature_train_arr = preprocessor.fit_transform(input_feature_train_df)
input_feature_test_arr = preprocessor.transform(input_feature_test_df)
smt = SMOTEENN(sampling_strategy="minority")
input_feature_train_final, target_feature_train_final = smt.fit_resample(
input_feature_train_arr, target_feature_train_df
)
logging.info("對訓練數據集應用 SMOTEENN")
input_feature_test_final, target_feature_test_final = smt.fit_resample(
input_feature_test_arr, target_feature_test_df
)
train_arr = np.c_[
input_feature_train_final, np.array(target_feature_train_final)
]
test_arr = np.c_[
input_feature_test_final, np.array(target_feature_test_final)
]
save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr, )
save_numpy_array_data(self.data_transformation_config.transformed_test_file_path,array=test_arr,)
save_object( self.data_transformation_config.transformed_object_file_path, preprocessor,)
save_object( "final_model/preprocessor.pkl", preprocessor,)
data_transformation_artifact=DataTransformationArtifact(
transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
transformed_test_file_path=self.data_transformation_config.transformed_test_file_path
)
return data_transformation_artifact
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
訓練與評估
現在我們將在轉換後的訓練數據上訓練不同的分類模型,如決策樹、隨機森林、梯度提升、邏輯回歸和 AdaBoost 模型,並使用評估指標如 F1 分數、精確度和召回率來評估它們的性能,然後使用 MLflow 記錄這些詳細信息。
在選擇最佳模型後,它會將模型和轉換對象本地保存到產物中,並將評估指標用於部署。
class ModelTrainer:
def __init__(self, data_transformation_artifact: DataTransformationArtifact, model_trainer_config: ModelTrainerConfig):
try:
self.data_transformation_artifact = data_transformation_artifact
self.model_trainer_config = model_trainer_config
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def track_mlflow(self, best_model, classification_metric, input_example):
"""
將模型和指標記錄到 MLflow。
參數:
best_model: 訓練過的模型對象。
classification_metric: 分類指標(F1、精確度、召回率)。
input_example: 模型的示例輸入數據樣本。
"""
try:
with mlflow.start_run():
f1_score=classification_metric.f1_score
precision_score=classification_metric.precision_score
recall_score=classification_metric.recall_score
mlflow.log_metric("f1_score",f1_score)
mlflow.log_metric("precision",precision_score)
mlflow.log_metric("recall_score",recall_score)
mlflow.sklearn.log_model(best_model,"model", input_example=input_example)
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
def train_model(self, X_train, y_train, X_test, y_test ):
"""
訓練多個模型並根據評估指標選擇最佳模型。
參數:
X_train: 訓練特徵。
y_train: 訓練標籤。
X_test: 測試特徵。
y_test: 測試標籤。
返回:
ModelTrainerArtifact: 包含有關訓練模型及其指標的產物。
"""
models =
"隨機森林": RandomForestClassifier(verbose=1),
"決策樹": DecisionTreeClassifier(),
"梯度提升": GradientBoostingClassifier(verbose=1),
"邏輯回歸": LogisticRegression(verbose=1),
"AdaBoost": AdaBoostClassifier(),
params=
"決策樹":
'criterion':['gini', 'entropy', 'log_loss'],
,
"隨機森林":
'n_estimators': [8,16,32,128,256]
,
"梯度提升":
'learning_rate':[.1,.01,.05,.001],
'subsample':[0.6,0.7,0.75,0.85,0.9],
'n_estimators': [8,16,32,64,128,256]
,
"邏輯回歸":,
"AdaBoost":
'learning_rate':[.1,.01,.001],
'n_estimators': [8,16,32,64,128,256]
model_report: dict = evaluate_models(X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
models=models, param=params)
best_model_score = max(sorted(model_report.values()))
logging.info(f"最佳模型分數: best_model_score")
best_model_name = list(model_report.keys())[
list(model_report.values()).index(best_model_score)
]
logging.info(f"最佳模型名稱: best_model_name")
best_model = models[best_model_name]
y_train_pred = best_model.predict(X_train)
classification_train_metric = get_classification_score(y_true=y_train, y_pred=y_train_pred)
input_example = X_train[:1]
print(input_example)
# 使用 mlflow 跟蹤實驗
self.track_mlflow(best_model, classification_train_metric, input_example)
y_test_pred=best_model.predict(X_test)
classification_test_metric = get_classification_score(y_true=y_test, y_pred=y_test_pred)
# 使用 mlflow 跟蹤實驗
self.track_mlflow(best_model, classification_test_metric, input_example)
preprocessor = load_object(file_path=self.data_transformation_artifact.transformed_object_file_path)
model_dir_path = os.path.dirname(self.model_trainer_config.trained_model_file_path)
os.makedirs(model_dir_path,exist_ok=True)
Machine_Predictive_Model = MachinePredictiveModel(model=best_model)
save_object(self.model_trainer_config.trained_model_file_path,obj=Machine_Predictive_Model)
save_object("final_model/model.pkl",best_model)
## 模型訓練產物
model_trainer_artifact=ModelTrainerArtifact(trained_model_file_path=self.model_trainer_config.trained_model_file_path,
train_metric_artifact=classification_train_metric,
test_metric_artifact=classification_test_metric
)
logging.info(f"模型訓練產物: model_trainer_artifact")
return model_trainer_artifact
def initiate_model_trainer(self) -> ModelTrainerArtifact:
try:
train_file_path = self.data_transformation_artifact.transformed_train_file_path
test_file_path = self.data_transformation_artifact.transformed_test_file_path
# 加載訓練數組和測試數組
train_arr = load_numpy_array_data(train_file_path)
test_arr = load_numpy_array_data(test_file_path)
X_train, y_train, X_test, y_test = (
train_arr[:, :-1],
train_arr[:, -1],
test_arr[:, :-1],
test_arr[:, -1],
)
model_trainer_artifact=self.train_model(X_train,y_train,X_test,y_test)
return model_trainer_artifact
except Exception as e:
raise MachinePredictiveMaintenanceException(e, sys)
現在,讓我們創建一個 training_pipeline.py 文件,將數據獲取、驗證、轉換和模型訓練的所有步驟順序整合成一個完整的管道。
def run_pipeline(self):
"""
執行整個訓練管道。
返回:
ModelTrainerArtifact: 包含有關訓練模型的元數據。
"""
try:
data_ingestion_artifact= self.data_ingestion()
data_validation_artifact= self.data_validation(data_ingestion_artifact=data_ingestion_artifact)
data_transformation_artifact= self.data_transformation(data_validation_artifact=data_validation_artifact)
model_trainer_artifact= self.model_trainer(data_transformation_artifact=data_transformation_artifact)
self.sync_artifact_dir_to_s3()
self.sync_saved_model_dir_to_s3()
return model_trainer_artifact
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
你可以直觀地看到我們是如何構建訓練管道的。
AWS 整合
在運行 training_pipeline.py 文件後,我們生成了產物並將其本地存儲。現在,我們將這些產物存儲在 AWS S3 存儲桶中,以實現基於雲的存儲和可訪問性。
使用 Docker 映像,我們將其通過 GitHub Actions 推送到 AWS ECR,然後使用 AWS EC2 部署到生產環境。我們將在接下來的部分中詳細討論這個過程。
AWS S3
按照以下步驟創建 AWS S3 存儲桶:
步驟1:下載 AWS CLI
你可以點擊這裡下載適用於 Windows、Linux 和 macOS 的 AWS CLI。
步驟2:登錄 AWS IAM 用戶
你可以使用你的 IAM 用戶憑證登錄控制台,並選擇適當的 AWS 區域(例如:us-east-1)。
登錄後,在 AWS 搜索欄中搜索「IAM」。導航到「用戶」,選擇你的用戶名,然後轉到「安全憑證」標籤。在「訪問密鑰」下,點擊「創建訪問密鑰」,選擇「CLI」,然後確認點擊「創建訪問密鑰」。
現在,打開你的終端並輸入 aws configure。在提示時輸入你的 AWS 訪問密鑰和秘密訪問密鑰,然後按 Enter。
你的 IAM 用戶已成功連接到你的專案。
步驟3:導航到 S3 服務
登錄後,在 AWS 搜索欄中搜索「S3」。
點擊 Amazon S3 服務並選擇「創建存儲桶」按鈕。接下來,輸入一個唯一的存儲桶名稱(例如:machinepredictive)。
檢查配置設置,然後點擊「創建存儲桶」。
你的存儲桶現在已創建,你可以開始上傳產物。現在,將以下代碼添加到你的 training_pipeline.py 文件中,然後再次運行它,以查看 AWS S3 存儲桶中的產物。
def sync_artifact_dir_to_s3(self):
"""
將產物目錄同步到 S3。
"""
try:
aws_bucket_url = f"s3://TRAINING_BUCKET_NAME/artifact/self.training_pipeline_config.timestamp"
self.s3_sync.sync_folder_to_s3(folder = self.training_pipeline_config.artifact_dir,aws_bucket_url=aws_bucket_url)
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
def sync_saved_model_dir_to_s3(self):
"""
將保存的模型目錄同步到 S3。
"""
try:
aws_bucket_url = f"s3://TRAINING_BUCKET_NAME/final_model/self.training_pipeline_config.timestamp"
self.s3_sync.sync_folder_to_s3(folder = self.training_pipeline_config.model_dir,aws_bucket_url=aws_bucket_url)
except Exception as e:
raise MachinePredictiveMaintenanceException(e,sys)
Amazon Elastic Container Registry (ECR)
按照以下步驟創建 ECR 存儲庫。
在 AWS 搜索欄中搜索「ECR」。
點擊「創建存儲庫」,並選擇「私有存儲庫」。
提供存儲庫名稱(例如:machinepredictive),然後點擊「創建」。
從你的 ECR 存儲庫中複製 URI,應該看起來像「788614365622.dkr.ecr.ap-southeast-2.amazonaws.com」,並將其保存到某個地方。我們稍後需要將其粘貼到 GitHub Secrets 中。
Docker 整合以進行部署
將我們的專案 Docker 化確保它在任何環境中平穩運行,而不會出現依賴問題。這是一個打包和輕鬆共享應用程式的必備工具。
保持 Docker 映像儘可能小是提高效率的重要因素。我們可以通過使用多階段技術和選擇輕量級的 Python 基礎映像來最小化其大小。
FROM python:3.10-slim-buster
WORKDIR /app
COPY . /app
RUN apt update -y && apt install awscli -y
RUN apt-get update && pip install -r requirements.txt
CMD ["python3", "app.py"]
設置 Action Secrets 和變量
為了安全地存儲敏感信息,如 AWS 憑證和存儲庫 URI,我們需要在我們的存儲庫中設置 GitHub Action Secrets。按照以下步驟操作:
步驟1:打開你的 GitHub 存儲庫
導航到你的 GitHub 存儲庫。
步驟2:轉到設置
在你的存儲庫頁面頂部,找到並點擊「設置」標籤。
步驟3:訪問 Secrets 和變量
在左側邊欄中,向下滾動到 Secrets 和變量 → 選擇 Actions。
步驟4:創建新密鑰
點擊「新建存儲庫密鑰」按鈕。
步驟5:添加 AWS 憑證
創建一個名為「AWS_ACCESS_KEY_ID」的密鑰,並粘貼你的 AWS 訪問密鑰。
創建另一個名為「AWS_SECRET_ACCESS_KEY」的密鑰,並粘貼你的 AWS 秘密訪問密鑰。
創建一個名為「AWS_REGION」的密鑰,並粘貼你選擇的區域(例如:us-east-1)。
步驟6:添加 AWS ECR 存儲庫 URI
複製你的 ECR 存儲庫 URI(例如:788614365622.dkr.ecr.ap-southeast-2.amazonaws.com)。
創建一個名為「AWS_ECR_LOGIN_URI」的新密鑰,並粘貼複製的 URI。
創建一個名為「ECR_REPOSITORY_NAME」的新密鑰,並粘貼 ECR 存儲庫的名稱,例如:machinepredictive。
AWS EC2
現在讓我們了解如何使用 AWS EC2 創建實例。按照以下步驟操作。
導航到 AWS 管理控制台中的 EC2 服務並點擊「啟動實例」。
為你的實例命名(例如:machinepredictive),並選擇 Ubuntu 作為操作系統。
選擇實例類型為 t2.large。
選擇你的默認密鑰對。
在網絡安全下,選擇默認 VPC,並根據需要配置安全組。
最後,點擊「啟動實例」。
AWS EC2 CLI
創建名為 machinepredictive 的實例後,選擇其實例 ID 並點擊「連接」。然後,在 EC2 實例連接下,點擊「再次連接」。這將打開 AWS CLI 界面,你可以在其中運行必要的命令。
現在,逐一在 CLI 中輸入這些命令以在你的 EC2 實例上設置 Docker:
sudo apt-get update -y
sudo apt-get upgrade
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker ubuntu
newgrp docker
保持你的 CLI 會話打開。接下來,導航到你的 GitHub 存儲庫,點擊「設置」,然後轉到「操作」部分,在「運行器」下,點擊「新建自託管運行器」。選擇 Linux 運行器映像。現在,逐一在你的 CLI 中運行以下命令以下載和配置你的自託管運行器:
# 創建一個文件夾並進入其中
mkdir actions-runner && cd actions-runner
# 下載最新的運行器包
curl -o actions-runner-linux-x64-2.322.0.tar.gz -L https://github.com/actions/runner/releases/download/v2.322.0/actions-runner-linux-x64-2.322.0.tar.gz
# 可選:驗證哈希
echo "b13b784808359f31bc79b08a191f5f83757852957dd8fe3dbfcc38202ccf5768 actions-runner-linux-x64-2.322.0.tar.gz" | shasum -a 256 -c
# 解壓安裝程序
tar xzf ./actions-runner-linux-x64-2.322.0.tar.gz
# 配置運行器
./config.sh --url https://github.com/karthikponna/Predictive_Maintenance_MLOps --token "在此處粘貼你的令牌"
# 運行運行器
./run.sh
這將下載、配置並啟動你的自託管 GitHub Actions 運行器在你的 EC2 實例上。
設置自託管 GitHub Actions 運行器後,CLI 會提示你輸入運行器的名稱。輸入 self-hosted 並按 Enter。
使用 GitHub Actions 進行 CI/CD
你可以查看 .github/workflows/main.yml 代碼文件。
現在,讓我們深入了解這個 main.yml 文件的每個部分的作用。
- 持續集成工作:此工作在 Ubuntu 運行器上運行,以檢出代碼、進行代碼檢查並執行單元測試。
- 持續交付工作:在 CI 之後觸發,該工作安裝實用程序,配置 AWS 憑證,登錄 Amazon ECR,並構建、標記和推送你的 Docker 映像到 ECR。
- 持續部署工作:在自託管運行器上運行,該工作從 ECR 中提取最新的 Docker 映像,將其作為容器運行以服務用戶,並清理任何先前的映像或容器。
一旦持續部署工作成功完成,你將在 AWS CLI 中看到類似這樣的輸出。
打開你的 AWS EC2 實例,點擊其實例 ID。驗證實例狀態為「運行中」,並找到公共 IPv4 DNS。點擊「打開地址」選項,自動在瀏覽器中啟動你的 FastAPI 應用程序。現在,讓我們深入了解 FastAPI。
FastAPI
你可以查看 app.py 文件。
在這個 FastAPI 應用中,我創建了兩個主要路由,一個用於訓練模型,另一個用於生成預測。讓我們詳細探討每個路由。
訓練路由
/train 端點開始使用預定義數據集的模型訓練過程,使其易於更新和改進模型。這對於重新訓練模型以提高準確性或納入新數據特別有用,確保模型保持最新並達到最佳性能。
預測路由
/predict 端點通過 POST 接受 CSV 文件。該端點處理傳入數據,利用訓練好的模型生成預測,然後返回格式化為 JSON 的結果。這條路由非常適合將模型應用於新數據集,使其在大規模預測任務中高效。
在 /predict 路由中,我們包含了一個示例 test.csv 文件;你可以在這裡下載它。
結論
我們一起構建了一個完整的生產就緒的預測維護 MLOps 專案——從收集和預處理數據到訓練、評估和使用 Docker、AWS 和 FastAPI 部署我們的模型。這個專案展示了 MLOps 如何縮短開發和生產之間的距離,使構建穩健、可擴展的解決方案變得更容易。
記住,這個指南是關於學習和將這些技術應用於你的數據科學專案。不要猶豫去嘗試、創新和探索新的增強功能。感謝你一直陪伴到最後——繼續學習,繼續實踐,繼續成長!
GitHub 倉庫:https://github.com/karthikponna/Predictive_Maintenance_MLOps
關鍵要點
- 我們建立了一個端到端的 MLOps 管道,涵蓋數據獲取、模型訓練和部署。
- Docker、AWS 和 FastAPI 無縫協作,實現從開發到生產的轉變。
- 將我們的 ML 專案 Docker 化是關鍵——它確保在任何環境中平穩運行,沒有依賴問題。
- 持續部署確保模型在現實應用中保持高效和最新。
常見問題
A. Docker 確保我們的 ML 專案在任何環境中平穩運行,消除依賴問題。
A. AWS 服務如 EC2、S3 和 ECR 使我們的應用程序無縫部署、存儲和擴展。
A. MLflow 通過提供跟蹤實驗、版本控制模型和部署的工具,使機器學習開發變得更容易。
A. GitHub Actions 自動化 CI/CD 流程——運行測試、構建 Docker 映像和部署更新——確保從開發到生產的平滑過渡。
本文中顯示的媒體不屬於 Analytics Vidhya,並由作者自行決定使用。
本文由 AI 台灣 運用 AI 技術編撰,內容僅供參考,請自行核實相關資訊。
歡迎加入我們的 AI TAIWAN 台灣人工智慧中心 FB 社團,
隨時掌握最新 AI 動態與實用資訊!