diff --git a/Makefile b/Makefile
index ccdb605..3835fb4 100644
--- a/Makefile
+++ b/Makefile
@@ -15,6 +15,7 @@ EPOCHS_PER_TRIAL_FLAG = $(if $(EPOCHS_PER_TRIAL),--epochs-per-trial $(EPOCHS_PER
 MIN_STRATUM_FLAG = $(if $(MIN_STRATUM_COUNT),--min-stratum-count $(MIN_STRATUM_COUNT),)
 OUTPUT_DIR_FLAG = $(if $(OUTPUT_DIR),--output-dir $(OUTPUT_DIR),)
 USE_SWA_FLAG = $(if $(USE_SWA),--use-swa,)
+PARALLEL_FLAG = $(if $(PARALLEL),--parallel,)
 INIT_PRETRAIN_FLAG = $(if $(NO_PRETRAIN),,--init-from-pretrain $(or $(INIT_PRETRAIN),models/pretrain_delivery.pt))
 
 #################################################################################
@@ -91,10 +92,12 @@ benchmark: requirements
 # INIT_PRETRAIN   pretrained weights path (default: models/pretrain_delivery.pt)
 # NO_PRETRAIN=1   disable pretrained weights
 # USE_SWA=1       enable SWA (final train stage)
+# PARALLEL=1      run outer folds in parallel (nested CV stage; requires enough GPU memory)
 #
 # Usage examples:
 #   make pretrain
 #   make train DEVICE=cuda N_TRIALS=30 USE_SWA=1 INIT_PRETRAIN=models/pretrain_delivery.pt
+#   make train DEVICE=cuda PARALLEL=1
 
 ## Pretrain on external data (delivery only)
@@ -109,7 +112,7 @@ pretrain: requirements
 train: requirements
 	$(PYTHON_INTERPRETER) -m lnp_ml.modeling.nested_cv_optuna \
 		$(DEVICE_FLAG) $(MPNN_FLAG) $(SEED_FLAG) $(INIT_PRETRAIN_FLAG) \
-		$(N_TRIALS_FLAG) $(EPOCHS_PER_TRIAL_FLAG) $(MIN_STRATUM_FLAG) $(OUTPUT_DIR_FLAG)
+		$(N_TRIALS_FLAG) $(EPOCHS_PER_TRIAL_FLAG) $(MIN_STRATUM_FLAG) $(OUTPUT_DIR_FLAG) $(PARALLEL_FLAG)
 	$(PYTHON_INTERPRETER) -m lnp_ml.modeling.final_train_optuna_cv \
 		$(DEVICE_FLAG) $(MPNN_FLAG) $(SEED_FLAG) $(INIT_PRETRAIN_FLAG) \
 		$(N_TRIALS_FLAG) $(EPOCHS_PER_TRIAL_FLAG) $(MIN_STRATUM_FLAG) $(OUTPUT_DIR_FLAG) $(USE_SWA_FLAG)
diff --git a/lnp_ml/modeling/nested_cv_optuna.py b/lnp_ml/modeling/nested_cv_optuna.py
index 0f7781c..df323dc 100644
--- a/lnp_ml/modeling/nested_cv_optuna.py
+++ b/lnp_ml/modeling/nested_cv_optuna.py
@@ -503,6 +503,173 @@ def run_inner_optuna(
     return best_params, epoch_mean, study
 
 
+# ============ Single-fold execution (callable across processes) ============
+
+def _run_single_outer_fold(
+    outer_fold: int,
+    outer_train_idx: np.ndarray,
+    outer_test_idx: np.ndarray,
+    df: pd.DataFrame,
+    strata: np.ndarray,
+    fold_dir: Path,
+    n_trials: int,
+    epochs_per_trial: int,
+    inner_patience: int,
+    batch_size: int,
+    n_inner_folds: int,
+    use_mpnn: bool,
+    seed: int,
+    pretrain_state_dict: Optional[Dict],
+    pretrain_config: Optional[Dict],
+    load_delivery_head: bool,
+    device_str: str,
+) -> Dict:
+    """
+    Run the full pipeline for a single outer fold (inner tuning + outer training + evaluation).
+
+    All arguments are picklable types, so this can run in spawn-based worker processes.
+    """
+    device = torch.device(device_str)
+    fold_dir = Path(fold_dir)
+    fold_dir.mkdir(parents=True, exist_ok=True)
+
+    full_dataset = LNPDataset(df)
+
+    logger.info(f"\n{'='*60}")
+    logger.info(f"OUTER FOLD {outer_fold}")
+    logger.info(f"{'='*60}")
+    logger.info(f"Train: {len(outer_train_idx)}, Test: {len(outer_test_idx)}")
+
+    # Save split indices
+    with open(fold_dir / "splits.json", "w") as f:
+        json.dump({
+            "outer_train_idx": outer_train_idx.tolist(),
+            "outer_test_idx": outer_test_idx.tolist(),
+        }, f)
+
+    # Inner Optuna hyperparameter tuning
+    logger.info(f"\nRunning inner Optuna with {n_trials} trials...")
+    study_path = fold_dir / "optuna_study.sqlite3"
+
+    best_params, epoch_mean, study = run_inner_optuna(
+        full_dataset=full_dataset,
+        inner_train_indices=outer_train_idx,
+        strata=strata,
+        device=device,
+        n_trials=n_trials,
+        epochs_per_trial=epochs_per_trial,
+        patience=inner_patience,
+        batch_size=batch_size,
+        n_inner_folds=n_inner_folds,
+        use_mpnn=use_mpnn,
+        seed=seed + outer_fold,
+        study_path=study_path,
+        pretrain_state_dict=pretrain_state_dict,
+        pretrain_config=pretrain_config,
+        load_delivery_head=load_delivery_head,
+    )
+
+    # Save the best hyperparameters
+    with open(fold_dir / "best_params.json", "w") as f:
+        json.dump(best_params, f, indent=2)
+
+    with open(fold_dir / "epoch_mean.json", "w") as f:
+        json.dump({"epoch_mean": epoch_mean}, f)
+
+    # Outer training (best hyperparameters, fixed number of epochs, no early stopping)
+    logger.info(f"\nTraining outer fold with best params, epochs={epoch_mean}...")
+
+    train_subset = Subset(full_dataset, outer_train_idx.tolist())
+    test_subset = Subset(full_dataset, outer_test_idx.tolist())
+
+    train_loader = DataLoader(
+        train_subset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
+    )
+    test_loader = DataLoader(
+        test_subset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
+    )
+
+    class_weights = compute_class_weights_from_loader(train_loader)
+
+    model = create_model(
+        d_model=best_params["d_model"],
+        num_heads=best_params["num_heads"],
+        n_attn_layers=best_params["n_attn_layers"],
+        fusion_strategy=best_params["fusion_strategy"],
+        head_hidden_dim=best_params["head_hidden_dim"],
+        dropout=best_params["dropout"],
+        use_mpnn=use_mpnn,
+        mpnn_device=device.type,
+    )
+
+    if pretrain_state_dict is not None and pretrain_config is not None:
+        loaded = load_pretrain_weights_to_model(
+            model, pretrain_state_dict, best_params["d_model"],
+            pretrain_config, load_delivery_head
+        )
+        if loaded:
+            logger.info(f"Loaded pretrain weights for outer fold {outer_fold}")
+
+    train_result = train_fixed_epochs(
+        model=model,
+        train_loader=train_loader,
+        val_loader=None,
+        device=device,
+        lr=best_params["lr"],
+        weight_decay=best_params["weight_decay"],
+        epochs=epoch_mean,
+        class_weights=class_weights,
+        use_cosine_annealing=True,
+        backbone_lr_ratio=best_params.get("backbone_lr_ratio", 1.0),
+    )
+
+    model.load_state_dict(train_result["final_state"])
+    model = model.to(device)
+
+    config = {
+        "d_model": best_params["d_model"],
+        "num_heads": best_params["num_heads"],
+        "n_attn_layers": best_params["n_attn_layers"],
+        "fusion_strategy": best_params["fusion_strategy"],
+        "head_hidden_dim": best_params["head_hidden_dim"],
+        "dropout": best_params["dropout"],
+        "use_mpnn": use_mpnn,
+    }
+
+    torch.save({
+        "model_state_dict": train_result["final_state"],
+        "config": config,
+        "epoch_mean": epoch_mean,
+        "best_params": best_params,
+    }, fold_dir / "model.pt")
+
+    with open(fold_dir / "history.json", "w") as f:
+        json.dump(train_result["history"], f, indent=2)
+
+    # Evaluate on the test set
+    logger.info("Evaluating on outer test set...")
+    test_metrics = evaluate_on_test(model, test_loader, device)
+
+    with open(fold_dir / "test_metrics.json", "w") as f:
+        json.dump(test_metrics, f, indent=2)
+
+    logger.info(f"\nOuter Fold {outer_fold} Test Results:")
+    for task, metrics in test_metrics.items():
+        if "rmse" in metrics:
+            logger.info(f"  {task}: RMSE={metrics['rmse']:.4f}, R²={metrics['r2']:.4f}")
+        elif "accuracy" in metrics:
+            logger.info(f"  {task}: Acc={metrics['accuracy']:.4f}, F1={metrics['f1']:.4f}")
+        elif "kl_divergence" in metrics:
+            logger.info(f"  {task}: KL={metrics['kl_divergence']:.4f}, JS={metrics['js_divergence']:.4f}")
+
+    return {
+        "fold": outer_fold,
+        "best_params": best_params,
+        "epoch_mean": epoch_mean,
+        "test_metrics": test_metrics,
+    }
+
+
 # ============ Main pipeline ============
 
 @app.command()
@@ -525,6 +692,8 @@ def main(
     load_delivery_head: bool = False,
     # MPNN
     use_mpnn: bool = False,
+    # Parallelism
+    parallel: bool = False,
     # Device
     device: str = "cuda" if torch.cuda.is_available() else "cpu",
 ):
@@ -535,6 +704,7 @@ def main(
     Outer training uses no early stopping; the epoch count is the epoch_mean of the inner best trials.
 
     Use --init-from-pretrain to initialize model weights from a pretraining checkpoint.
+    Use --parallel to run all outer folds concurrently (requires enough GPU memory).
     """
     if optuna is None:
         logger.error("Optuna not installed. Run: pip install optuna")
@@ -579,166 +749,55 @@ def main(
     with open(run_dir / "strata_info.json", "w") as f:
         json.dump(strata_info, f, indent=2, default=str)
 
-    # Build the full dataset
-    full_dataset = LNPDataset(df)
-    n_samples = len(full_dataset)
+    # Build the full dataset (only to get the sample count for splitting)
+    n_samples = len(LNPDataset(df))
 
-    # Outer CV
+    # Outer CV split
     outer_cv = StratifiedKFold(
        n_splits=n_outer_folds, shuffle=True, random_state=seed
     )
 
-    outer_results = []
-
+    device_str = str(device)
+    fold_args = []
     for outer_fold, (outer_train_idx, outer_test_idx) in enumerate(
         outer_cv.split(np.arange(n_samples), strata)
     ):
-        logger.info(f"\n{'='*60}")
-        logger.info(f"OUTER FOLD {outer_fold}")
-        logger.info(f"{'='*60}")
-        logger.info(f"Train: {len(outer_train_idx)}, Test: {len(outer_test_idx)}")
-
-        fold_dir = run_dir / f"outer_fold_{outer_fold}"
-        fold_dir.mkdir(parents=True, exist_ok=True)
-
-        # Save split indices
-        splits = {
-            "outer_train_idx": outer_train_idx.tolist(),
-            "outer_test_idx": outer_test_idx.tolist(),
-        }
-        with open(fold_dir / "splits.json", "w") as f:
-            json.dump(splits, f)
-
-        # Inner Optuna hyperparameter tuning
-        logger.info(f"\nRunning inner Optuna with {n_trials} trials...")
-        study_path = fold_dir / "optuna_study.sqlite3"
-
-        best_params, epoch_mean, study = run_inner_optuna(
-            full_dataset=full_dataset,
-            inner_train_indices=outer_train_idx,
+        fold_args.append(dict(
+            outer_fold=outer_fold,
+            outer_train_idx=outer_train_idx,
+            outer_test_idx=outer_test_idx,
+            df=df,
             strata=strata,
-            device=device,
+            fold_dir=run_dir / f"outer_fold_{outer_fold}",
             n_trials=n_trials,
             epochs_per_trial=epochs_per_trial,
-            patience=inner_patience,
+            inner_patience=inner_patience,
             batch_size=batch_size,
             n_inner_folds=n_inner_folds,
             use_mpnn=use_mpnn,
-            seed=seed + outer_fold,
-            study_path=study_path,
+            seed=seed,
             pretrain_state_dict=pretrain_state_dict,
             pretrain_config=pretrain_config,
             load_delivery_head=load_delivery_head,
-        )
+            device_str=device_str,
+        ))
+
+    if parallel:
+        import multiprocessing as mp
+        from concurrent.futures import ProcessPoolExecutor
 
-        # Save the best hyperparameters
-        with open(fold_dir / "best_params.json", "w") as f:
-            json.dump(best_params, f, indent=2)
-
-        with open(fold_dir / "epoch_mean.json", "w") as f:
-            json.dump({"epoch_mean": epoch_mean}, f)
-
-        # Outer training (best hyperparameters, fixed number of epochs, no early stopping)
-        logger.info(f"\nTraining outer fold with best params, epochs={epoch_mean}...")
-
-        # Build DataLoaders
-        train_subset = Subset(full_dataset, outer_train_idx.tolist())
-        test_subset = Subset(full_dataset, outer_test_idx.tolist())
-
-        train_loader = DataLoader(
-            train_subset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn
-        )
-        test_loader = DataLoader(
-            test_subset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn
-        )
-
-        # Compute class weights
-        class_weights = compute_class_weights_from_loader(train_loader)
-
-        # Build the model
-        model = create_model(
-            d_model=best_params["d_model"],
-            num_heads=best_params["num_heads"],
-            n_attn_layers=best_params["n_attn_layers"],
-            fusion_strategy=best_params["fusion_strategy"],
-            head_hidden_dim=best_params["head_hidden_dim"],
-            dropout=best_params["dropout"],
-            use_mpnn=use_mpnn,
-            mpnn_device=device.type,
-        )
-
-        # Load pretrained weights
-        if pretrain_state_dict is not None and pretrain_config is not None:
-            loaded = load_pretrain_weights_to_model(
-                model, pretrain_state_dict, best_params["d_model"],
-                pretrain_config, load_delivery_head
-            )
-            if loaded:
-                logger.info(f"Loaded pretrain weights for outer fold {outer_fold}")
-
-        # Train (fixed epochs, no early stopping)
-        train_result = train_fixed_epochs(
-            model=model,
-            train_loader=train_loader,
-            val_loader=None,  # no validation set in the outer loop
-            device=device,
-            lr=best_params["lr"],
-            weight_decay=best_params["weight_decay"],
-            epochs=epoch_mean,
-            class_weights=class_weights,
-            use_cosine_annealing=True,
-            backbone_lr_ratio=best_params.get("backbone_lr_ratio", 1.0),
-        )
-
-        # Load the final weights
-        model.load_state_dict(train_result["final_state"])
-        model = model.to(device)
-
-        # Save the model
-        config = {
-            "d_model": best_params["d_model"],
-            "num_heads": best_params["num_heads"],
-            "n_attn_layers": best_params["n_attn_layers"],
-            "fusion_strategy": best_params["fusion_strategy"],
-            "head_hidden_dim": best_params["head_hidden_dim"],
-            "dropout": best_params["dropout"],
-            "use_mpnn": use_mpnn,
-        }
-
-        torch.save({
-            "model_state_dict": train_result["final_state"],
-            "config": config,
-            "epoch_mean": epoch_mean,
-            "best_params": best_params,
-        }, fold_dir / "model.pt")
-
-        # Save the training history
-        with open(fold_dir / "history.json", "w") as f:
-            json.dump(train_result["history"], f, indent=2)
-
-        # Evaluate on the outer test set
-        logger.info("Evaluating on outer test set...")
-        test_metrics = evaluate_on_test(model, test_loader, device)
-
-        with open(fold_dir / "test_metrics.json", "w") as f:
-            json.dump(test_metrics, f, indent=2)
-
-        # Print test results
-        logger.info(f"\nOuter Fold {outer_fold} Test Results:")
-        for task, metrics in test_metrics.items():
-            if "rmse" in metrics:
-                logger.info(f"  {task}: RMSE={metrics['rmse']:.4f}, R²={metrics['r2']:.4f}")
-            elif "accuracy" in metrics:
-                logger.info(f"  {task}: Acc={metrics['accuracy']:.4f}, F1={metrics['f1']:.4f}")
-            elif "kl_divergence" in metrics:
-                logger.info(f"  {task}: KL={metrics['kl_divergence']:.4f}, JS={metrics['js_divergence']:.4f}")
-
-        outer_results.append({
-            "fold": outer_fold,
-            "best_params": best_params,
-            "epoch_mean": epoch_mean,
-            "test_metrics": test_metrics,
-        })
+        ctx = mp.get_context("spawn")
+        logger.info(f"Running {n_outer_folds} outer folds in PARALLEL (spawn)")
+        with ProcessPoolExecutor(max_workers=n_outer_folds, mp_context=ctx) as executor:
+            futures = [executor.submit(_run_single_outer_fold, **args) for args in fold_args]
+            outer_results = [f.result() for f in futures]
+        outer_results.sort(key=lambda r: r["fold"])
+    else:
+        logger.info(f"Running {n_outer_folds} outer folds SEQUENTIALLY")
+        outer_results = []
+        for args in fold_args:
+            result = _run_single_outer_fold(**args)
+            outer_results.append(result)
 
     # Aggregate results
     logger.info("\n" + "=" * 60)