Remove other unused scripts

RYDE-WORK 2026-02-26 19:31:48 +08:00
parent 00f51f37f0
commit c6e0a974cc
8 changed files with 25 additions and 1868 deletions

Makefile

@@ -8,9 +8,7 @@ PYTHON_INTERPRETER = python
# --- CLI flag variables ---
MPNN_FLAG = $(if $(USE_MPNN),--use-mpnn,)
FREEZE_FLAG = $(if $(FREEZE_BACKBONE),--freeze-backbone,)
DEVICE_FLAG = $(if $(DEVICE),--device $(DEVICE),)
SCAFFOLD_SPLIT_FLAG = $(if $(filter 1,$(SCAFFOLD_SPLIT)),--scaffold-split,)
SEED_FLAG = $(if $(SEED),--seed $(SEED),)
N_TRIALS_FLAG = $(if $(N_TRIALS),--n-trials $(N_TRIALS),)
EPOCHS_PER_TRIAL_FLAG = $(if $(EPOCHS_PER_TRIAL),--epochs-per-trial $(EPOCHS_PER_TRIAL),)
@@ -61,16 +59,6 @@ format:
preprocess: requirements
$(PYTHON_INTERPRETER) scripts/preprocess_internal.py
## Process dataset (interim -> processed)
.PHONY: data
data: requirements
$(PYTHON_INTERPRETER) scripts/process_data.py
## Process dataset for final training (interim -> processed/final, train:val=9:1, no test)
.PHONY: data_final
data_final: requirements
$(PYTHON_INTERPRETER) scripts/process_data_final.py
## Process external data for pretrain (external -> processed)
.PHONY: data_pretrain
data_pretrain: requirements
@@ -81,83 +69,18 @@ data_pretrain: requirements
data_benchmark: requirements
$(PYTHON_INTERPRETER) scripts/process_benchmark_data.py
## Process internal data with CV splitting (interim -> processed/cv)
## Use SCAFFOLD_SPLIT=1 to enable amine-based scaffold splitting (default: random shuffle)
.PHONY: data_cv
data_cv: requirements
$(PYTHON_INTERPRETER) scripts/process_data_cv.py $(SCAFFOLD_SPLIT_FLAG)
#################################################################################
# TRAINING #
# BENCHMARKING #
#################################################################################
## Pretrain on external data (delivery only)
.PHONY: pretrain
pretrain: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.pretrain main $(MPNN_FLAG) $(DEVICE_FLAG)
## Benchmark on baseline CV data: 5-fold train + test (delivery only)
.PHONY: benchmark
benchmark: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.benchmark main $(MPNN_FLAG) $(DEVICE_FLAG)
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.benchmark test $(DEVICE_FLAG)
## Train model (multi-task, from scratch)
.PHONY: train
train: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train $(MPNN_FLAG) $(DEVICE_FLAG)
## Finetune from pretrained checkpoint (use FREEZE_BACKBONE=1 to freeze backbone)
.PHONY: finetune
finetune: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train --init-from-pretrain models/pretrain_delivery.pt $(FREEZE_FLAG) $(MPNN_FLAG) $(DEVICE_FLAG)
## Final training using all data (train:val=9:1, no test set), with pretrained weights
.PHONY: train_final
train_final: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train \
--train-path data/processed/final/train.parquet \
--val-path data/processed/final/val.parquet \
--output-dir models/final \
--init-from-pretrain models/pretrain_delivery.pt \
$(FREEZE_FLAG) $(MPNN_FLAG) $(DEVICE_FLAG)
## Train with cross-validation on internal data only (5-fold, amine-based split)
.PHONY: train_cv
train_cv: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train_cv main $(FREEZE_FLAG) $(MPNN_FLAG) $(DEVICE_FLAG)
## Finetune with cross-validation on internal data (5-fold) with pretrained weights
.PHONY: finetune_cv
finetune_cv: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train_cv main --init-from-pretrain models/pretrain_delivery.pt $(FREEZE_FLAG) $(MPNN_FLAG) $(DEVICE_FLAG)
#################################################################################
# EVALUATION #
#################################################################################
## Evaluate pretrain model (delivery metrics)
.PHONY: test_pretrain
test_pretrain: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.pretrain test $(MPNN_FLAG) $(DEVICE_FLAG)
## Evaluate CV finetuned models on test sets (auto-detects MPNN from checkpoint)
.PHONY: test_cv
test_cv: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train_cv test $(DEVICE_FLAG)
## Test model on test set (with detailed metrics, auto-detects MPNN from checkpoint)
.PHONY: test
test: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.predict test $(DEVICE_FLAG)
## Run predictions
.PHONY: predict
predict: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.predict $(DEVICE_FLAG)
#################################################################################
# HYPERPARAMETER TUNING #
# TRAINING (Nested CV + Optuna) #
#################################################################################
# Common parameters:
# SEED random seed (default: 42)
@@ -167,26 +90,26 @@ predict: requirements
# OUTPUT_DIR output directory (default depends on the command)
# INIT_PRETRAIN path to pretrained weights (default: models/pretrain_delivery.pt)
# NO_PRETRAIN=1 disable pretrained weights
# USE_SWA=1 enable SWA (final training stage)
#
# Usage examples:
# make pretrain
# make train DEVICE=cuda N_TRIALS=30 USE_SWA=1 INIT_PRETRAIN=models/pretrain_delivery.pt
## Train with hyperparameter tuning
.PHONY: tune
tune: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.train --tune $(MPNN_FLAG) $(DEVICE_FLAG)
## Nested CV with Optuna: outer 5-fold (test) + inner 3-fold (tune)
## For model evaluation: the outer 5-fold gives an unbiased performance estimate, the inner 3-fold does hyperparameter search
## Usage example: make nested_cv_tune DEVICE=cuda N_TRIALS=30
.PHONY: nested_cv_tune
nested_cv_tune: requirements
## Pretrain on external data (delivery only)
.PHONY: pretrain
pretrain: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.pretrain main $(MPNN_FLAG) $(DEVICE_FLAG)
## Train: nested CV evaluation + final model training
## Step 1: outer 5-fold gives an unbiased performance estimate, inner 3-fold does hyperparameter search
## Step 2: after 3-fold tuning, train the final model on the full dataset
.PHONY: train
train: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.nested_cv_optuna \
$(DEVICE_FLAG) $(MPNN_FLAG) $(SEED_FLAG) $(INIT_PRETRAIN_FLAG) \
$(N_TRIALS_FLAG) $(EPOCHS_PER_TRIAL_FLAG) $(MIN_STRATUM_FLAG) $(OUTPUT_DIR_FLAG)
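As context for the train target above: lnp_ml.modeling.nested_cv_optuna implements the nested-CV pattern, where the outer 5-fold loop gives the unbiased performance estimate and the inner 3-fold loop drives the Optuna search. A minimal sketch of that pattern follows, assuming a generic build_model factory with a fit/score interface; these names are illustrative placeholders, not the repo's actual API.

import numpy as np
import optuna
from sklearn.model_selection import KFold

def nested_cv(X, y, build_model, n_outer=5, n_inner=3, n_trials=30, seed=42):
    """Outer folds give an unbiased estimate; inner folds tune hyperparameters."""
    outer_scores = []
    for train_idx, test_idx in KFold(n_outer, shuffle=True, random_state=seed).split(X):
        X_tr, y_tr = X[train_idx], y[train_idx]

        def objective(trial):
            # Sample a candidate configuration and score it with inner CV only.
            lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
            dropout = trial.suggest_float("dropout", 0.05, 0.3)
            inner_scores = []
            for fit_idx, val_idx in KFold(n_inner, shuffle=True, random_state=seed).split(X_tr):
                model = build_model(lr=lr, dropout=dropout)  # hypothetical factory
                model.fit(X_tr[fit_idx], y_tr[fit_idx])
                inner_scores.append(model.score(X_tr[val_idx], y_tr[val_idx]))
            return float(np.mean(inner_scores))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials)

        # Refit on the full outer-train split with the best inner-CV params,
        # then score once on the untouched outer-test split.
        best = build_model(**study.best_trial.params)
        best.fit(X_tr, y_tr)
        outer_scores.append(best.score(X[test_idx], y[test_idx]))
    return float(np.mean(outer_scores)), float(np.std(outer_scores))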
## Final training with Optuna: 3-fold CV tune + full data train
## For final model training: after 3-fold tuning, train on the full dataset (no early stopping)
## Usage example: make final_optuna DEVICE=cuda N_TRIALS=30 USE_SWA=1
.PHONY: final_optuna
final_optuna: requirements
$(PYTHON_INTERPRETER) -m lnp_ml.modeling.final_train_optuna_cv \
$(DEVICE_FLAG) $(MPNN_FLAG) $(SEED_FLAG) $(INIT_PRETRAIN_FLAG) \
$(N_TRIALS_FLAG) $(EPOCHS_PER_TRIAL_FLAG) $(MIN_STRATUM_FLAG) $(OUTPUT_DIR_FLAG) $(USE_SWA_FLAG)
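USE_SWA=1 in final_optuna enables stochastic weight averaging during the final full-data training. The sketch below shows the standard torch.optim.swa_utils recipe under a generic training loop; it is a hedged illustration with placeholder arguments, not the actual final_train_optuna_cv implementation.

import torch
from torch.optim.swa_utils import AveragedModel, SWALR, update_bn

def train_with_swa(model, train_loader, loss_fn, epochs=100, swa_start=75, lr=1e-4):
    # Plain AdamW training for the first swa_start epochs, then start
    # accumulating a running average of the weights.
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    swa_model = AveragedModel(model)
    swa_scheduler = SWALR(optimizer, swa_lr=lr * 0.5)
    for epoch in range(epochs):
        for batch, target in train_loader:
            optimizer.zero_grad()
            loss_fn(model(batch), target).backward()
            optimizer.step()
        if epoch >= swa_start:
            swa_model.update_parameters(model)  # accumulate the weight average
            swa_scheduler.step()
    update_bn(train_loader, swa_model)  # recompute BatchNorm statistics, if any
    return swa_model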


@@ -367,7 +367,7 @@ def run_optuna_cv(
@app.command()
def main(
input_path: Path = INTERIM_DATA_DIR / "internal.csv",
output_dir: Path = MODELS_DIR / "final_optuna",
output_dir: Path = MODELS_DIR / "final",
# CV parameters
n_folds: int = 3,
min_stratum_count: int = 5,


@@ -1,423 +0,0 @@
"""训练脚本:支持超参数调优"""
import json
from pathlib import Path
from typing import List, Optional, Union
import pandas as pd
import torch
from torch.utils.data import DataLoader
from loguru import logger
import typer
from lnp_ml.config import MODELS_DIR, PROCESSED_DATA_DIR
from lnp_ml.dataset import LNPDataset, collate_fn
from lnp_ml.modeling.models import LNPModel, LNPModelWithoutMPNN
from lnp_ml.modeling.trainer import (
train_epoch,
validate,
EarlyStopping,
LossWeights,
)
from lnp_ml.modeling.visualization import plot_multitask_loss_curves
# Default MPNN ensemble path
DEFAULT_MPNN_ENSEMBLE_DIR = MODELS_DIR / "mpnn" / "all_amine_split_for_LiON"
def find_mpnn_ensemble_paths(base_dir: Path = DEFAULT_MPNN_ENSEMBLE_DIR) -> List[str]:
"""
Automatically locate MPNN ensemble model.pt files.
Searches base_dir for all cv_*/fold_*/model_*/model.pt files.
"""
model_paths = sorted(base_dir.glob("cv_*/fold_*/model_*/model.pt"))
if not model_paths:
raise FileNotFoundError(f"No model.pt files found in {base_dir}")
return [str(p) for p in model_paths]
app = typer.Typer()
def create_model(
d_model: int = 256,
num_heads: int = 8,
n_attn_layers: int = 4,
fusion_strategy: str = "attention",
head_hidden_dim: int = 128,
dropout: float = 0.1,
# MPNN parameters (optional)
mpnn_checkpoint: Optional[str] = None,
mpnn_ensemble_paths: Optional[List[str]] = None,
mpnn_device: str = "cpu",
) -> Union[LNPModel, LNPModelWithoutMPNN]:
"""创建模型(支持可选的 MPNN encoder"""
use_mpnn = mpnn_checkpoint is not None or mpnn_ensemble_paths is not None
if use_mpnn:
return LNPModel(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
mpnn_checkpoint=mpnn_checkpoint,
mpnn_ensemble_paths=mpnn_ensemble_paths,
mpnn_device=mpnn_device,
)
else:
return LNPModelWithoutMPNN(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
)
def train_model(
train_loader: DataLoader,
val_loader: DataLoader,
model: torch.nn.Module,
device: torch.device,
lr: float = 1e-4,
weight_decay: float = 1e-5,
epochs: int = 100,
patience: int = 15,
loss_weights: Optional[LossWeights] = None,
) -> dict:
"""
Train the model.
Returns:
Training history and best validation loss
"""
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.5, patience=5, verbose=True
)
early_stopping = EarlyStopping(patience=patience)
history = {"train": [], "val": []}
best_val_loss = float("inf")
best_state = None
for epoch in range(epochs):
# Train
train_metrics = train_epoch(model, train_loader, optimizer, device, loss_weights)
# Validate
val_metrics = validate(model, val_loader, device, loss_weights)
# Log
logger.info(
f"Epoch {epoch+1}/{epochs} | "
f"Train Loss: {train_metrics['loss']:.4f} | "
f"Val Loss: {val_metrics['loss']:.4f}"
)
history["train"].append(train_metrics)
history["val"].append(val_metrics)
# Learning rate scheduling
scheduler.step(val_metrics["loss"])
# Save best model
if val_metrics["loss"] < best_val_loss:
best_val_loss = val_metrics["loss"]
best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
logger.info(f" -> New best model (val_loss={best_val_loss:.4f})")
# Early stopping
if early_stopping(val_metrics["loss"]):
logger.info(f"Early stopping at epoch {epoch+1}")
break
# Restore best model
if best_state is not None:
model.load_state_dict(best_state)
return {
"history": history,
"best_val_loss": best_val_loss,
}
def run_hyperparameter_tuning(
train_loader: DataLoader,
val_loader: DataLoader,
device: torch.device,
n_trials: int = 20,
epochs_per_trial: int = 30,
) -> dict:
"""
Run hyperparameter tuning with Optuna.
Returns:
Best hyperparameters
"""
try:
import optuna
except ImportError:
logger.error("Optuna not installed. Run: pip install optuna")
raise
def objective(trial: optuna.Trial) -> float:
# Sample hyperparameters
d_model = trial.suggest_categorical("d_model", [128, 256, 512])
num_heads = trial.suggest_categorical("num_heads", [4, 8])
n_attn_layers = trial.suggest_int("n_attn_layers", 2, 6)
fusion_strategy = trial.suggest_categorical(
"fusion_strategy", ["attention", "avg", "max"]
)
head_hidden_dim = trial.suggest_categorical("head_hidden_dim", [64, 128, 256])
dropout = trial.suggest_float("dropout", 0.05, 0.3)
lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-4, log=True)
# Create the model
model = create_model(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
)
# Train
result = train_model(
train_loader=train_loader,
val_loader=val_loader,
model=model,
device=device,
lr=lr,
weight_decay=weight_decay,
epochs=epochs_per_trial,
patience=10,
)
return result["best_val_loss"]
# Run the optimization
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
logger.info(f"Best trial: {study.best_trial.number}")
logger.info(f"Best val_loss: {study.best_trial.value:.4f}")
logger.info(f"Best params: {study.best_trial.params}")
return study.best_trial.params
@app.command()
def main(
train_path: Path = PROCESSED_DATA_DIR / "train.parquet",
val_path: Path = PROCESSED_DATA_DIR / "val.parquet",
output_dir: Path = MODELS_DIR,
# Model parameters
d_model: int = 256,
num_heads: int = 8,
n_attn_layers: int = 4,
fusion_strategy: str = "attention",
head_hidden_dim: int = 128,
dropout: float = 0.1,
# MPNN parameters (optional)
use_mpnn: bool = False, # enable MPNN and auto-load the ensemble from the default path
mpnn_checkpoint: Optional[str] = None,
mpnn_ensemble_paths: Optional[str] = None, # comma-separated list of paths
mpnn_device: str = "cpu",
# Training parameters
batch_size: int = 32,
lr: float = 1e-4,
weight_decay: float = 1e-5,
epochs: int = 100,
patience: int = 15,
# Hyperparameter tuning
tune: bool = False,
n_trials: int = 20,
epochs_per_trial: int = 30,
# Pretrained weight loading
init_from_pretrain: Optional[Path] = None,
load_delivery_head: bool = True,
freeze_backbone: bool = False, # freeze the backbone and train only the heads
# Device
device: str = "cuda" if torch.cuda.is_available() else "cpu",
):
"""
Train the LNP prediction model (multi-task finetune).
Use --tune to enable hyperparameter tuning.
Use --init-from-pretrain to initialize the backbone from a pretrained checkpoint.
Use --use-mpnn to enable the MPNN encoder (auto-loaded from models/mpnn/all_amine_split_for_LiON).
Use --freeze-backbone to freeze the backbone and train only the multi-task heads.
"""
logger.info(f"Using device: {device}")
device = torch.device(device)
# Load data
logger.info(f"Loading train data from {train_path}")
train_df = pd.read_parquet(train_path)
train_dataset = LNPDataset(train_df)
train_loader = DataLoader(
train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
)
logger.info(f"Loading val data from {val_path}")
val_df = pd.read_parquet(val_path)
val_dataset = LNPDataset(val_df)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)
logger.info(f"Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}")
output_dir.mkdir(parents=True, exist_ok=True)
# Hyperparameter tuning
if tune:
logger.info(f"Starting hyperparameter tuning with {n_trials} trials...")
best_params = run_hyperparameter_tuning(
train_loader=train_loader,
val_loader=val_loader,
device=device,
n_trials=n_trials,
epochs_per_trial=epochs_per_trial,
)
# Save the best parameters
params_path = output_dir / "best_params.json"
with open(params_path, "w") as f:
json.dump(best_params, f, indent=2)
logger.success(f"Saved best params to {params_path}")
# Retrain with the best parameters
d_model = best_params["d_model"]
num_heads = best_params["num_heads"]
n_attn_layers = best_params["n_attn_layers"]
fusion_strategy = best_params["fusion_strategy"]
head_hidden_dim = best_params["head_hidden_dim"]
dropout = best_params["dropout"]
lr = best_params["lr"]
weight_decay = best_params["weight_decay"]
# Parse MPNN configuration
# Priority: mpnn_checkpoint > mpnn_ensemble_paths > use_mpnn (auto-detect)
ensemble_paths_list = None
if mpnn_ensemble_paths:
ensemble_paths_list = mpnn_ensemble_paths.split(",")
elif use_mpnn and mpnn_checkpoint is None:
# --use-mpnn given without explicit paths: auto-detect
logger.info(f"Auto-detecting MPNN ensemble from {DEFAULT_MPNN_ENSEMBLE_DIR}")
ensemble_paths_list = find_mpnn_ensemble_paths()
logger.info(f"Found {len(ensemble_paths_list)} MPNN models")
enable_mpnn = mpnn_checkpoint is not None or ensemble_paths_list is not None
# Create the model
logger.info(f"Creating model (use_mpnn={enable_mpnn})...")
model = create_model(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
mpnn_checkpoint=mpnn_checkpoint,
mpnn_ensemble_paths=ensemble_paths_list,
mpnn_device=mpnn_device,
)
# Load pretrained weights (if specified)
if init_from_pretrain is not None:
logger.info(f"Loading pretrain weights from {init_from_pretrain}")
checkpoint = torch.load(init_from_pretrain, map_location="cpu")
# Check whether the configurations are compatible
pretrain_config = checkpoint.get("config", {})
if pretrain_config.get("d_model") != d_model:
logger.warning(
f"d_model mismatch: pretrain={pretrain_config.get('d_model')}, "
f"current={d_model}. Skipping pretrain loading."
)
else:
# Load backbone + (optional) delivery head
model.load_pretrain_weights(
pretrain_state_dict=checkpoint["model_state_dict"],
load_delivery_head=load_delivery_head,
strict=False,
)
logger.success(
f"Loaded pretrain weights (backbone + delivery_head={load_delivery_head})"
)
# Freeze the backbone (if specified)
if freeze_backbone:
logger.info("Freezing backbone (token_projector, cross_attention, fusion)...")
frozen_count = 0
for name, param in model.named_parameters():
if name.startswith(("token_projector.", "cross_attention.", "fusion.")):
param.requires_grad = False
frozen_count += 1
logger.info(f"Frozen {frozen_count} parameter tensors")
# Print model info
n_params_total = sum(p.numel() for p in model.parameters())
n_params_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
logger.info(f"Model parameters: {n_params_total:,} total, {n_params_trainable:,} trainable")
# Train
logger.info("Starting training...")
result = train_model(
train_loader=train_loader,
val_loader=val_loader,
model=model,
device=device,
lr=lr,
weight_decay=weight_decay,
epochs=epochs,
patience=patience,
)
# Save the model
model_path = output_dir / "model.pt"
torch.save({
"model_state_dict": model.state_dict(),
"config": {
"d_model": d_model,
"num_heads": num_heads,
"n_attn_layers": n_attn_layers,
"fusion_strategy": fusion_strategy,
"head_hidden_dim": head_hidden_dim,
"dropout": dropout,
"use_mpnn": enable_mpnn,
},
"best_val_loss": result["best_val_loss"],
"init_from_pretrain": str(init_from_pretrain) if init_from_pretrain else None,
}, model_path)
logger.success(f"Saved model to {model_path}")
# Save training history
history_path = output_dir / "history.json"
with open(history_path, "w") as f:
json.dump(result["history"], f, indent=2)
logger.success(f"Saved training history to {history_path}")
# Plot multi-task loss curves
loss_plot_path = output_dir / "loss_curves.png"
plot_multitask_loss_curves(
history=result["history"],
output_path=loss_plot_path,
title="Multi-task Training Loss Curves",
)
logger.success(f"Saved loss curves plot to {loss_plot_path}")
logger.success(f"Training complete! Best val_loss: {result['best_val_loss']:.4f}")
if __name__ == "__main__":
app()


@@ -1,730 +0,0 @@
"""Cross-Validation 训练脚本:在 5-fold 内部数据上进行多任务训练"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Union
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from loguru import logger
from tqdm import tqdm
import typer
from lnp_ml.config import MODELS_DIR, PROCESSED_DATA_DIR
from lnp_ml.dataset import LNPDataset, collate_fn
from lnp_ml.modeling.models import LNPModel, LNPModelWithoutMPNN
from lnp_ml.modeling.trainer import (
train_epoch,
validate,
EarlyStopping,
LossWeights,
)
from lnp_ml.modeling.visualization import plot_multitask_loss_curves
# Default MPNN ensemble path
DEFAULT_MPNN_ENSEMBLE_DIR = MODELS_DIR / "mpnn" / "all_amine_split_for_LiON"
def find_mpnn_ensemble_paths(base_dir: Path = DEFAULT_MPNN_ENSEMBLE_DIR) -> List[str]:
"""自动查找 MPNN ensemble 的 model.pt 文件。"""
model_paths = sorted(base_dir.glob("cv_*/fold_*/model_*/model.pt"))
if not model_paths:
raise FileNotFoundError(f"No model.pt files found in {base_dir}")
return [str(p) for p in model_paths]
app = typer.Typer()
def create_model(
d_model: int = 256,
num_heads: int = 8,
n_attn_layers: int = 4,
fusion_strategy: str = "attention",
head_hidden_dim: int = 128,
dropout: float = 0.1,
mpnn_checkpoint: Optional[str] = None,
mpnn_ensemble_paths: Optional[List[str]] = None,
mpnn_device: str = "cpu",
) -> Union[LNPModel, LNPModelWithoutMPNN]:
"""创建模型(支持可选的 MPNN encoder"""
use_mpnn = mpnn_checkpoint is not None or mpnn_ensemble_paths is not None
if use_mpnn:
return LNPModel(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
mpnn_checkpoint=mpnn_checkpoint,
mpnn_ensemble_paths=mpnn_ensemble_paths,
mpnn_device=mpnn_device,
)
else:
return LNPModelWithoutMPNN(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
)
def train_fold(
fold_idx: int,
train_loader: DataLoader,
val_loader: DataLoader,
model: nn.Module,
device: torch.device,
output_dir: Path,
lr: float = 1e-4,
weight_decay: float = 1e-5,
epochs: int = 100,
patience: int = 15,
loss_weights: Optional[LossWeights] = None,
config: Optional[Dict] = None,
) -> Dict:
"""训练单个 fold"""
logger.info(f"\n{'='*60}")
logger.info(f"Training Fold {fold_idx}")
logger.info(f"{'='*60}")
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.5, patience=5
)
early_stopping = EarlyStopping(patience=patience)
history = {"train": [], "val": []}
best_val_loss = float("inf")
best_state = None
for epoch in range(epochs):
# Train
train_metrics = train_epoch(model, train_loader, optimizer, device, loss_weights)
# Validate
val_metrics = validate(model, val_loader, device, loss_weights)
current_lr = optimizer.param_groups[0]["lr"]
# Log
logger.info(
f"Fold {fold_idx} Epoch {epoch+1}/{epochs} | "
f"Train Loss: {train_metrics['loss']:.4f} | "
f"Val Loss: {val_metrics['loss']:.4f} | "
f"LR: {current_lr:.2e}"
)
history["train"].append(train_metrics)
history["val"].append(val_metrics)
# Learning rate scheduling
scheduler.step(val_metrics["loss"])
# Save best model
if val_metrics["loss"] < best_val_loss:
best_val_loss = val_metrics["loss"]
best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
logger.info(f" -> New best model (val_loss={best_val_loss:.4f})")
# Early stopping
if early_stopping(val_metrics["loss"]):
logger.info(f"Early stopping at epoch {epoch+1}")
break
# Save the best model
fold_output_dir = output_dir / f"fold_{fold_idx}"
fold_output_dir.mkdir(parents=True, exist_ok=True)
checkpoint_path = fold_output_dir / "model.pt"
torch.save({
"model_state_dict": best_state,
"config": config,
"best_val_loss": best_val_loss,
"fold_idx": fold_idx,
}, checkpoint_path)
logger.success(f"Saved fold {fold_idx} model to {checkpoint_path}")
# Save training history
history_path = fold_output_dir / "history.json"
with open(history_path, "w") as f:
json.dump(history, f, indent=2)
# Plot multi-task loss curves
loss_plot_path = fold_output_dir / "loss_curves.png"
plot_multitask_loss_curves(
history=history,
output_path=loss_plot_path,
title=f"Fold {fold_idx} Multi-task Loss Curves",
)
logger.info(f"Saved fold {fold_idx} loss curves to {loss_plot_path}")
return {
"fold_idx": fold_idx,
"best_val_loss": best_val_loss,
"epochs_trained": len(history["train"]),
"final_train_loss": history["train"][-1]["loss"] if history["train"] else 0,
}
@app.command()
def main(
data_dir: Path = PROCESSED_DATA_DIR / "cv",
output_dir: Path = MODELS_DIR / "finetune_cv",
# Model parameters
d_model: int = 256,
num_heads: int = 8,
n_attn_layers: int = 4,
fusion_strategy: str = "attention",
head_hidden_dim: int = 128,
dropout: float = 0.1,
# MPNN parameters (optional)
use_mpnn: bool = False,
mpnn_checkpoint: Optional[str] = None,
mpnn_ensemble_paths: Optional[str] = None,
mpnn_device: str = "cpu",
# Training parameters
batch_size: int = 32,
lr: float = 1e-4,
weight_decay: float = 1e-5,
epochs: int = 100,
patience: int = 15,
# Pretrained weight loading
init_from_pretrain: Optional[Path] = None,
load_delivery_head: bool = True,
freeze_backbone: bool = False,
# Device
device: str = "cuda" if torch.cuda.is_available() else "cpu",
):
"""
Train LNP models with cross-validation (multi-task).
Trains 5 models on the 5-fold internal data.
Use --use-mpnn to enable the MPNN encoder.
Use --init-from-pretrain to initialize from a pretrained checkpoint.
Use --freeze-backbone to freeze the backbone and train only the heads.
"""
logger.info(f"Using device: {device}")
device = torch.device(device)
# Find all fold directories
fold_dirs = sorted([d for d in data_dir.iterdir() if d.is_dir() and d.name.startswith("fold_")])
if not fold_dirs:
logger.error(f"No fold_* directories found in {data_dir}")
logger.info("Please run 'make data_cv' first to process CV data.")
raise typer.Exit(1)
logger.info(f"Found {len(fold_dirs)} folds: {[d.name for d in fold_dirs]}")
output_dir.mkdir(parents=True, exist_ok=True)
# Parse MPNN configuration
ensemble_paths_list = None
if mpnn_ensemble_paths:
ensemble_paths_list = mpnn_ensemble_paths.split(",")
elif use_mpnn and mpnn_checkpoint is None:
logger.info(f"Auto-detecting MPNN ensemble from {DEFAULT_MPNN_ENSEMBLE_DIR}")
ensemble_paths_list = find_mpnn_ensemble_paths()
logger.info(f"Found {len(ensemble_paths_list)} MPNN models")
enable_mpnn = mpnn_checkpoint is not None or ensemble_paths_list is not None
# Model configuration
config = {
"d_model": d_model,
"num_heads": num_heads,
"n_attn_layers": n_attn_layers,
"fusion_strategy": fusion_strategy,
"head_hidden_dim": head_hidden_dim,
"dropout": dropout,
"use_mpnn": enable_mpnn,
"lr": lr,
"weight_decay": weight_decay,
"batch_size": batch_size,
"epochs": epochs,
"patience": patience,
"init_from_pretrain": str(init_from_pretrain) if init_from_pretrain else None,
"freeze_backbone": freeze_backbone,
}
# Save the configuration
config_path = output_dir / "config.json"
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
logger.info(f"Saved config to {config_path}")
# Load pretrained weights (if specified)
pretrain_state = None
if init_from_pretrain is not None:
logger.info(f"Loading pretrain weights from {init_from_pretrain}")
checkpoint = torch.load(init_from_pretrain, map_location="cpu")
pretrain_config = checkpoint.get("config", {})
if pretrain_config.get("d_model") != d_model:
logger.warning(
f"d_model mismatch: pretrain={pretrain_config.get('d_model')}, "
f"current={d_model}. Skipping pretrain loading."
)
else:
pretrain_state = checkpoint["model_state_dict"]
# Train each fold
fold_results = []
for fold_dir in tqdm(fold_dirs, desc="Training folds"):
fold_idx = int(fold_dir.name.split("_")[1])
# Load data
train_df = pd.read_parquet(fold_dir / "train.parquet")
val_df = pd.read_parquet(fold_dir / "val.parquet")
logger.info(f"\nFold {fold_idx}: train={len(train_df)}, val={len(val_df)}")
# Create Dataset and DataLoader
train_dataset = LNPDataset(train_df)
val_dataset = LNPDataset(val_df)
train_loader = DataLoader(
train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
)
val_loader = DataLoader(
val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)
# Create a new model (independently initialized for each fold)
model = create_model(
d_model=d_model,
num_heads=num_heads,
n_attn_layers=n_attn_layers,
fusion_strategy=fusion_strategy,
head_hidden_dim=head_hidden_dim,
dropout=dropout,
mpnn_checkpoint=mpnn_checkpoint,
mpnn_ensemble_paths=ensemble_paths_list,
mpnn_device=device.type,
)
# Load pretrained weights
if pretrain_state is not None:
model.load_pretrain_weights(
pretrain_state_dict=pretrain_state,
load_delivery_head=load_delivery_head,
strict=False,
)
logger.info(f"Loaded pretrain weights (backbone + delivery_head={load_delivery_head})")
# Freeze the backbone (if specified)
if freeze_backbone:
frozen_count = 0
for name, param in model.named_parameters():
if name.startswith(("token_projector.", "cross_attention.", "fusion.")):
param.requires_grad = False
frozen_count += 1
logger.info(f"Frozen {frozen_count} parameter tensors")
# Print model info (first fold only)
if fold_idx == 0:
n_params_total = sum(p.numel() for p in model.parameters())
n_params_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
logger.info(f"Model parameters: {n_params_total:,} total, {n_params_trainable:,} trainable")
# Train
result = train_fold(
fold_idx=fold_idx,
train_loader=train_loader,
val_loader=val_loader,
model=model,
device=device,
output_dir=output_dir,
lr=lr,
weight_decay=weight_decay,
epochs=epochs,
patience=patience,
config=config,
)
fold_results.append(result)
# Aggregate results
logger.info("\n" + "=" * 60)
logger.info("CROSS-VALIDATION TRAINING COMPLETE")
logger.info("=" * 60)
val_losses = [r["best_val_loss"] for r in fold_results]
logger.info(f"\n[Per-Fold Results]")
for r in fold_results:
logger.info(
f" Fold {r['fold_idx']}: "
f"Val Loss={r['best_val_loss']:.4f}, "
f"Epochs={r['epochs_trained']}"
)
logger.info(f"\n[Summary Statistics]")
logger.info(f" Val Loss: {np.mean(val_losses):.4f} ± {np.std(val_losses):.4f}")
# Save CV results
cv_results = {
"fold_results": fold_results,
"summary": {
"val_loss_mean": float(np.mean(val_losses)),
"val_loss_std": float(np.std(val_losses)),
},
"config": config,
}
results_path = output_dir / "cv_results.json"
with open(results_path, "w") as f:
json.dump(cv_results, f, indent=2)
logger.success(f"Saved CV results to {results_path}")
@app.command()
def test(
data_dir: Path = PROCESSED_DATA_DIR / "cv",
model_dir: Path = MODELS_DIR / "finetune_cv",
output_path: Path = MODELS_DIR / "finetune_cv" / "test_results.json",
batch_size: int = 64,
device: str = "cuda" if torch.cuda.is_available() else "cpu",
):
"""
Evaluate the CV-trained models on the test sets.
Each fold's model is evaluated on its corresponding test set, then the results are aggregated.
"""
from scipy.special import rel_entr
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
r2_score,
accuracy_score,
precision_score,
recall_score,
f1_score,
)
def kl_divergence(p: np.ndarray, q: np.ndarray, eps: float = 1e-10) -> float:
"""计算 KL 散度 KL(p || q)"""
p = np.clip(p, eps, 1.0)
q = np.clip(q, eps, 1.0)
return float(np.sum(rel_entr(p, q), axis=-1).mean())
def js_divergence(p: np.ndarray, q: np.ndarray, eps: float = 1e-10) -> float:
"""计算 JS 散度"""
p = np.clip(p, eps, 1.0)
q = np.clip(q, eps, 1.0)
m = 0.5 * (p + q)
return float(0.5 * (np.sum(rel_entr(p, m), axis=-1) + np.sum(rel_entr(q, m), axis=-1)).mean())
logger.info(f"Using device: {device}")
device = torch.device(device)
# Find all fold directories
fold_dirs = sorted([d for d in data_dir.iterdir() if d.is_dir() and d.name.startswith("fold_")])
if not fold_dirs:
logger.error(f"No fold_* directories found in {data_dir}")
raise typer.Exit(1)
logger.info(f"Found {len(fold_dirs)} folds")
fold_results = []
# For aggregating predictions across all folds
all_preds = {
"size": [], "delivery": [], "pdi": [], "ee": [], "toxic": [], "biodist": []
}
all_targets = {
"size": [], "delivery": [], "pdi": [], "ee": [], "toxic": [], "biodist": []
}
for fold_dir in tqdm(fold_dirs, desc="Evaluating folds"):
fold_idx = int(fold_dir.name.split("_")[1])
model_path = model_dir / f"fold_{fold_idx}" / "model.pt"
test_path = fold_dir / "test.parquet"
if not model_path.exists():
logger.warning(f"Fold {fold_idx}: model not found at {model_path}, skipping")
continue
if not test_path.exists():
logger.warning(f"Fold {fold_idx}: test data not found at {test_path}, skipping")
continue
# Load the model
checkpoint = torch.load(model_path, map_location=device)
config = checkpoint["config"]
use_mpnn = config.get("use_mpnn", False)
# Always re-detect the MPNN paths
if use_mpnn:
mpnn_paths = find_mpnn_ensemble_paths()
else:
mpnn_paths = None
model = create_model(
d_model=config["d_model"],
num_heads=config["num_heads"],
n_attn_layers=config["n_attn_layers"],
fusion_strategy=config["fusion_strategy"],
head_hidden_dim=config["head_hidden_dim"],
dropout=config["dropout"],
mpnn_ensemble_paths=mpnn_paths,
mpnn_device=device.type,
)
model.load_state_dict(checkpoint["model_state_dict"])
model = model.to(device)
model.eval()
# Load test data
test_df = pd.read_parquet(test_path)
test_dataset = LNPDataset(test_df)
test_loader = DataLoader(
test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
)
# Collect predictions for the current fold
fold_preds = {k: [] for k in all_preds.keys()}
fold_targets = {k: [] for k in all_targets.keys()}
with torch.no_grad():
pbar = tqdm(test_loader, desc=f"Fold {fold_idx} [Test]", leave=False)
for batch in pbar:
smiles = batch["smiles"]
tabular = {k: v.to(device) for k, v in batch["tabular"].items()}
targets = batch["targets"]
masks = batch["mask"]
outputs = model(smiles, tabular)
# Size
if "size" in masks and masks["size"].any():
mask = masks["size"]
fold_preds["size"].extend(
outputs["size"].squeeze(-1)[mask].cpu().numpy().tolist()
)
fold_targets["size"].extend(
targets["size"][mask].cpu().numpy().tolist()
)
# Delivery
if "delivery" in masks and masks["delivery"].any():
mask = masks["delivery"]
fold_preds["delivery"].extend(
outputs["delivery"].squeeze(-1)[mask].cpu().numpy().tolist()
)
fold_targets["delivery"].extend(
targets["delivery"][mask].cpu().numpy().tolist()
)
# PDI (classification)
if "pdi" in masks and masks["pdi"].any():
mask = masks["pdi"]
pdi_preds = outputs["pdi"][mask].argmax(dim=-1).cpu().numpy()
pdi_targets = targets["pdi"][mask].cpu().numpy()
fold_preds["pdi"].extend(pdi_preds.tolist())
fold_targets["pdi"].extend(pdi_targets.tolist())
# EE (classification)
if "ee" in masks and masks["ee"].any():
mask = masks["ee"]
ee_preds = outputs["ee"][mask].argmax(dim=-1).cpu().numpy()
ee_targets = targets["ee"][mask].cpu().numpy()
fold_preds["ee"].extend(ee_preds.tolist())
fold_targets["ee"].extend(ee_targets.tolist())
# Toxic (classification)
if "toxic" in masks and masks["toxic"].any():
mask = masks["toxic"]
toxic_preds = outputs["toxic"][mask].argmax(dim=-1).cpu().numpy()
toxic_targets = targets["toxic"][mask].cpu().numpy().astype(int)
fold_preds["toxic"].extend(toxic_preds.tolist())
fold_targets["toxic"].extend(toxic_targets.tolist())
# Biodist (distribution)
if "biodist" in masks and masks["biodist"].any():
mask = masks["biodist"]
biodist_preds = outputs["biodist"][mask].cpu().numpy()
biodist_targets = targets["biodist"][mask].cpu().numpy()
fold_preds["biodist"].extend(biodist_preds.tolist())
fold_targets["biodist"].extend(biodist_targets.tolist())
# Compute metrics for the current fold
fold_metrics = {"fold_idx": fold_idx, "n_samples": len(test_df)}
# Regression task metrics
for task in ["size", "delivery"]:
if fold_preds[task]:
p = np.array(fold_preds[task])
t = np.array(fold_targets[task])
fold_metrics[task] = {
"n": len(p),
"rmse": float(np.sqrt(mean_squared_error(t, p))),
"mae": float(mean_absolute_error(t, p)),
"r2": float(r2_score(t, p)),
}
# Classification task metrics
for task in ["pdi", "ee", "toxic"]:
if fold_preds[task]:
p = np.array(fold_preds[task])
t = np.array(fold_targets[task])
fold_metrics[task] = {
"n": len(p),
"accuracy": float(accuracy_score(t, p)),
"precision": float(precision_score(t, p, average="macro", zero_division=0)),
"recall": float(recall_score(t, p, average="macro", zero_division=0)),
"f1": float(f1_score(t, p, average="macro", zero_division=0)),
}
# Distribution task metrics
if fold_preds["biodist"]:
p = np.array(fold_preds["biodist"])
t = np.array(fold_targets["biodist"])
fold_metrics["biodist"] = {
"n": len(p),
"kl_divergence": kl_divergence(t, p),
"js_divergence": js_divergence(t, p),
}
fold_results.append(fold_metrics)
# Aggregate into the global pools
for task in all_preds.keys():
all_preds[task].extend(fold_preds[task])
all_targets[task].extend(fold_targets[task])
# Print current fold results
log_parts = [f"Fold {fold_idx}: n={len(test_df)}"]
for task in ["delivery", "size"]:
if task in fold_metrics and isinstance(fold_metrics[task], dict):
log_parts.append(f"{task}_RMSE={fold_metrics[task]['rmse']:.4f}")
log_parts.append(f"{task}_R²={fold_metrics[task]['r2']:.4f}")
for task in ["pdi", "ee", "toxic"]:
if task in fold_metrics and isinstance(fold_metrics[task], dict):
log_parts.append(f"{task}_acc={fold_metrics[task]['accuracy']:.4f}")
log_parts.append(f"{task}_f1={fold_metrics[task]['f1']:.4f}")
if "biodist" in fold_metrics and isinstance(fold_metrics["biodist"], dict):
log_parts.append(f"biodist_KL={fold_metrics['biodist']['kl_divergence']:.4f}")
log_parts.append(f"biodist_JS={fold_metrics['biodist']['js_divergence']:.4f}")
logger.info(", ".join(log_parts))
# Compute cross-fold summary statistics
summary_stats = {}
for task in ["size", "delivery"]:
rmses = [r[task]["rmse"] for r in fold_results if task in r and isinstance(r[task], dict)]
r2s = [r[task]["r2"] for r in fold_results if task in r and isinstance(r[task], dict)]
if rmses:
summary_stats[task] = {
"rmse_mean": float(np.mean(rmses)),
"rmse_std": float(np.std(rmses)),
"r2_mean": float(np.mean(r2s)),
"r2_std": float(np.std(r2s)),
}
for task in ["pdi", "ee", "toxic"]:
accs = [r[task]["accuracy"] for r in fold_results if task in r and isinstance(r[task], dict)]
f1s = [r[task]["f1"] for r in fold_results if task in r and isinstance(r[task], dict)]
if accs:
summary_stats[task] = {
"accuracy_mean": float(np.mean(accs)),
"accuracy_std": float(np.std(accs)),
"f1_mean": float(np.mean(f1s)),
"f1_std": float(np.std(f1s)),
}
# Distribution task summary
kls = [r["biodist"]["kl_divergence"] for r in fold_results if "biodist" in r and isinstance(r["biodist"], dict)]
jss = [r["biodist"]["js_divergence"] for r in fold_results if "biodist" in r and isinstance(r["biodist"], dict)]
if kls:
summary_stats["biodist"] = {
"kl_mean": float(np.mean(kls)),
"kl_std": float(np.std(kls)),
"js_mean": float(np.mean(jss)),
"js_std": float(np.std(jss)),
}
# Compute overall pooled metrics
overall = {}
for task in ["size", "delivery"]:
if all_preds[task]:
p = np.array(all_preds[task])
t = np.array(all_targets[task])
overall[task] = {
"n_samples": len(p),
"mse": float(mean_squared_error(t, p)),
"rmse": float(np.sqrt(mean_squared_error(t, p))),
"mae": float(mean_absolute_error(t, p)),
"r2": float(r2_score(t, p)),
}
for task in ["pdi", "ee", "toxic"]:
if all_preds[task]:
p = np.array(all_preds[task])
t = np.array(all_targets[task])
overall[task] = {
"n_samples": len(p),
"accuracy": float(accuracy_score(t, p)),
"precision": float(precision_score(t, p, average="macro", zero_division=0)),
"recall": float(recall_score(t, p, average="macro", zero_division=0)),
"f1": float(f1_score(t, p, average="macro", zero_division=0)),
}
# Distribution task
if all_preds["biodist"]:
p = np.array(all_preds["biodist"])
t = np.array(all_targets["biodist"])
overall["biodist"] = {
"n_samples": len(p),
"kl_divergence": kl_divergence(t, p),
"js_divergence": js_divergence(t, p),
}
# Print summary results
logger.info("\n" + "=" * 60)
logger.info("CV TEST EVALUATION RESULTS")
logger.info("=" * 60)
logger.info(f"\n[Summary Statistics (across {len(fold_results)} folds)]")
for task, stats in summary_stats.items():
if "rmse_mean" in stats:
logger.info(f" {task}: RMSE={stats['rmse_mean']:.4f}±{stats['rmse_std']:.4f}, R²={stats['r2_mean']:.4f}±{stats['r2_std']:.4f}")
elif "accuracy_mean" in stats:
logger.info(f" {task}: Accuracy={stats['accuracy_mean']:.4f}±{stats['accuracy_std']:.4f}, F1={stats['f1_mean']:.4f}±{stats['f1_std']:.4f}")
elif "kl_mean" in stats:
logger.info(f" {task}: KL={stats['kl_mean']:.4f}±{stats['kl_std']:.4f}, JS={stats['js_mean']:.4f}±{stats['js_std']:.4f}")
logger.info(f"\n[Overall (all samples pooled)]")
for task, metrics in overall.items():
if "rmse" in metrics:
logger.info(f" {task} (n={metrics['n_samples']}): RMSE={metrics['rmse']:.4f}, MAE={metrics['mae']:.4f}, R²={metrics['r2']:.4f}")
elif "accuracy" in metrics:
logger.info(f" {task} (n={metrics['n_samples']}): Accuracy={metrics['accuracy']:.4f}, Precision={metrics['precision']:.4f}, Recall={metrics['recall']:.4f}, F1={metrics['f1']:.4f}")
elif "kl_divergence" in metrics:
logger.info(f" {task} (n={metrics['n_samples']}): KL={metrics['kl_divergence']:.4f}, JS={metrics['js_divergence']:.4f}")
# Save results
results = {
"fold_results": fold_results,
"summary_stats": summary_stats,
"overall": overall,
}
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
logger.success(f"\nSaved test results to {output_path}")
if __name__ == "__main__":
app()


@@ -1,158 +0,0 @@
"""数据处理脚本:将原始数据转换为模型可用的格式"""
from pathlib import Path
import pandas as pd
import typer
from loguru import logger
from lnp_ml.config import INTERIM_DATA_DIR, PROCESSED_DATA_DIR
from lnp_ml.dataset import (
process_dataframe,
SMILES_COL,
COMP_COLS,
HELP_COLS,
TARGET_REGRESSION,
TARGET_CLASSIFICATION_PDI,
TARGET_CLASSIFICATION_EE,
TARGET_TOXIC,
TARGET_BIODIST,
get_phys_cols,
get_exp_cols,
)
app = typer.Typer()
@app.command()
def main(
input_path: Path = INTERIM_DATA_DIR / "internal.csv",
output_dir: Path = PROCESSED_DATA_DIR,
train_ratio: float = 0.56,
val_ratio: float = 0.14,
seed: int = 42,
):
"""
Process the raw data and split it into train/validation/test sets.
Output files:
- train.parquet: training set
- val.parquet: validation set
- test.parquet: test set
- feature_columns.txt: feature column configuration
"""
logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
logger.info(f"Loaded {len(df)} samples")
# Process data
logger.info("Processing dataframe...")
df = process_dataframe(df)
# Define the columns to keep
phys_cols = get_phys_cols()
exp_cols = get_exp_cols()
keep_cols = (
[SMILES_COL]
+ COMP_COLS
+ phys_cols
+ HELP_COLS
+ exp_cols
+ TARGET_REGRESSION
+ TARGET_CLASSIFICATION_PDI
+ TARGET_CLASSIFICATION_EE
+ [TARGET_TOXIC]
+ TARGET_BIODIST
)
# Keep only columns that exist
keep_cols = [c for c in keep_cols if c in df.columns]
df = df[keep_cols]
# Shuffle and split
logger.info("Splitting dataset...")
df = df.sample(frac=1, random_state=seed).reset_index(drop=True)
n = len(df)
n_train = int(n * train_ratio)
n_val = int(n * val_ratio)
train_df = df.iloc[:n_train]
val_df = df.iloc[n_train:n_train + n_val]
test_df = df.iloc[n_train + n_val:]
logger.info(f"Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")
# Save
output_dir.mkdir(parents=True, exist_ok=True)
train_path = output_dir / "train.parquet"
val_path = output_dir / "val.parquet"
test_path = output_dir / "test.parquet"
train_df.to_parquet(train_path, index=False)
val_df.to_parquet(val_path, index=False)
test_df.to_parquet(test_path, index=False)
logger.success(f"Saved train to {train_path}")
logger.success(f"Saved val to {val_path}")
logger.success(f"Saved test to {test_path}")
# Save the column configuration
config_path = output_dir / "feature_columns.txt"
with open(config_path, "w") as f:
f.write("# Feature columns configuration\n\n")
f.write(f"# SMILES\n{SMILES_COL}\n\n")
f.write(f"# comp token [{len(COMP_COLS)}]\n")
f.write("\n".join(COMP_COLS) + "\n\n")
f.write(f"# phys token [{len(phys_cols)}]\n")
f.write("\n".join(phys_cols) + "\n\n")
f.write(f"# help token [{len(HELP_COLS)}]\n")
f.write("\n".join(HELP_COLS) + "\n\n")
f.write(f"# exp token [{len(exp_cols)}]\n")
f.write("\n".join(exp_cols) + "\n\n")
f.write("# Targets\n")
f.write("## Regression\n")
f.write("\n".join(TARGET_REGRESSION) + "\n")
f.write("## PDI classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_PDI) + "\n")
f.write("## EE classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_EE) + "\n")
f.write("## Toxic\n")
f.write(f"{TARGET_TOXIC}\n")
f.write("## Biodistribution\n")
f.write("\n".join(TARGET_BIODIST) + "\n")
logger.success(f"Saved feature config to {config_path}")
# Print statistics
logger.info("\n=== Dataset Statistics ===")
logger.info(f"Total samples: {n}")
logger.info(f"SMILES unique: {df[SMILES_COL].nunique()}")
# Missing-value statistics
logger.info("\nMissing values in targets:")
for col in TARGET_REGRESSION + [TARGET_TOXIC]:
if col in df.columns:
missing = df[col].isna().sum()
logger.info(f" {col}: {missing} ({100*missing/n:.1f}%)")
# PDI distribution
if all(c in df.columns for c in TARGET_CLASSIFICATION_PDI):
pdi_sum = df[TARGET_CLASSIFICATION_PDI].sum()
logger.info(f"\nPDI distribution:")
for col, count in pdi_sum.items():
logger.info(f" {col}: {int(count)}")
# EE distribution
if all(c in df.columns for c in TARGET_CLASSIFICATION_EE):
ee_sum = df[TARGET_CLASSIFICATION_EE].sum()
logger.info(f"\nEE distribution:")
for col, count in ee_sum.items():
logger.info(f" {col}: {int(count)}")
if __name__ == "__main__":
app()


@@ -1,302 +0,0 @@
"""内部数据 Cross-Validation 划分脚本:支持随机划分或基于 Amine 的分组划分"""
from pathlib import Path
from typing import List
import numpy as np
import pandas as pd
import typer
from loguru import logger
from lnp_ml.config import INTERIM_DATA_DIR, PROCESSED_DATA_DIR
from lnp_ml.dataset import (
process_dataframe,
SMILES_COL,
COMP_COLS,
HELP_COLS,
TARGET_REGRESSION,
TARGET_CLASSIFICATION_PDI,
TARGET_CLASSIFICATION_EE,
TARGET_TOXIC,
TARGET_BIODIST,
get_phys_cols,
get_exp_cols,
)
app = typer.Typer()
def random_cv_split(
df: pd.DataFrame,
n_folds: int = 5,
seed: int = 42,
) -> List[dict]:
"""
Cross-validation split by random shuffle.
Steps:
1. Shuffle all samples
2. Split the samples into n_folds containers
3. For each fold i:
- validation = container[i]
- test = container[(i+1) % n_folds]
- train = everything else
Args:
df: input DataFrame
n_folds: number of folds
seed: random seed
Returns:
List of dicts; each dict contains train_df, val_df, test_df
"""
# Shuffle all samples
df_shuffled = df.sample(frac=1, random_state=seed).reset_index(drop=True)
n_samples = len(df_shuffled)
logger.info(f"Total {n_samples} samples for random CV split")
# Split the samples into n_folds containers
indices = np.arange(n_samples)
containers = np.array_split(indices, n_folds)
# Print the size of each container
for i, container in enumerate(containers):
logger.info(f" Container {i}: {len(container)} samples")
# Build the data for each fold
fold_splits = []
for i in range(n_folds):
val_indices = containers[i]
test_indices = containers[(i + 1) % n_folds]
train_indices = np.concatenate([
containers[j] for j in range(n_folds)
if j != i and j != (i + 1) % n_folds
])
train_df = df_shuffled.iloc[train_indices].reset_index(drop=True)
val_df = df_shuffled.iloc[val_indices].reset_index(drop=True)
test_df = df_shuffled.iloc[test_indices].reset_index(drop=True)
fold_splits.append({
"train": train_df,
"val": val_df,
"test": test_df,
})
logger.info(
f"Fold {i}: train={len(train_df)}, val={len(val_df)}, test={len(test_df)}"
)
return fold_splits
def amine_based_cv_split(
df: pd.DataFrame,
n_folds: int = 5,
seed: int = 42,
amine_col: str = "Amine",
) -> List[dict]:
"""
Cross-validation split based on the Amine column.
Steps:
1. Group by amine_col
2. Shuffle the group order
3. Assign the groups round-robin to n_folds containers
4. For each fold i:
- validation = container[i]
- test = container[(i+1) % n_folds]
- train = everything else
Args:
df: input DataFrame
n_folds: number of folds
seed: random seed
amine_col: column name used for grouping
Returns:
List of dicts; each dict contains train_df, val_df, test_df
"""
# Get the unique amines and shuffle them
unique_amines = df[amine_col].unique()
rng = np.random.RandomState(seed)
rng.shuffle(unique_amines)
logger.info(f"Found {len(unique_amines)} unique amines")
# Round-robin assignment to n_folds containers
containers = [[] for _ in range(n_folds)]
for i, amine in enumerate(unique_amines):
containers[i % n_folds].append(amine)
# Print the size of each container
for i, container in enumerate(containers):
container_samples = df[df[amine_col].isin(container)]
logger.info(f" Container {i}: {len(container)} amines, {len(container_samples)} samples")
# Build the data for each fold
fold_splits = []
for i in range(n_folds):
val_amines = set(containers[i])
test_amines = set(containers[(i + 1) % n_folds])
train_amines = set()
for j in range(n_folds):
if j != i and j != (i + 1) % n_folds:
train_amines.update(containers[j])
train_df = df[df[amine_col].isin(train_amines)].reset_index(drop=True)
val_df = df[df[amine_col].isin(val_amines)].reset_index(drop=True)
test_df = df[df[amine_col].isin(test_amines)].reset_index(drop=True)
fold_splits.append({
"train": train_df,
"val": val_df,
"test": test_df,
})
logger.info(
f"Fold {i}: train={len(train_df)} ({len(train_amines)} amines), "
f"val={len(val_df)} ({len(val_amines)} amines), "
f"test={len(test_df)} ({len(test_amines)} amines)"
)
return fold_splits
@app.command()
def main(
input_path: Path = INTERIM_DATA_DIR / "internal.csv",
output_dir: Path = PROCESSED_DATA_DIR / "cv",
n_folds: int = 5,
seed: int = 42,
amine_col: str = "Amine",
scaffold_split: bool = typer.Option(
False,
"--scaffold-split",
help="使用基于 Amine 的 scaffold splitting默认随机 shuffle",
),
):
"""
Cross-validation data splitting.
Two splitting modes are supported:
- Random split (default): shuffle all samples directly
- Scaffold splitting (--scaffold-split): group by Amine so that data for the same Amine stays in the same group
The split ratio is approximately train:val:test = 3:1:1.
Output structure:
- processed/cv/fold_0/train.parquet
- processed/cv/fold_0/val.parquet
- processed/cv/fold_0/test.parquet
- processed/cv/fold_1/...
- processed/cv/feature_columns.txt
"""
logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
logger.info(f"Loaded {len(df)} samples")
# Process data (column alignment, one-hot generation, etc.)
logger.info("Processing dataframe...")
df = process_dataframe(df)
# If using scaffold split, check that the amine column exists
if scaffold_split:
# Reload the raw data to get the Amine column (process_dataframe may not preserve it)
original_df = pd.read_csv(input_path)
if amine_col not in original_df.columns:
logger.error(f"Column '{amine_col}' not found in data. Available columns: {list(original_df.columns)}")
raise typer.Exit(1)
if amine_col not in df.columns:
df[amine_col] = original_df[amine_col].values
# Define the columns to keep
phys_cols = get_phys_cols()
exp_cols = get_exp_cols()
keep_cols = (
[SMILES_COL]
+ COMP_COLS
+ phys_cols
+ HELP_COLS
+ exp_cols
+ TARGET_REGRESSION
+ TARGET_CLASSIFICATION_PDI
+ TARGET_CLASSIFICATION_EE
+ [TARGET_TOXIC]
+ TARGET_BIODIST
)
# Keep only columns that exist
keep_cols = [c for c in keep_cols if c in df.columns]
# Perform the CV split
if scaffold_split:
logger.info(f"\nPerforming {n_folds}-fold amine-based scaffold CV split (seed={seed})...")
fold_splits = amine_based_cv_split(df, n_folds=n_folds, seed=seed, amine_col=amine_col)
split_method = f"Amine-based scaffold (column: {amine_col})"
else:
logger.info(f"\nPerforming {n_folds}-fold random CV split (seed={seed})...")
fold_splits = random_cv_split(df, n_folds=n_folds, seed=seed)
split_method = "Random shuffle"
# Save each fold
output_dir.mkdir(parents=True, exist_ok=True)
for i, split in enumerate(fold_splits):
fold_dir = output_dir / f"fold_{i}"
fold_dir.mkdir(parents=True, exist_ok=True)
# Keep only the required columns
train_df = split["train"][keep_cols].reset_index(drop=True)
val_df = split["val"][keep_cols].reset_index(drop=True)
test_df = split["test"][keep_cols].reset_index(drop=True)
# Save
train_df.to_parquet(fold_dir / "train.parquet", index=False)
val_df.to_parquet(fold_dir / "val.parquet", index=False)
test_df.to_parquet(fold_dir / "test.parquet", index=False)
logger.success(f"Saved fold {i} to {fold_dir}")
# Save the column configuration
config_path = output_dir / "feature_columns.txt"
with open(config_path, "w") as f:
f.write("# Feature columns configuration\n\n")
f.write(f"# SMILES\n{SMILES_COL}\n\n")
f.write(f"# comp token [{len(COMP_COLS)}]\n")
f.write("\n".join(COMP_COLS) + "\n\n")
f.write(f"# phys token [{len(phys_cols)}]\n")
f.write("\n".join(phys_cols) + "\n\n")
f.write(f"# help token [{len(HELP_COLS)}]\n")
f.write("\n".join(HELP_COLS) + "\n\n")
f.write(f"# exp token [{len(exp_cols)}]\n")
f.write("\n".join(exp_cols) + "\n\n")
f.write("# Targets\n")
f.write("## Regression\n")
f.write("\n".join(TARGET_REGRESSION) + "\n")
f.write("## PDI classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_PDI) + "\n")
f.write("## EE classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_EE) + "\n")
f.write("## Toxic\n")
f.write(f"{TARGET_TOXIC}\n")
f.write("## Biodistribution\n")
f.write("\n".join(TARGET_BIODIST) + "\n")
logger.success(f"Saved feature config to {config_path}")
# Print summary
logger.info("\n" + "=" * 60)
logger.info("CV DATA PROCESSING COMPLETE")
logger.info("=" * 60)
logger.info(f"Output directory: {output_dir}")
logger.info(f"Number of folds: {n_folds}")
logger.info(f"Splitting method: {split_method}")
logger.info(f"Random seed: {seed}")
if __name__ == "__main__":
app()


@@ -1,153 +0,0 @@
"""最终训练数据处理脚本train:val = 9:1无测试集"""
from pathlib import Path
import pandas as pd
import typer
from loguru import logger
from lnp_ml.config import INTERIM_DATA_DIR, PROCESSED_DATA_DIR
from lnp_ml.dataset import (
process_dataframe,
SMILES_COL,
COMP_COLS,
HELP_COLS,
TARGET_REGRESSION,
TARGET_CLASSIFICATION_PDI,
TARGET_CLASSIFICATION_EE,
TARGET_TOXIC,
TARGET_BIODIST,
get_phys_cols,
get_exp_cols,
)
app = typer.Typer()
@app.command()
def main(
input_path: Path = INTERIM_DATA_DIR / "internal.csv",
output_dir: Path = PROCESSED_DATA_DIR / "final",
train_ratio: float = 0.9,
seed: int = 42,
):
"""
Process the raw data and split it into train/validation sets (no test set).
Used for final training on all of the data.
Output files:
- final/train.parquet: training set (90%)
- final/val.parquet: validation set (10%)
- final/feature_columns.txt: feature column configuration
"""
logger.info(f"Loading data from {input_path}")
df = pd.read_csv(input_path)
logger.info(f"Loaded {len(df)} samples")
# Process data
logger.info("Processing dataframe...")
df = process_dataframe(df)
# Define the columns to keep
phys_cols = get_phys_cols()
exp_cols = get_exp_cols()
keep_cols = (
[SMILES_COL]
+ COMP_COLS
+ phys_cols
+ HELP_COLS
+ exp_cols
+ TARGET_REGRESSION
+ TARGET_CLASSIFICATION_PDI
+ TARGET_CLASSIFICATION_EE
+ [TARGET_TOXIC]
+ TARGET_BIODIST
)
# Keep only columns that exist
keep_cols = [c for c in keep_cols if c in df.columns]
df = df[keep_cols]
# Shuffle and split
logger.info(f"Splitting dataset (train:val = {train_ratio}:{1-train_ratio:.1f})...")
df = df.sample(frac=1, random_state=seed).reset_index(drop=True)
n = len(df)
n_train = int(n * train_ratio)
train_df = df.iloc[:n_train]
val_df = df.iloc[n_train:]
logger.info(f"Train: {len(train_df)}, Val: {len(val_df)}")
# Save
output_dir.mkdir(parents=True, exist_ok=True)
train_path = output_dir / "train.parquet"
val_path = output_dir / "val.parquet"
train_df.to_parquet(train_path, index=False)
val_df.to_parquet(val_path, index=False)
logger.success(f"Saved train to {train_path}")
logger.success(f"Saved val to {val_path}")
# Save the column configuration
config_path = output_dir / "feature_columns.txt"
with open(config_path, "w") as f:
f.write("# Feature columns configuration (final training)\n\n")
f.write(f"# SMILES\n{SMILES_COL}\n\n")
f.write(f"# comp token [{len(COMP_COLS)}]\n")
f.write("\n".join(COMP_COLS) + "\n\n")
f.write(f"# phys token [{len(phys_cols)}]\n")
f.write("\n".join(phys_cols) + "\n\n")
f.write(f"# help token [{len(HELP_COLS)}]\n")
f.write("\n".join(HELP_COLS) + "\n\n")
f.write(f"# exp token [{len(exp_cols)}]\n")
f.write("\n".join(exp_cols) + "\n\n")
f.write("# Targets\n")
f.write("## Regression\n")
f.write("\n".join(TARGET_REGRESSION) + "\n")
f.write("## PDI classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_PDI) + "\n")
f.write("## EE classification\n")
f.write("\n".join(TARGET_CLASSIFICATION_EE) + "\n")
f.write("## Toxic\n")
f.write(f"{TARGET_TOXIC}\n")
f.write("## Biodistribution\n")
f.write("\n".join(TARGET_BIODIST) + "\n")
logger.success(f"Saved feature config to {config_path}")
# Print statistics
logger.info("\n=== Dataset Statistics ===")
logger.info(f"Total samples: {n}")
logger.info(f"SMILES unique: {df[SMILES_COL].nunique()}")
# Missing-value statistics
logger.info("\nMissing values in targets:")
for col in TARGET_REGRESSION + [TARGET_TOXIC]:
if col in df.columns:
missing = df[col].isna().sum()
logger.info(f" {col}: {missing} ({100*missing/n:.1f}%)")
# PDI distribution
if all(c in df.columns for c in TARGET_CLASSIFICATION_PDI):
pdi_sum = df[TARGET_CLASSIFICATION_PDI].sum()
logger.info(f"\nPDI distribution:")
for col, count in pdi_sum.items():
logger.info(f" {col}: {int(count)}")
# EE distribution
if all(c in df.columns for c in TARGET_CLASSIFICATION_EE):
ee_sum = df[TARGET_CLASSIFICATION_EE].sum()
logger.info(f"\nEE distribution:")
for col, count in ee_sum.items():
logger.info(f" {col}: {int(count)}")
if __name__ == "__main__":
app()


@@ -18,7 +18,7 @@ app = typer.Typer()
def main(
input_path: Path = EXTERNAL_DATA_DIR / "all_data_LiON.csv",
output_dir: Path = PROCESSED_DATA_DIR,
train_ratio: float = 0.7,
train_ratio: float = 0.85,
seed: int = 42,
):
"""