決策樹演算法一直讓我感到著迷。它們容易實現,並在各種分類和回歸任務中取得良好結果。結合提升技術,決策樹在許多應用中仍然是最先進的技術。
像是 sklearn、Lightgbm、xgboost 和 catboost 等框架到目前為止都做得非常好。然而,在過去幾個月中,我發現缺乏對箭頭數據集的支持。雖然 lightgbm 最近已經增加了對此的支持,但在大多數其他框架中仍然缺乏。箭頭數據格式可能非常適合決策樹,因為它具有優化的列結構,能有效處理數據。Pandas 已經增加了對此的支持,Polars 也利用了這些優勢。
Polars 在大多數其他數據框架中顯示出顯著的性能優勢。它有效地使用數據,避免不必要的數據複製。它還提供了一個流式引擎,允許處理比內存更大的數據。因此,我決定使用 Polars 作為從頭開始構建決策樹的後端。
目標是探索使用 Polars 進行決策樹的優勢,尤其是在內存和運行時間方面。當然,還有學習更多關於 Polars 的知識,如何有效地定義表達式,以及流式引擎的使用。
實現的代碼可以在這個 repository 中找到。
代碼概述
為了對代碼有初步的了解,我將首先展示 DecisionTreeClassifier 的結構:
第一個重要的事情可以在導入中看到。我希望保持導入部分的整潔,並且依賴項越少越好。這成功地只依賴於 Polars、pickle 和 typing。
init 方法允許定義是否應該使用 Polars 流式引擎。此外,這裡還可以設置樹的最大深度。另一個特徵是在定義類別列時,這些列使用目標編碼的方式處理,與數值特徵不同。
可以保存和加載決策樹模型。它以嵌套字典的形式表示,可以作為 pickle 檔案保存到磁碟。
Polars 的魔法發生在 fit() 和 build_tree() 方法中。這些方法接受 LazyFrames 和 DataFrames,支持內存處理和流式處理。
有兩個預測方法可用,predict() 和 predict_many()。predict() 方法可以用於小的示例數據,數據需要以字典的形式提供。如果我們有一個大的測試集,使用 predict_many() 方法會更有效率。在這裡,數據可以作為 Polars DataFrame 或 LazyFrame 提供。
“`python
import pickle
from typing import Iterable, List, Union
import polars as pl
class DecisionTreeClassifier:
def __init__(self, streaming=False, max_depth=None, categorical_columns=None):
…
def save_model(self, path: str) -> None:
…
def load_model(self, path: str) -> None:
…
def apply_categorical_mappings(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> Union[pl.DataFrame, pl.LazyFrame]:
…
def fit(self, data: Union[pl.DataFrame, pl.LazyFrame], target_name: str) -> None:
…
def predict_many(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> List[Union[int, float]]:
…
def predict(self, data: Iterable[dict]):
…
def get_majority_class(self, df: Union[pl.DataFrame, pl.LazyFrame], target_name: str) -> str:
…
def _build_tree(
self,
data: Union[pl.DataFrame, pl.LazyFrame],
feature_names: list[str],
target_name: str,
unique_targets: list[int],
depth: int,
) -> dict:
…
“`
訓練樹
要訓練決策樹分類器,需要使用 fit() 方法。
“`python
def fit(self, data: Union[pl.DataFrame, pl.LazyFrame], target_name: str) -> None:
“””
Fit 方法用於訓練決策樹。
:param data: 包含訓練數據的 Polars DataFrame 或 LazyFrame。
:param target_name: 目標列的名稱
“””
columns = data.collect_schema().names()
feature_names = [col for col in columns if col != target_name]
# 縮小數據類型
data = data.select(pl.all().shrink_dtype()).with_columns(
pl.col(target_name).cast(pl.UInt64).shrink_dtype().alias(target_name)
)
# 準備使用目標編碼的類別列
if self.categorical_columns:
categorical_mappings = {}
for categorical_column in self.categorical_columns:
categorical_mappings[categorical_column] = {
value: index
for index, value in enumerate(
data.lazy()
.group_by(categorical_column)
.agg(pl.col(target_name).mean().alias(“avg”))
.sort(“avg”)
.collect(streaming=self.streaming)[categorical_column]
)
}
self.categorical_mappings = categorical_mappings
data = self.apply_categorical_mappings(data)
unique_targets = data.select(target_name).unique()
if isinstance(unique_targets, pl.LazyFrame):
unique_targets = unique_targets.collect(streaming=self.streaming)
unique_targets = unique_targets[target_name].to_list()
self.tree = self._build_tree(data, feature_names, target_name, unique_targets, depth=0)
“`
它接收一個 Polars LazyFrame 或 DataFrame,包含所有特徵和目標列。要識別目標列,需要提供 target_name。
Polars 提供了一種方便的方法來優化數據的內存使用。
“`python
data.select(pl.all().shrink_dtype())
“`
這樣,所有列都被選擇並評估。它會將數據類型轉換為最小可能的值。
類別編碼
為了編碼類別值,使用目標編碼。為此,所有類別特徵的實例將被聚合,並計算平均目標值。然後,根據平均目標值對實例進行排序,並分配一個排名。這個排名將用作特徵值的表示。
“`python
(
data.lazy()
.group_by(categorical_column)
.agg(pl.col(target_name).mean().alias(“avg”))
.sort(“avg”)
.collect(streaming=self.streaming)[categorical_column]
)
“`
因為可以提供 Polars DataFrames 和 LazyFrames,所以我首先使用 data.lazy()。如果給定的數據是 DataFrame,則會轉換為 LazyFrame。如果已經是 LazyFrame,則僅返回自身。這樣可以確保對 LazyFrames 和 DataFrames 的數據以相同的方式進行處理,並且可以使用 collect() 方法,這僅對 LazyFrames 可用。
為了說明在擬合過程中不同步驟計算的結果,我將其應用於心臟病預測的數據集。該數據集可以在 Kaggle 上找到,並在數據庫內容許可下發布。
以下是葡萄糖水平的類別特徵表示的示例:
“`
┌──────┬──────┬──────────┐
│ rank ┆ gluc ┆ avg │
│ — ┆ — ┆ — │
│ u32 ┆ i8 ┆ f64 │
╞══════╪══════╪══════════╡
│ 0 ┆ 1 ┆ 0.476139 │
│ 1 ┆ 2 ┆ 0.586319 │
│ 2 ┆ 3 ┆ 0.620972 │
└──────┴──────┴──────────┘
“`
對於每個葡萄糖水平,計算患有心臟病的概率。這些數據被排序並排名,以便每個水平都映射到一個排名值。
獲取目標值
作為 fit() 方法的最後一部分,確定唯一的目標值。
“`python
unique_targets = data.select(target_name).unique()
if isinstance(unique_targets, pl.LazyFrame):
unique_targets = unique_targets.collect(streaming=self.streaming)
unique_targets = unique_targets[target_name].to_list()
self.tree = self._build_tree(data, feature_names, target_name, unique_targets, depth=0)
“`
這是調用 _build_tree() 方法進行遞歸之前的最後準備。
構建樹
在 fit() 方法中準備好數據後,將調用 _build_tree() 方法。這是遞歸進行的,直到滿足停止條件,例如達到樹的最大深度。第一次調用是從 fit() 方法執行的,深度為零。
“`python
def _build_tree(
self,
data: Union[pl.DataFrame, pl.LazyFrame],
feature_names: list[str],
target_name: str,
unique_targets: list[int],
depth: int,
) -> dict:
“””
遞歸構建決策樹。
如果達到最大深度,則返回一個葉子節點,並顯示主要類別。
否則,找到最佳分割並為左右子節點創建內部節點。
:param data: 要評估的數據框。
:param feature_names: 特徵列的名稱。
:param target_name: 目標列的名稱。
:param unique_targets: 唯一的目標值。
:param depth: 樹的當前深度。
:return: 表示節點的字典。
“””
if self.max_depth is not None and depth >= self.max_depth:
return {“type”: “leaf”, “value”: self.get_majority_class(data, target_name)}
# 這裡使數據變為懶加載,以避免在每次循環迭代中評估。
data = data.lazy()
# 評估每個特徵的熵:
information_gain_dfs = []
for feature_name in feature_names:
feature_data = data.select([feature_name, target_name]).filter(pl.col(feature_name).is_not_null())
feature_data = feature_data.rename({feature_name: “feature_value”})
# 尚未進行流式處理
information_gain_df = (
feature_data.group_by(“feature_value”)
.agg(
[
pl.col(target_name)
.filter(pl.col(target_name) == target_value)
.len()
.alias(f”class_{target_value}_count”)
for target_value in unique_targets
]
+ [pl.col(target_name).len().alias(“count_examples”)]
)
.sort(“feature_value”)
.select(
[
pl.col(f”class_{target_value}_count”).cum_sum().alias(f”cum_sum_class_{target_value}_count”)
for target_value in unique_targets
]
+ [
pl.col(f”class_{target_value}_count”).sum().alias(f”sum_class_{target_value}_count”)
for target_value in unique_targets
]
+ [
pl.col(“count_examples”).cum_sum().alias(“cum_sum_count_examples”),
pl.col(“count_examples”).sum().alias(“sum_count_examples”),
]
+ [
# 來自之前的選擇
pl.col(“feature_value”),
]
)
.filter(
# 至少有一個示例可用
pl.col(“sum_count_examples”) > pl.col(“cum_sum_count_examples”)
)
.select(
[
(pl.col(f”cum_sum_class_{target_value}_count”) / pl.col(“cum_sum_count_examples”)).alias(
f”left_proportion_class_{target_value}”
)
for target_value in unique_targets
]
+ [
(
(pl.col(f”sum_class_{target_value}_count”) – pl.col(f”cum_sum_class_{target_value}_count”))
/ (pl.col(“sum_count_examples”) – pl.col(“cum_sum_count_examples”))
).alias(f”right_proportion_class_{target_value}”)
for target_value in unique_targets
]
+ [
(pl.col(f”sum_class_{target_value}_count”) / pl.col(“sum_count_examples”)).alias(
f”parent_proportion_class_{target_value}”
)
for target_value in unique_targets
]
+ [
# 來自之前的選擇
pl.col(“cum_sum_count_examples”),
pl.col(“sum_count_examples”),
pl.col(“feature_value”),
]
)
.select(
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”left_proportion_class_{target_value}”)
* pl.col(f”left_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“left_entropy”),
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”right_proportion_class_{target_value}”)
* pl.col(f”right_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“right_entropy”),
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”parent_proportion_class_{target_value}”)
* pl.col(f”parent_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“parent_entropy”),
# 來自之前的選擇
pl.col(“cum_sum_count_examples”),
pl.col(“sum_count_examples”),
pl.col(“feature_value”),
)
.select(
(
pl.col(“cum_sum_count_examples”) / pl.col(“sum_count_examples”) * pl.col(“left_entropy”)
+ (pl.col(“sum_count_examples”) – pl.col(“cum_sum_count_examples”))
/ pl.col(“sum_count_examples”)
* pl.col(“right_entropy”)
).alias(“child_entropy”),
# 來自之前的選擇
pl.col(“parent_entropy”),
pl.col(“feature_value”),
)
.select(
(pl.col(“parent_entropy”) – pl.col(“child_entropy”)).alias(“information_gain”),
# 來自之前的選擇
pl.col(“parent_entropy”),
pl.col(“feature_value”),
)
.filter(pl.col(“information_gain”).is_not_nan())
.sort(“information_gain”, descending=True)
.head(1)
.with_columns(feature=pl.lit(feature_name))
)
information_gain_dfs.append(information_gain_df)
if isinstance(information_gain_dfs[0], pl.LazyFrame):
information_gain_dfs = pl.collect_all(information_gain_dfs, streaming=self.streaming)
information_gain_dfs = pl.concat(information_gain_dfs, how=”vertical_relaxed”).sort(
“information_gain”, descending=True
)
information_gain = 0
if len(information_gain_dfs) > 0:
best_params = information_gain_dfs.row(0, named=True)
information_gain = best_params[“information_gain”]
if information_gain > 0:
left_mask = data.select(filter=pl.col(best_params[“feature”]) <= best_params["feature_value"])
if isinstance(left_mask, pl.LazyFrame):
left_mask = left_mask.collect(streaming=self.streaming)
left_mask = left_mask[“filter”]
# 分割數據
left_df = data.filter(left_mask)
right_df = data.filter(~left_mask)
left_subtree = self._build_tree(left_df, feature_names, target_name, unique_targets, depth + 1)
right_subtree = self._build_tree(right_df, feature_names, target_name, unique_targets, depth + 1)
if isinstance(data, pl.LazyFrame):
target_distribution = (
data.select(target_name)
.collect(streaming=self.streaming)[target_name]
.value_counts()
.sort(target_name)[“count”]
.to_list()
)
else:
target_distribution = data[target_name].value_counts().sort(target_name)[“count”].to_list()
return {
“type”: “node”,
“feature”: best_params[“feature”],
“threshold”: best_params[“feature_value”],
“information_gain”: best_params[“information_gain”],
“entropy”: best_params[“parent_entropy”],
“target_distribution”: target_distribution,
“left”: left_subtree,
“right”: right_subtree,
}
else:
return {“type”: “leaf”, “value”: self.get_majority_class(data, target_name)}
“`
這個方法是構建樹的核心,我將逐步解釋。首先,當進入該方法時,檢查是否滿足最大深度的停止條件。
“`python
if self.max_depth is not None and depth >= self.max_depth:
return {“type”: “leaf”, “value”: self.get_majority_class(data, target_name)}
“`
如果當前深度等於或大於 max_depth,則返回一個葉子類型的節點。葉子的值對應於數據的主要類別。這樣計算如下:
“`python
def get_majority_class(self, df: Union[pl.DataFrame, pl.LazyFrame], target_name: str) -> str:
“””
返回數據框的主要類別。
:param df: 要評估的數據框。
:param target_name: 目標列的名稱。
:return: 主要類別。
“””
majority_class = df.group_by(target_name).len().filter(pl.col(“len”) == pl.col(“len”).max()).select(target_name)
if isinstance(majority_class, pl.LazyFrame):
majority_class = majority_class.collect(streaming=self.streaming)
return majority_class[target_name][0]
“`
為了獲得主要類別,通過在目標列上進行分組並使用 len() 聚合來確定每個目標的行數。返回在大多數行中存在的目標實例作為主要類別。
信息增益作為分割標準
為了找到數據的良好分割,使用信息增益。
要獲得信息增益,需要計算父熵和子熵。

在 Polars 中計算信息增益
信息增益是針對特徵列中存在的每個特徵值計算的。
“`python
information_gain_df = (
feature_data.group_by(“feature_value”)
.agg(
[
pl.col(target_name)
.filter(pl.col(target_name) == target_value)
.len()
.alias(f”class_{target_value}_count”)
for target_value in unique_targets
]
+ [pl.col(target_name).len().alias(“count_examples”)]
)
.sort(“feature_value”)
“`
特徵值被分組,並將每個目標值的計數分配給它。此外,該特徵值的行總數也保存為 count_examples。在最後一步中,數據按 feature_value 排序。這是為了在下一步計算分割。
對於心臟病數據集,經過第一次計算步驟後,數據看起來像這樣:
“`
┌───────────────┬───────────────┬───────────────┬────────────────┐
│ feature_value ┆ class_0_count ┆ class_1_count ┆ count_examples │
│ — ┆ — ┆ — ┆ — │
│ i8 ┆ u32 ┆ u32 ┆ u32 │
╞═══════════════╪═══════════════╪═══════════════╪════════════════╡
│ 29 ┆ 2 ┆ 0 ┆ 2 │
│ 30 ┆ 1 ┆ 0 ┆ 1 │
│ 39 ┆ 1068 ┆ 331 ┆ 1399 │
│ 40 ┆ 975 ┆ 263 ┆ 1238 │
│ 41 ┆ 1052 ┆ 438 ┆ 1490 │
│ … ┆ … ┆ … ┆ … │
│ 60 ┆ 1054 ┆ 1460 ┆ 2514 │
│ 61 ┆ 695 ┆ 1408 ┆ 2103 │
│ 62 ┆ 566 ┆ 1125 ┆ 1691 │
│ 63 ┆ 572 ┆ 1517 ┆ 2089 │
│ 64 ┆ 479 ┆ 1217 ┆ 1696 │
└───────────────┴───────────────┴───────────────┴────────────────┘
“`
這裡,特徵年齡以年為單位進行處理。類別 0 代表“沒有心臟病”,類別 1 代表“心臟病”。數據按年齡特徵排序,列中包含類別 0、類別 1 的計數,以及具有相應特徵值的示例的總計數。
在下一步中,計算每個特徵值的類別計數的累積和。
“`python
.select(
[
pl.col(f”class_{target_value}_count”).cum_sum().alias(f”cum_sum_class_{target_value}_count”)
for target_value in unique_targets
]
+ [
pl.col(f”class_{target_value}_count”).sum().alias(f”sum_class_{target_value}_count”)
for target_value in unique_targets
]
+ [
pl.col(“count_examples”).cum_sum().alias(“cum_sum_count_examples”),
pl.col(“count_examples”).sum().alias(“sum_count_examples”),
]
+ [
# 來自之前的選擇
pl.col(“feature_value”),
]
)
.filter(
# 至少有一個示例可用
pl.col(“sum_count_examples”) > pl.col(“cum_sum_count_examples”)
)
“`
這樣做的直覺是,當在特定特徵值上執行分割時,它包括來自較小特徵值的目標值計數。為了能計算比例,計算目標值的總和。相同的過程也重複用於 count_examples,計算累積和和總和。
計算後,數據看起來像這樣:
“`
┌──────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ cum_sum_clas ┆ cum_sum_cla ┆ sum_class_0 ┆ sum_class_1 ┆ cum_sum_cou ┆ sum_count_e ┆ feature_val │
│ s_0_count ┆ ss_1_count ┆ _count ┆ _count ┆ nt_examples ┆ xamples ┆ ue │
│ — ┆ — ┆ — ┆ — ┆ — ┆ — ┆ — │
│ u32 ┆ u32 ┆ u32 ┆ u32 ┆ u32 ┆ u32 ┆ i8 │
╞══════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═════════════╡
│ 3 ┆ 0 ┆ 27717 ┆ 26847 ┆ 3 ┆ 54564 ┆ 29 │
│ 4 ┆ 0 ┆ 27717 ┆ 26847 ┆ 4 ┆ 54564 ┆ 30 │
│ 1097 ┆ 324 ┆ 27717 ┆ 26847 ┆ 1421 ┆ 54564 ┆ 39 │
│ 2090 ┆ 595 ┆ 27717 ┆ 26847 ┆ 2685 ┆ 54564 ┆ 40 │
│ 3155 ┆ 1025 ┆ 27717 ┆ 26847 ┆ 4180 ┆ 54564 ┆ 41 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 24302 ┆ 20162 ┆ 27717 ┆ 26847 ┆ 44464 ┆ 54564 ┆ 59 │
│ 25356 ┆ 21581 ┆ 27717 ┆ 26847 ┆ 46937 ┆ 54564 ┆ 60 │
│ 26046 ┆ 23020 ┆ 27717 ┆ 26847 ┆ 49066 ┆ 54564 ┆ 61 │
│ 26615 ┆ 24131 ┆ 27717 ┆ 26847 ┆ 50746 ┆ 54564 ┆ 62 │
│ 27216 ┆ 25652 ┆ 27717 ┆ 26847 ┆ 52868 ┆ 54564 ┆ 63 │
└──────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘
“`
在下一步中,計算每個特徵值的比例。
“`python
.select(
[
(pl.col(f”cum_sum_class_{target_value}_count”) / pl.col(“cum_sum_count_examples”)).alias(
f”left_proportion_class_{target_value}”
)
for target_value in unique_targets
]
+ [
(
(pl.col(f”sum_class_{target_value}_count”) – pl.col(f”cum_sum_class_{target_value}_count”))
/ (pl.col(“sum_count_examples”) – pl.col(“cum_sum_count_examples”))
).alias(f”right_proportion_class_{target_value}”)
for target_value in unique_targets
]
+ [
(pl.col(f”sum_class_{target_value}_count”) / pl.col(“sum_count_examples”)).alias(
f”parent_proportion_class_{target_value}”
)
for target_value in unique_targets
]
+ [
# 來自之前的選擇
pl.col(“cum_sum_count_examples”),
pl.col(“sum_count_examples”),
pl.col(“feature_value”),
]
)
“`
為了計算比例,可以使用前一步的結果。對於左側比例,將每個目標值的累積和除以示例計數的累積和。對於右側比例,我們需要知道每個目標值的右側示例數量。這是通過從目標值的累積和中減去目標值的總和來計算的。相同的計算也用於確定右側的示例計數,通過從示例計數的總和中減去累積和來計算。此外,還計算父比例。這是通過將目標值計數的總和除以示例的總計數來完成的。
這是這一步驟後的結果數據:
“`
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ left_prop ┆ left_prop ┆ right_pro ┆ right_pro ┆ … ┆ parent_pr ┆ cum_sum_c ┆ sum_count ┆ feature_ │
│ ortion_cl ┆ ortion_cl ┆ portion_c ┆ portion_c ┆ ┆ oportion_ ┆ ount_exam ┆ _examples ┆ value │
│ — ┆ — ┆ — ┆ — ┆ ┆ — ┆ — ┆ u32 ┆ — │
│ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ u32 ┆ ┆ │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ 1.0 ┆ 0.0 ┆ 0.506259 ┆ 0.493741 ┆ … ┆ 0.493714 ┆ 3 ┆ 54564 ┆ 29 │
│ 1.0 ┆ 0.0 ┆ 0.50625 ┆ 0.49375 ┆ … ┆ 0.493714 ┆ 4 ┆ 54564 ┆ 30 │
│ 0.754902 ┆ 0.245098 ┆ 0.499605 ┆ 0.500395 ┆ … ┆ 0.493714 ┆ 1428 ┆ 54564 ┆ 39 │
│ 0.765596 ┆ 0.234404 ┆ 0.492739 ┆ 0.507261 ┆ … ┆ 0.493714 ┆ 2709 ┆ 54564 ┆ 40 │
│ 0.741679 ┆ 0.258321 ┆ 0.486929 ┆ 0.513071 ┆ … ┆ 0.493714 ┆ 4146 ┆ 54564 ┆ 41 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 0.545735 ┆ 0.454265 ┆ 0.333563 ┆ 0.666437 ┆ … ┆ 0.493714 ┆ 44419 ┆ 54564 ┆ 59 │
│ 0.539065 ┆ 0.460935 ┆ 0.305025 ┆ 0.694975 ┆ … ┆ 0.493714 ┆ 46922 ┆ 54564 ┆ 60 │
│ 0.529725 ┆ 0.470275 ┆ 0.297071 ┆ 0.702929 ┆ … ┆ 0.493714 ┆ 49067 ┆ 54564 ┆ 61 │
│ 0.523006 ┆ 0.476994 ┆ 0.282551 ┆ 0.717449 ┆ … ┆ 0.493714 ┆ 50770 ┆ 54564 ┆ 62 │
│ 0.513063 ┆ 0.486937 ┆ 0.296188 ┆ 0.703812 ┆ … ┆ 0.493714 ┆ 52859 ┆ 54564 ┆ 63 │
└───────────┴───────────┴───────────┴───────────┴───┴───────────┴───────────┴───────────┴──────────┘
“`
現在有了比例,熵可以計算。
“`python
.select(
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”left_proportion_class_{target_value}”)
* pl.col(f”left_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“left_entropy”),
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”right_proportion_class_{target_value}”)
* pl.col(f”right_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“right_entropy”),
(
-1
* pl.sum_horizontal(
[
(
pl.col(f”parent_proportion_class_{target_value}”)
* pl.col(f”parent_proportion_class_{target_value}”).log(base=2)
).fill_nan(0.0)
for target_value in unique_targets
]
)
).alias(“parent_entropy”),
# 來自之前的選擇
pl.col(“cum_sum_count_examples”),
pl.col(“sum_count_examples”),
pl.col(“feature_value”),
)
“`
在熵的計算中使用方程式 2。左熵使用左比例計算,右熵使用右比例。對於父熵,使用父比例。在這個實現中,使用 pl.sum_horizontal() 計算比例的總和,以利用 Polars 的可能優化。這也可以用 Python 原生的 sum() 方法替代。
包含熵值的數據看起來如下:
“`
┌──────────────┬───────────────┬────────────────┬─────────────────┬────────────────┬───────────────┐
│ left_entropy ┆ right_entropy ┆ parent_entropy ┆ cum_sum_count_e ┆ sum_count_exam ┆ feature_value │
│ — ┆ — ┆ — ┆ xamples ┆ ples ┆ — │
│ f64 ┆ f64 ┆ f64 ┆ — ┆ — ┆ i8 │
│ ┆ ┆ ┆ u32 ┆ u32 ┆ │
╞══════════════╪═══════════════╪════════════════╪═════════════════╪════════════════╪═══════════════╡
│ -0.0 ┆ 0.999854 ┆ 0.999853 ┆ 3 ┆ 54564 ┆ 29 │
│ -0.0 ┆ 0.999854 ┆ 0.999853 ┆ 4 ┆ 54564 ┆ 30 │
│ 0.783817 ┆ 1.0 ┆ 0.999853 ┆ 1427 ┆ 54564 ┆ 39 │
│ 0.767101 ┆ 0.999866 ┆ 0.999853 ┆ 2694 ┆ 54564 ┆ 40 │
│ 0.808516 ┆ 0.999503 ┆ 0.999853 ┆ 4177 ┆ 54564 ┆ 41 │
│ … ┆ … ┆ … ┆ … ┆ … ┆ … │
│ 0.993752 ┆ 0.918461 ┆ 0.999853 ┆ 44483 ┆ 54564 ┆ 59 │
│ 0.995485 ┆ 0.890397 ┆ 0.999853 ┆ 46944 ┆ 54564 ┆ 60 │
│ 0.997367 ┆ 0.880977 ┆ 0.999853 ┆ 49106 ┆ 54564 ┆ 61 │
│ 0.99837 ┆ 0.859431 ┆ 0.999853 ┆ 50800 ┆ 54564 ┆ 62 │
│ 0.999436 ┆ 0.872346 ┆ 0.999853 ┆ 52877 ┆ 54564 ┆ 63 │
└──────────────┴───────────────┴────────────────┴─────────────────┴────────────────┴───────────────┘
“`
快到了!最後一步是計算子熵,並利用它來獲得信息增益。
“`python
.select(
(
pl.col(“cum_sum_count_examples”) / pl.col(“sum_count_examples”) * pl.col(“left_entropy”)
+ (pl.col(“sum_count_examples”) – pl.col(“cum_sum_count_examples”))
/ pl.col(“sum_count_examples”)
* pl.col(“right_entropy”)
).alias(“child_entropy”),
# 來自之前的選擇
pl.col(“parent_entropy”),
pl.col(“feature_value”),
)
.select(
(pl.col(“parent_entropy”) – pl.col(“child_entropy”)).alias(“information_gain”),
# 來自之前的選擇
pl.col(“parent_entropy”),
pl.col(“feature_value”),
)
.filter(pl.col(“information_gain”).is_not_nan())
.sort(“information_gain”, descending=True)
.head(1)
.with_columns(feature=pl.lit(feature_name))
“`
對於子熵,左熵和右熵按特徵值的示例數進行加權。這兩個加權熵值的總和用作子熵。要計算信息增益,我們只需從父熵中減去子熵,如方程式 1 所示。最佳特徵值通過按信息增益排序並選擇第一行來確定。它將附加到收集所有特徵的最佳特徵值的列表中。
在應用 .head(1) 之前,數據看起來如下:
“`
┌──────────────────┬────────────────┬───────────────┐
│ information_gain ┆ parent_entropy ┆ feature_value │
│ — ┆ — ┆ — │
│ f64 ┆ f64 ┆ i8 │
╞══════════════════╪════════════════╪═══════════════╡
│ 0.028388 ┆ 0.999928 ┆ 54 │
│ 0.027719 ┆ 0.999928 ┆ 52 │
│ 0.027283 ┆ 0.999928 ┆ 53 │
│ 0.026826 ┆ 0.999928 ┆ 50 │
│ 0.026812 ┆ 0.999928 ┆ 51 │
│ … ┆ … ┆ … │
│ 0.010928 ┆ 0.999928 ┆ 62 │
│ 0.005872 ┆ 0.999928 ┆ 39 │
│ 0.004155 ┆ 0.999928 ┆ 63 │
│ 0.000072 ┆ 0.999928 ┆ 30 │
│ 0.000054 ┆ 0.999928 ┆ 29 │
└──────────────────┴────────────────┴───────────────┘
“`
在這裡可以看到,年齡特徵值 54 具有最高的信息增益。這個特徵值將被收集到年齡特徵中,並需要與其他特徵競爭。
選擇最佳分割並定義子樹
要選擇最佳分割,需要在所有特徵中找到最高的信息增益。
“`python
if isinstance(information_gain_dfs[0], pl.LazyFrame):
information_gain_dfs = pl.collect_all(information_gain_dfs, streaming=self.streaming)
information_gain_dfs = pl.concat(information_gain_dfs, how=”vertical_relaxed”).sort(
“information_gain”, descending=True
)
“`
為此,對 information_gain_dfs 使用 pl.collect_all() 方法。這可以並行評估所有 LazyFrames,使處理非常高效。結果是一個 Polars DataFrame 的列表,按信息增益進行串接和排序。
對於心臟病示例,數據看起來像這樣:
“`
┌──────────────────┬────────────────┬───────────────┬─────────────┐
│ information_gain ┆ parent_entropy ┆ feature_value ┆ feature │
│ — ┆ — ┆ — ┆ — │
│ f64 ┆ f64 ┆ f64 ┆ str │
╞══════════════════╪════════════════╪═══════════════╪═════════════╡
│ 0.138032 ┆ 0.999909 ┆ 129.0 ┆ ap_hi │
│ 0.09087 ┆ 0.999909 ┆ 85.0 ┆ ap_lo │
│ 0.029966 ┆ 0.999909 ┆ 0.0 ┆ cholesterol │
│ 0.028388 ┆ 0.999909 ┆ 54.0 ┆ age_years │
│ 0.01968 ┆ 0.999909 ┆ 27.435041 ┆ bmi │
│ … ┆ … ┆ … ┆ … │
│ 0.000851 ┆ 0.999909 ┆ 0.0 ┆ active │
│ 0.000351 ┆ 0.999909 ┆ 156.0 ┆ height │
│ 0.000223 ┆ 0.999909 ┆ 0.0 ┆ smoke │
│ 0.000098 ┆ 0.999909 ┆ 0.0 ┆ alco │
│ 0.000031 ┆ 0.999909 ┆ 0.0 ┆ gender │
└──────────────────┴────────────────┴───────────────┴─────────────┘
“`
在所有特徵中,ap_hi(收縮壓)特徵值 129 產生最佳的信息增益,因此將被選擇作為第一次分割。
“`python
information_gain = 0
if len(information_gain_dfs) > 0:
best_params = information_gain_dfs.row(0, named=True)
information_gain = best_params[“information_gain”]
“`
在某些情況下,information_gain_dfs 可能是空的,例如,當所有分割結果僅在左側或右側有示例時。如果是這樣,信息增益為零。否則,我們將獲得具有最高信息增益的特徵值。
“`python
if information_gain > 0:
left_mask = data.select(filter=pl.col(best_params[“feature”]) <= best_params["feature_value"])
if isinstance(left_mask, pl.LazyFrame):
left_mask = left_mask.collect(streaming=self.streaming)
left_mask = left_mask[“filter”]
# 分割數據
left_df = data.filter(left_mask)
right_df = data.filter(~left_mask)
left_subtree = self._build_tree(left_df, feature_names, target_name, unique_targets, depth + 1)
right_subtree = self._build_tree(right_df, feature_names, target_name, unique_targets, depth + 1)
if isinstance(data, pl.LazyFrame):
target_distribution = (
data.select(target_name)
.collect(streaming=self.streaming)[target_name]
.value_counts()
.sort(target_name)[“count”]
.to_list()
)
else:
target_distribution = data[target_name].value_counts().sort(target_name)[“count”].to_list()
return {
“type”: “node”,
“feature”: best_params[“feature”],
“threshold”: best_params[“feature_value”],
“information_gain”: best_params[“information_gain”],
“entropy”: best_params[“parent_entropy”],
“target_distribution”: target_distribution,
“left”: left_subtree,
“right”: right_subtree,
}
else:
return {“type”: “leaf”, “value”: self.get_majority_class(data, target_name)}
“`
當信息增益大於零時,定義子樹。為此,使用導致最佳信息增益的特徵值定義左掩碼。將掩碼應用於父數據以獲得左數據框。左掩碼的否定用於定義右數據框。兩個左和右數據框都用於再次調用 _build_tree() 方法,深度增加 1。最後一步是計算目標分佈。這用作節點的附加信息,並將與其他信息一起在繪製樹時顯示。
當信息增益為零時,將返回一個葉子實例。這包含給定數據的主要類別。
進行預測
可以以兩種不同的方式進行預測。如果輸入數據較小,可以使用 predict() 方法。
“`python
def predict(self, data: Iterable[dict]):
def _predict_sample(node, sample):
if node[“type”] == “leaf”:
return node[“value”]
if sample[node[“feature”]] <= node["threshold"]:
return _predict_sample(node[“left”], sample)
else:
return _predict_sample(node[“right”], sample)
predictions = [_predict_sample(self.tree, sample) for sample in data]
return predictions
“`
在這裡,數據可以作為字典的可迭代項提供。每個字典包含特徵名稱作為鍵,特徵值作為值。通過使用 _predict_sample() 方法,沿著樹的路徑進行跟隨,直到到達葉子節點。這裡包含了分配給相應示例的類別。
“`python
def predict_many(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> List[Union[int, float]]:
“””
預測方法。
:param data: Polars DataFrame 或 LazyFrame。
:return: 預測目標值的列表。
“””
if self.categorical_mappings:
data = self.apply_categorical_mappings(data)
def _predict_many(node, temp_data):
if node[“type”] == “node”:
left = _predict_many(node[“left”], temp_data.filter(pl.col(node[“feature”]) <= node["threshold"]))
right = _predict_many(node[“right”], temp_data.filter(pl.col(node[“feature”]) > node[“threshold”]))
return pl.concat([left, right], how=”diagonal_relaxed”)
else:
return temp_data.select(pl.col(“temp_prediction_index”), pl.lit(node[“value”]).alias(“prediction”))
data = data.with_row_index(“temp_prediction_index”)
predictions = _predict_many(self.tree, data).sort(“temp_prediction_index”).select(pl.col(“prediction”))
# 將預測轉換為列表
if isinstance(predictions, pl.LazyFrame):
# 儘管執行計劃顯示沒有流式處理,但在這裡使用流式處理顯著提高性能並減少內存使用。
predictions = predictions.collect(streaming=True)
predictions = predictions[“prediction”].to_list()
return predictions
“`
如果要預測一個大的示例集,使用 predict_many() 方法會更有效率。這利用了 Polars 在並行處理和內存效率方面的優勢。
數據可以作為 Polars DataFrame 或 LazyFrame 提供。與訓練過程中的 _build_tree() 方法類似,遞歸調用 _predict_many() 方法。數據中的所有示例都被過濾到子樹中,直到到達葉子節點。走同一路徑到達葉子節點的示例將獲得相同的預測值。過程結束時,所有示例的子框架再次串接。由於無法保持順序,因此在過程開始時設置了一個臨時預測索引。當所有預測完成後,通過該索引進行排序以恢復原始順序。
在數據集上使用分類器
決策樹分類器的使用示例可以在這裡找到。決策樹在心臟病數據集上進行訓練。定義訓練集和測試集以測試實現的性能。訓練後,樹被繪製並保存到文件中。
在最大深度為四的情況下,結果樹看起來如下:

它在給定數據上達到 73% 的訓練和測試準確率。
運行時間比較
使用 Polars 作為決策樹後端的一個目標是探索運行時間和內存使用情況,並與其他框架進行比較。為此,我創建了一個內存分析腳本,可以在這裡找到。
該腳本比較了這個實現,稱為“efficient-trees”,與 sklearn 和 lightgbm。對於 efficient-trees,測試了懶加載流式變體和非懶加載的內存變體。

在圖中可以看到,lightgbm 是最快和最節省內存的框架。由於它在不久前引入了使用箭頭數據集的可能性,因此可以有效地處理數據。然而,由於整個數據集仍需要加載,無法進行流式處理,因此仍存在潛在的擴展問題。
第二好的框架是 efficient-trees,無論是否使用流式處理。雖然不使用流式處理的 efficient-trees 具有更好的運行時間,但流式變體使用的內存更少。
sklearn 實現的內存使用和運行時間結果最差。由於數據需要作為 numpy 陣列提供,內存使用量大幅增加。運行時間可以解釋為僅使用一個 CPU 核心。尚未支持多線程或多處理。
深入了解:Polars 中的流式處理
從框架的比較中可以看出,流式處理數據而不是將其保存在內存中,對所有其他框架都有很大不同。然而,流式引擎仍被視為實驗性功能,並非所有操作都與流式處理兼容。
為了更好地理解背景發生了什麼,查看執行計劃是有用的。讓我們回到訓練過程,獲取以下操作的執行計劃:
“`python
def fit(self, data: Union[pl.DataFrame, pl.LazyFrame], target_name: str) -> None:
“””
Fit 方法用於訓練決策樹。
:param data: 包含訓練數據的 Polars DataFrame 或 LazyFrame。
:param target_name: 目標列的名稱
“””
columns = data.collect_schema().names()
feature_names = [col for col in columns if col != target_name]
# 縮小數據類型
data = data.select(pl.all().shrink_dtype()).with_columns(
pl.col(target_name).cast(pl.UInt64).shrink_dtype().alias(target_name)
)
“`
可以使用以下命令創建數據的執行計劃:
“`python
data.explain(streaming=True)
“`
這將返回 LazyFrame 的執行計劃。
“`
WITH_COLUMNS:
[col(“cardio”).strict_cast(UInt64).shrink_dtype().alias(“cardio”)]
SELECT [col(“gender”).shrink_dtype(), col(“height”).shrink_dtype(), col(“weight”).shrink_dtype(), col(“ap_hi”).shrink_dtype(), col(“ap_lo”).shrink_dtype(), col(“cholesterol”).shrink_dtype(), col(“gluc”).shrink_dtype(), col(“smoke”).shrink_dtype(), col(“alco”).shrink_dtype(), col(“active”).shrink_dtype(), col(“cardio”).shrink_dtype(), col(“age_years”).shrink_dtype(), col(“bmi”).shrink_dtype()] FROM
STREAMING:
DF [“gender”, “height”, “weight”, “ap_hi”]; PROJECT 13/13 COLUMNS; SELECTION: None
“`
這裡重要的關鍵字是 STREAMING。可以看到,初始數據集加載在流式模式下進行,但在縮小數據類型時,整個數據集需要加載到內存中。由於縮小數據類型不是必要的部分,我暫時將其刪除,以探索流式處理支持的操作。
下一個問題操作是分配類別特徵。
“`python
def apply_categorical_mappings(self, data: Union[pl.DataFrame, pl.LazyFrame]) -> Union[pl.DataFrame, pl.LazyFrame]:
“””
在輸入框架上應用類別映射。
:param data: 具有類別列的 Polars DataFrame 或 LazyFrame。
:return: 具有映射類別列的 Polars DataFrame 或 LazyFrame
“””
return data.with_columns(
[pl.col(col).replace(self.categorical_mappings[col]).cast(pl.UInt32) for col in self.categorical_columns]
)
“`
替換表達式不支持流式模式。即使刪除類型轉換,流式處理也未被使用,這可以從執行計劃中看出。
“`
WITH_COLUMNS:
[col(“gender”).replace([Series, Series]), col(“cholesterol”).replace([Series, Series]), col(“gluc”).replace([Series, Series]), col(“smoke”).replace([Series, Series]), col(“alco”).replace([Series, Series]), col(“active”).replace([Series, Series])]
STREAMING:
DF [“gender”, “height”, “weight”, “ap_hi”]; PROJECT */13 COLUMNS; SELECTION: None
“`
接下來,我還刪除了對類別特徵的支持。接下來發生的是計算信息增益。
“`python
information_gain_df = (
feature_data.group_by(“feature_value”)
.agg(
[
pl.col(target_name)
.filter(pl.col(target_name) == target_value)
.len()
.alias(f”class_{target_value}_count”)
for target_value in unique_targets
]
+ [pl.col(target_name).len().alias(“count_examples”)]
)
.sort(“feature_value”)
)
“`
不幸的是,在計算的第一部分,流式模式不再受支持。在這裡,使用 pl.col().filter() 阻止了我們流式處理數據。
“`
SORT BY [col(“feature_value”)]
AGGREGATE
[col(“cardio”).filter([(col(“cardio”)) == (1)]).count().alias(“class_1_count”), col(“cardio”).filter([(col(“cardio”)) == (0)]).count().alias(“class_0_count”), col(“cardio”).count().alias(“count_examples”)] BY [col(“feature_value”)] FROM
STREAMING:
RENAME
simple π 2/2 [“gender”, “cardio”]
DF [“gender”, “height”, “weight”, “ap_hi”]; PROJECT 2/13 COLUMNS; SELECTION: col(“gender”).is_not_null()
“`
這並不容易改變,因此我將在這裡停止探索。可以得出結論,在使用 Polars 後端的決策樹實現中,流式處理的全部潛力尚無法使用,因為重要的運算符仍然缺乏流式支持。由於流式模式正在積極開發中,未來可能會在流式模式下運行大多數運算符,甚至整個決策樹的計算。
結論
在這篇博客文章中,我展示了使用 Polars 作為後端的決策樹的自定義實現。我展示了實現細節,並將其與其他決策樹框架進行了比較。比較顯示,這個實現可以在運行時間和內存使用方面超越 sklearn。但仍然有其他框架,如 lightgbm,提供更好的運行時間和更高效的處理。在使用 Polars 後端時,流式模式具有很大的潛力。目前,一些運算符阻止了端到端的流式處理方法,因為缺乏流式支持,但這正在積極開發中。當 Polars 在這方面取得進展時,值得重新檢視這個實現並再次與其他框架進行比較。
本文由 AI 台灣 運用 AI 技術編撰,內容僅供參考,請自行核實相關資訊。
歡迎加入我們的 AI TAIWAN 台灣人工智慧中心 FB 社團,
隨時掌握最新 AI 動態與實用資訊!