前言
-
Optuna是一个自动超参数优化软件框架,专为机器学习而设计。Github项目地址 -
TPE(Tree-structured Parzen Estimator)是一种用于超参数优化的算法,它被广泛应用于机器学习模型中。TPE 算法基于贝叶斯优化技术,特别适用于高维空间中的优化问题,并且在许多情况下比网格搜索和随机搜索等传统方法更高效。
-
TPE算法主要的迭代过程为:
- 将历史评估结果分为两组:表现较好的一组称为“精英集”,表现较差的一组称为“非精英集”。
- 使用核密度估计(Kernel Density Estimation, KDE)为这两组建立概率分布模型
- 对于“精英集”,使用一个概率密度函数 L L L来建模。对于“非精英集”,使用另一个概率密度函数G来建模。
- 计算每个候选超参数被“精英集”模型 L L L与“非精英级”模型 G G G选中的概率比值,即似然比 r = L ( x ) G ( x ) r = \frac{L(x)}{G(x)} r=G(x)L(x)
- 选择较高似然比的超参数配置进行下一次评估。
-
关于贝叶斯优化参数的完整体系,大家可以参考通俗科普文:贝叶斯优化与SMBO、高斯过程回归、TPE
-
下面的所有注释代码以及IDE断点都可以在我的
Github项目Some-Paper-CN中下载。 -
在进行源码解析之前,大家需要先从
github将Optuna包的代码下载,然后将项目文件夹中的optuna文件夹单独拿出来,放入一个新的文件夹下,比如我的新文件夹名bayes,则项目文件树为:
-bayes-optuna-_gp-_hypervolume-artifacts-...
- 然后我们在项目文件夹下新建一个示例代码
demo_opt.py,则项目文件树为:
-bayes-optuna-_gp-_hypervolume-artifacts-...-demo_opt.py
- 在
pycharm中打开demo_opt.py文件,填入下面内容:
import optuna
from optuna.samplers import TPESamplerdef objective(trial):x = trial.suggest_float("x", -10, 10)return (x - 2) ** 2study = optuna.create_study(sampler=TPESampler(n_startup_trials=5))
study.optimize(objective, n_trials=10, n_jobs=1)print(study.best_params)
- 上面代码中,函数
objective()是需要优化的目标函数。一个trial对象对应目标函数的一次执行,在每次调用目标函数时在内部实例化,简单的说,trial包含了你在目标函数中想要优化的参数,比如objective()中就是x。suggest_float()方法用于在提供的范围内均匀选择参数。trial.suggest_float("x", -10, 10)表示x是一个下界为-10,上界为10的浮点型变量。 optuna.create_study用于创建一个优化试验,我们选择TPE采样器,一般情况下不需要单独传入TPESampler,因为默认情况下选用的就是TPE算法,主要是为了后面debug方便,减少TPE采样器的初始点数量,即n_startup_trials参数。n_trials表示优化迭代次数,n_jobs表示多线程并行数,这里为了debug方便,就使用单线程。- 最后打印出最佳参数
study.best_params。
[I 2024-07-26 09:35:12,767] A new study created in memory with name: no-name-450203ba-ce83-4faf-95dd-272018b1b4a0
[I 2024-07-26 09:35:12,768] Trial 0 finished with value: 26.552880606524642 and parameters: {'x': -3.1529487292738168}. Best is trial 0 with value: 26.552880606524642.
[I 2024-07-26 09:35:12,769] Trial 1 finished with value: 135.31943872421613 and parameters: {'x': -9.632688370459174}. Best is trial 0 with value: 26.552880606524642.
[I 2024-07-26 09:35:12,769] Trial 2 finished with value: 6.537924402961384 and parameters: {'x': 4.55693652697156}. Best is trial 2 with value: 6.537924402961384.
[I 2024-07-26 09:35:12,769] Trial 3 finished with value: 3.172219742815308 and parameters: {'x': 0.2189273617240346}. Best is trial 3 with value: 3.172219742815308.
[I 2024-07-26 09:35:12,769] Trial 4 finished with value: 7.88047052211213 and parameters: {'x': -0.8072175765537182}. Best is trial 3 with value: 3.172219742815308.
[I 2024-07-26 09:35:12,773] Trial 5 finished with value: 61.66533914051286 and parameters: {'x': 9.852728133617823}. Best is trial 3 with value: 3.172219742815308.
[I 2024-07-26 09:35:12,776] Trial 6 finished with value: 50.7648690183395 and parameters: {'x': -5.1249469484578976}. Best is trial 3 with value: 3.172219742815308.
[I 2024-07-26 09:35:12,780] Trial 7 finished with value: 1.2692823125247181 and parameters: {'x': 3.1266242996335194}. Best is trial 7 with value: 1.2692823125247181.
[I 2024-07-26 09:35:12,783] Trial 8 finished with value: 13.162920682323872 and parameters: {'x': 5.6280739631826515}. Best is trial 7 with value: 1.2692823125247181.
[I 2024-07-26 09:35:12,786] Trial 9 finished with value: 6.07077183734144 and parameters: {'x': 4.463893633528331}. Best is trial 7 with value: 1.2692823125247181.
{'x': 3.1266242996335194}
关键代码解析
- 确认
demo_opt.py文件运行没有问题后,我们先进入create_study()函数,按住Ctrl,点击create_study,跳转到optuna/study/study.py文件,create_study()函数:
def create_study(*,storage: str | storages.BaseStorage | None = None,sampler: "samplers.BaseSampler" | None = None,pruner: pruners.BasePruner | None = None,study_name: str | None = None,direction: str | StudyDirection | None = None,load_if_exists: bool = False,directions: Sequence[str | StudyDirection] | None = None,
) -> Study:...study_name = storage.get_study_name_from_id(study_id)study = Study(study_name=study_name, storage=storage, sampler=sampler, pruner=pruner)return study
- 该函数主要是对输入的参数进行错误检查,或者判定,因为我们只传入了
sampler参数,其他的参数均为默认,这里就不过多解释了,直接看最后两句。study_name用来记录get_study_name_from_id()方法生成的试验id,study接收了一个实例化后的Study,点击Study跳转到optuna/study/study.py文件,Study类,先看初始化方法。
def __init__(self,study_name: str,storage: str | storages.BaseStorage,sampler: "samplers.BaseSampler" | None = None,pruner: pruners.BasePruner | None = None,) -> None:self.study_name = study_namestorage = storages.get_storage(storage)study_id = storage.get_study_id_from_name(study_name)self._study_id = study_idself._storage = storageself._directions = storage.get_study_directions(study_id)# sampler默认为TPESamplerself.sampler = sampler or samplers.TPESampler()# 修剪器m默认为MedianPrunerself.pruner = pruner or pruners.MedianPruner()self._thread_local = _ThreadLocalStudyAttribute()self._stop_flag = False
- 从上面的初始化方法可以看到,当不传入
sampler参数时,默认sampler是TPESampler。我们回到demo_opt.py文件,按照代码执行顺序,点击optimize,跳转到optuna/study/study.py,Study类的optimize方法:
def optimize(self,func: ObjectiveFuncType,n_trials: int | None = None,timeout: float | None = None,n_jobs: int = 1,catch: Iterable[type[Exception]] | type[Exception] = (),callbacks: Iterable[Callable[[Study, FrozenTrial], None]] | None = None,gc_after_trial: bool = False,show_progress_bar: bool = False,) -> None:_optimize(study=self,func=func,n_trials=n_trials,timeout=timeout,n_jobs=n_jobs,catch=tuple(catch) if isinstance(catch, Iterable) else (catch,),callbacks=callbacks,gc_after_trial=gc_after_trial,show_progress_bar=show_progress_bar,)
- 可以看到
optimize方法内部调用了另外一个函数_optimize(),点击_optimize()函数,跳转到optuna/study/_optimize.py文件,_optimize()函数:
def _optimize(study: "optuna.Study",func: "optuna.study.study.ObjectiveFuncType",n_trials: int | None = None,timeout: float | None = None,n_jobs: int = 1,catch: tuple[type[Exception], ...] = (),callbacks: Iterable[Callable[["optuna.Study", FrozenTrial], None]] | None = None,gc_after_trial: bool = False,show_progress_bar: bool = False,
) -> None:...try:if n_jobs == 1:_optimize_sequential(study,func,n_trials,timeout,catch,callbacks,gc_after_trial,reseed_sampler_rng=False,time_start=None,progress_bar=progress_bar,)...
_optimize()函数前面依然对传入的参数进行了一系列检查,这里因为篇幅原因就不展示了,我们主要看函数中的try分支,因为在前面n_jobs参数设定的1,所以进入if分支(后面的else是多线程运行方式),点击_optimize_sequential()函数,跳转到optuna/study/_optimize.py文件,_optimize_sequential()函数
def _optimize_sequential(study: "optuna.Study",func: "optuna.study.study.ObjectiveFuncType",n_trials: int | None,timeout: float | None,catch: tuple[type[Exception], ...],callbacks: Iterable[Callable[["optuna.Study", FrozenTrial], None]] | None,gc_after_trial: bool,reseed_sampler_rng: bool,time_start: datetime.datetime | None,progress_bar: pbar_module._ProgressBar | None,
) -> None:study._thread_local.in_optimize_loop = True# 是否固定采样器的随机数种子if reseed_sampler_rng:study.sampler.reseed_rng()i_trial = 0if time_start is None:time_start = datetime.datetime.now()while True:# 优化停止标识符if study._stop_flag:breakif n_trials is not None:if i_trial >= n_trials:breaki_trial += 1if timeout is not None:elapsed_seconds = (datetime.datetime.now() - time_start).total_seconds()if elapsed_seconds >= timeout:breaktry:# 开始优化训练frozen_trial = _run_trial(study, func, catch)finally:if gc_after_trial:# 使用垃圾回收机制gc.collect()if callbacks is not None:for callback in callbacks:callback(study, frozen_trial)if progress_bar is not None:elapsed_seconds = (datetime.datetime.now() - time_start).total_seconds()progress_bar.update(elapsed_seconds, study)study._storage.remove_session()
- 可以看到
_optimize_sequential()函数在前面做了一些准备工作,包括保护主线程(study._thread_local.in_optimize_loop),固定随机数种子,记录开始时间。在try分支中开始启动的优化训练了,点击_rub_trial跳转到optuna/study/_optimize.py文件,_rub_trial()函数
def _run_trial(study: "optuna.Study",func: "optuna.study.study.ObjectiveFuncType",catch: tuple[type[Exception], ...],
) -> trial_module.FrozenTrial:# 检查记录存储器是否处于工作状态if is_heartbeat_enabled(study._storage):optuna.storages.fail_stale_trials(study)# 创建一组实验trial = study.ask()state: TrialState | None = Nonevalue_or_values: float | Sequence[float] | None = Nonefunc_err: Exception | KeyboardInterrupt | None = Nonefunc_err_fail_exc_info: Any | None = None# 记录器开始记录with get_heartbeat_thread(trial._trial_id, study._storage):try:value_or_values = func(trial)except exceptions.TrialPruned as e:state = TrialState.PRUNEDfunc_err = eexcept (Exception, KeyboardInterrupt) as e:state = TrialState.FAILfunc_err = efunc_err_fail_exc_info = sys.exc_info()try:# 记录实验参数和结果frozen_trial = _tell_with_warning(study=study,trial=trial,value_or_values=value_or_values,state=state,suppress_warning=True,)except Exception:frozen_trial = study._storage.get_trial(trial._trial_id)raisefinally:if frozen_trial.state == TrialState.COMPLETE:# 打印参数、目标函数结果、最佳结果等信息study._log_completed_trial(frozen_trial)...
_run_trial()函数的关键行为有:检查记录器是否处于活动状态,创建一组实验,记录器在优化过程中记录结果,在记录器记录块中,可以看到value_or_values = func(trial),func是传入的目标函数,通过传入参数trial得到了结果,说明细节是在目标函数内部完成的。此时回到demo_opt.py文件,objective()函数
def objective(trial):x = trial.suggest_float("x", -10, 10)return (x - 2) ** 2
- 可以看到,
x的值是通过trial.suggest_float()给出,点击suggest_float,跳转到optuna/trial/_trial.py文件,Trial类中的suggest_float()函数
def suggest_float(self,name: str,low: float,high: float,*,step: float | None = None,log: bool = False,) -> float:# 创建分布器distribution = FloatDistribution(low, high, log=log, step=step)# 在分布器中取数值suggested_value = self._suggest(name, distribution)self._check_distribution(name, distribution)return suggested_value
- 可以看到,该函数先通过
FloatDistribution类实例化了一个浮点数分布器,这里有关FloatDistribution类就不多解释,大家感兴趣可以跳转到具体实现看看。然后通过_suggest()方法在创建的浮点分布器中进行取值,这个_suggest()方法就很重要了,点击_suggest跳转到optuna/trial/_trial.py文件,Trial类中的_suggest()方法
def _suggest(self, name: str, distribution: BaseDistribution) -> Any:storage = self.storagetrial_id = self._trial_idtrial = self._get_latest_trial()if name in trial.distributions:distributions.check_distribution_compatibility(trial.distributions[name], distribution)param_value = trial.params[name]else:if self._is_fixed_param(name, distribution):param_value = self._fixed_params[name]elif distribution.single():param_value = distributions._get_single_value(distribution)elif self._is_relative_param(name, distribution):param_value = self.relative_params[name]else:study = pruners._filter_study(self.study, trial)# 对给定的分布进行采样param_value = self.study.sampler.sample_independent(study, trial, name, distribution)# 检验`param_value`是否为空值param_value_in_internal_repr = distribution.to_internal_repr(param_value)storage.set_trial_param(trial_id, name, param_value_in_internal_repr, distribution)self._cached_frozen_trial.distributions[name] = distributionself._cached_frozen_trial.params[name] = param_valuereturn param_value
_suggest()方法的关键行为在于通过一系列对分布的判断,对给定的分布进行合理的采样,大家主要关注param_value = self.study.sampler.sample_independent(),前面我们已经传入了sampler为TPESampler,所以跳转到optuna/samplers/_tpe/sampler.py文件,TPESampler类中的sample_independent()方法:
def sample_independent(self,study: Study,trial: FrozenTrial,param_name: str,param_distribution: BaseDistribution,) -> Any:states = (TrialState.COMPLETE, TrialState.PRUNED)trials = study._get_trials(deepcopy=False, states=states, use_cache=True)# 如果样本数量不足,则进行随机试验。if len(trials) < self._n_startup_trials:# 在给定分布中进行随机采样return self._random_sampler.sample_independent(study, trial, param_name, param_distribution)...return self._sample(study, trial, {param_name: param_distribution})[param_name]
- 可以看到,在
sample_independent()方法中,若初始样本数量不足,则进行随机试验,初始样本个数的阈值是由_n_startup_trials决定的,因为在debug过程中要循环多次,所以我在demo_opt.py文件中设定了_n_startup_trials=5。 - 当初始样本满足阈值时,程序执行到最后一行,即
_sample()方法,点击_sample跳转到optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法
def _sample(self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]) -> dict[str, Any]:if self._constant_liar:states = [TrialState.COMPLETE, TrialState.PRUNED, TrialState.RUNNING]else:states = [TrialState.COMPLETE, TrialState.PRUNED]use_cache = not self._constant_liartrials = study._get_trials(deepcopy=False, states=states, use_cache=use_cache)n = sum(trial.state != TrialState.RUNNING for trial in trials) below_trials, above_trials = _split_trials(study,trials,self._gamma(n),self._constraints_func is not None,)# 在below_trials数据上创建Parzen估计器mpe_below = self._build_parzen_estimator(study, search_space, below_trials, handle_below=True)# 在above_trials数据上创建Parzen估计器mpe_above = self._build_parzen_estimator(study, search_space, above_trials, handle_below=False)# 从below_trials数据上创建Parzen估计器中采集_n_ei_candidates个样本samples_below = mpe_below.sample(self._rng.rng, self._n_ei_candidates)acq_func_vals = self._compute_acquisition_func(samples_below, mpe_below, mpe_above)ret = TPESampler._compare(samples_below, acq_func_vals)for param_name, dist in search_space.items():ret[param_name] = dist.to_external_repr(ret[param_name])return ret
_sample()方法非常非常重要,这里大家一定要仔细理解,除去开头的运行状态记录器,按照代码执行顺序,先看一看_split_trials()函数,跳转到optuna/samplers/_tpe/sampler.py文件,_split_trials()函数
def _split_trials(study: Study, trials: list[FrozenTrial], n_below: int, constraints_enabled: bool
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:complete_trials = []pruned_trials = []running_trials = []infeasible_trials = []for trial in trials:if trial.state == TrialState.RUNNING:running_trials.append(trial)elif constraints_enabled and _get_infeasible_trial_score(trial) > 0:infeasible_trials.append(trial)elif trial.state == TrialState.COMPLETE:# 将完成的实验添加到complete_trials列表中complete_trials.append(trial)elif trial.state == TrialState.PRUNED:pruned_trials.append(trial)else:assert False# 将数据分为below和abovebelow_complete, above_complete = _split_complete_trials(complete_trials, study, n_below)# 确保`n_below`为非负数,防止意外的试验拆分n_below = max(0, n_below - len(below_complete))below_pruned, above_pruned = _split_pruned_trials(pruned_trials, study, n_below)# 确保`n_below`为非负数,防止意外的试验拆分n_below = max(0, n_below - len(below_pruned))below_infeasible, above_infeasible = _split_infeasible_trials(infeasible_trials, n_below)below_trials = below_complete + below_pruned + below_infeasibleabove_trials = above_complete + above_pruned + above_infeasible + running_trials# 按照trial.number对below_trials进行升序排列below_trials.sort(key=lambda trial: trial.number)# 按照trial.number对above_trials进行升序排列above_trials.sort(key=lambda trial: trial.number)return below_trials, above_trials
- 可以看到,该函数先初始化了4个列表,用于存储不同类型的试验记录,对于在
demo_opt.py中的例子,只有complete_trials,没有pruned_trials、running_trials、infeasible_trials,所以它们都为空列表,那重点需要关注的就是_split_complete_trials()函数了,点击_split_complete_trials跳转到optuna/samplers/_tpe/sampler.py文件,_split_complete_trials()函数:
def _split_complete_trials(trials: Sequence[FrozenTrial], study: Study, n_below: int
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:# 获取below值n_below = min(n_below, len(trials))# 若为单目标优化if len(study.directions) <= 1:return _split_complete_trials_single_objective(trials, study, n_below)else:return _split_complete_trials_multi_objective(trials, study, n_below)
- 可以看到
_split_complete_trials()函数先通过比较获取了n_below的值,因为仅有一个目标函数,所以为单目标优化,进入if分支,返回_split_complete_trials_single_objective()函数结果,点击_split_complete_trials_single_objective,跳转到optuna/samplers/_tpe/sampler.py文件,_split_complete_trials_single_objective()函数
def _split_complete_trials_single_objective(trials: Sequence[FrozenTrial], study: Study, n_below: int
) -> tuple[list[FrozenTrial], list[FrozenTrial]]:if study.direction == StudyDirection.MINIMIZE:# 将trial.value转换为浮点数类型,并以trial.value并进行升序排列sorted_trials = sorted(trials, key=lambda trial: cast(float, trial.value))else:sorted_trials = sorted(trials, key=lambda trial: cast(float, trial.value), reverse=True)# 返回前n_below个trials和n_below后个trialsreturn sorted_trials[:n_below], sorted_trials[n_below:]
- 可以看到,
_split_complete_trials_single_objective()函数的关键行为就是将试验的值按照升序排列,返回前n_below个trials(即:精英集)和n_below后个trials(即:非精英集)。 - 我们回到上层方法,
optuna/samplers/_tpe/sampler.py文件,_split_trials()函数,below_complete,above_complete分别接收了精英集、非精英集。因为pruned_trials、running_trials、infeasible_trials都是空列表,所以后面的代码就不作过多说明了,继续回到上层方法,optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法。 - 下一步,
_sample()方法分别在below_complete,above_complete上建立了Parzen估计器,调用了同一种方法_build_parzen_estimator(),点击_build_parzen_estimator,跳转到optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_build_parzen_estimator()方法,
def _build_parzen_estimator(self,study: Study,search_space: dict[str, BaseDistribution],trials: list[FrozenTrial],handle_below: bool,) -> _ParzenEstimator:observations = self._get_internal_repr(trials, search_space)if handle_below and study._is_multi_objective():...else:mpe = self._parzen_estimator_cls(observations, search_space, self._parzen_estimator_parameters)if not isinstance(mpe, _ParzenEstimator):raise RuntimeError("_parzen_estimator_cls must override _ParzenEstimator.")return mpe
- 前面说过了,对于
demo_opt.py文件中的例子,是一个单目标优化问题,所以_build_parzen_estimator()函数中的if分支不用看,直接看else,在else分支中,调用了_parzen_estimator_cls()方法,点击_parzen_estimator_cls跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的初始化方法:
class _ParzenEstimator:def __init__(self,observations: dict[str, np.ndarray],search_space: dict[str, BaseDistribution],parameters: _ParzenEstimatorParameters,predetermined_weights: np.ndarray | None = None,) -> None:if parameters.consider_prior:if parameters.prior_weight is None:raise ValueError("Prior weight must be specified when consider_prior==True.")elif parameters.prior_weight <= 0:raise ValueError("Prior weight must be positive.")self._search_space = search_spacetransformed_observations = self._transform(observations)assert predetermined_weights is None or len(transformed_observations) == len(predetermined_weights)weights = (predetermined_weightsif predetermined_weights is not Noneelse self._call_weights_func(parameters.weights, len(transformed_observations)))if len(transformed_observations) == 0:weights = np.array([1.0])elif parameters.consider_prior:assert parameters.prior_weight is not Noneweights = np.append(weights, [parameters.prior_weight])# 均分权重weights /= weights.sum()# 混合分布中采样数据(示例中全为FloatDistribution)self._mixture_distribution = _MixtureOfProductDistribution(weights=weights,distributions=[self._calculate_distributions(transformed_observations[:, i], param, search_space[param], parameters)for i, param in enumerate(search_space)],)
- 可以看到,初始化方法先对权重进行了初始化,然后将权重(
weights)和分布(distributions)传入了_MixtureOfProductDistribution类中,在看_MixtureOfProductDistribution类前,先看一下传入的distributions参数来源于_calculate_distributions()方法,点击_calculate_distributions跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的_calculate_distributions()方法:
def _calculate_distributions(self,transformed_observations: np.ndarray,param_name: str,search_space: BaseDistribution,parameters: _ParzenEstimatorParameters,) -> _BatchedDistributions:if isinstance(search_space, CategoricalDistribution):return self._calculate_categorical_distributions(transformed_observations, param_name, search_space, parameters)else:assert isinstance(search_space, (FloatDistribution, IntDistribution))if search_space.log:low = np.log(search_space.low)high = np.log(search_space.high)else:low = search_space.lowhigh = search_space.highstep = search_space.stepif step is not None and search_space.log:low = np.log(search_space.low - step / 2)high = np.log(search_space.high + step / 2)step = Nonereturn self._calculate_numerical_distributions(transformed_observations, low, high, step, parameters)
- 因为目标函数变量
x是浮点数分布FloatDistribution,所以第1个if分支,应该走else,又因为我们没有传入log参数为True(对数浮点分布),所以第2个if分支,应该走else,得到low和high变量,方法最后返回的是_calculate_numerical_distributions()方法的结果,我们继续点击_calculate_numerical_distributions,跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的_calculate_numerical_distributions()方法
def _calculate_numerical_distributions(self,observations: np.ndarray,low: float,high: float,step: float | None,parameters: _ParzenEstimatorParameters,) -> _BatchedDistributions:step_or_0 = step or 0mus = observations# 是否考虑先验consider_prior = parameters.consider_prior or len(observations) == 0def compute_sigmas() -> np.ndarray:if parameters.multivariate:SIGMA0_MAGNITUDE = 0.2sigma = (SIGMA0_MAGNITUDE* max(len(observations), 1) ** (-1.0 / (len(self._search_space) + 4))* (high - low + step_or_0))sigmas = np.full(shape=(len(observations),), fill_value=sigma)else:# TODO(contramundum53): Remove dependency on prior_mu# 先验均值,取搜索范围的中点prior_mu = 0.5 * (low + high)# 如果考虑先验,则添加prior_mu到mus的末尾mus_with_prior = np.append(mus, prior_mu) if consider_prior else mus# 获取按mus_with_prior排序的索引sorted_indices = np.argsort(mus_with_prior)# 根据排序后的索引获取排序后的mus值sorted_mus = mus_with_prior[sorted_indices]# 创建一个比mus_with_prior长2的位置来存储带端点的排序mussorted_mus_with_endpoints = np.empty(len(mus_with_prior) + 2, dtype=float)# 设置第一个位置为搜索区间的下界减半步长sorted_mus_with_endpoints[0] = low - step_or_0 / 2# 中间位置填入排序后的mussorted_mus_with_endpoints[1:-1] = sorted_mus# 最后一个位置为搜索区间的上界加半步长sorted_mus_with_endpoints[-1] = high + step_or_0 / 2# 计算相邻mus之间的最大距离作为初步sigma估计sorted_sigmas = np.maximum(sorted_mus_with_endpoints[1:-1] - sorted_mus_with_endpoints[0:-2],sorted_mus_with_endpoints[2:] - sorted_mus_with_endpoints[1:-1],)# 如果不考虑端点且有足够的点if not parameters.consider_endpoints and sorted_mus_with_endpoints.shape[0] >= 4:# 第一个sigma设置为第三和第二个点的距离sorted_sigmas[0] = sorted_mus_with_endpoints[2] - sorted_mus_with_endpoints[1]# 最后一个sigma设置为倒数第二和倒数第三个点的距离sorted_sigmas[-1] = (sorted_mus_with_endpoints[-2] - sorted_mus_with_endpoints[-3])# 重新排序sigmas并裁剪到观测值的数量sigmas = sorted_sigmas[np.argsort(sorted_indices)][: len(observations)]# We adjust the range of the 'sigmas' according to the 'consider_magic_clip' flag.# 根据consider_magic_clip标志调整sigmas的范围# 最大sigma,基于搜索范围maxsigma = 1.0 * (high - low + step_or_0)# 如果考虑magic clipif parameters.consider_magic_clip:# TODO(contramundum53): Remove dependency of minsigma on consider_prior.# 最小sigma,基于观测值数量和是否考虑先验minsigma = (1.0* (high - low + step_or_0)/ min(100.0, (1.0 + len(observations) + consider_prior)))else:# 如果不考虑magic clip,则最小sigma为一个非常小的数EPSminsigma = EPS# 返回限制在minsigma和maxsigma之间的sigmas数组return np.asarray(np.clip(sigmas, minsigma, maxsigma))sigmas = compute_sigmas()if consider_prior:prior_mu = 0.5 * (low + high)prior_sigma = 1.0 * (high - low + step_or_0)mus = np.append(mus, [prior_mu])sigmas = np.append(sigmas, [prior_sigma])if step is None:return _BatchedTruncNormDistributions(mus, sigmas, low, high)else:return _BatchedDiscreteTruncNormDistributions(mus, sigmas, low, high, step)
_calculate_numerical_distributions()方法的关键行为是估计样本集的均值和方差,具体的过程细节我已经在上面的代码中逐行注释了,最后因为在上层方法(optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的_calculate_distributions())调用_calculate_numerical_distributions()方法时,没有传入step参数,所以step是None,返回_BatchedTruncNormDistributions类,点击_BatchedTruncNormDistributions,跳转到optuna/samplers/_tpe/probability_distributions.py文件,_BatchedTruncNormDistributions类:
class _BatchedTruncNormDistributions(NamedTuple):mu: np.ndarraysigma: np.ndarraylow: floathigh: float
- 可以看到,该类只是起到了一个记录数据的作用。那现在我们回到上层函数,
optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的初始化方法。 - 现在我们可以点击
_MixtureOfProductDistribution,跳转到optuna/samplers/_tpe/probability_distributions.py文件,_MixtureOfProductDistribution类:
class _MixtureOfProductDistribution(NamedTuple):weights: np.ndarraydistributions: list[_BatchedDistributions]
- 发现
optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的初始化方法,self._mixture_distribution实例化了_MixtureOfProductDistribution类,没什么太多讲的。继续回到上层方法,optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_build_parzen_estimator()方法:
def _build_parzen_estimator(self,study: Study,search_space: dict[str, BaseDistribution],trials: list[FrozenTrial],handle_below: bool,) -> _ParzenEstimator:observations = self._get_internal_repr(trials, search_space)if handle_below and study._is_multi_objective():...else:mpe = self._parzen_estimator_cls(observations, search_space, self._parzen_estimator_parameters)if not isinstance(mpe, _ParzenEstimator):raise RuntimeError("_parzen_estimator_cls must override _ParzenEstimator.")return mpe
mpe接收到_parzen_estimator_cls()返回的值后,被_build_parzen_estimator()方法返回,回到上层函数,optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法
def _sample(self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]) -> dict[str, Any]:if self._constant_liar:states = [TrialState.COMPLETE, TrialState.PRUNED, TrialState.RUNNING]else:states = [TrialState.COMPLETE, TrialState.PRUNED]use_cache = not self._constant_liartrials = study._get_trials(deepcopy=False, states=states, use_cache=use_cache)n = sum(trial.state != TrialState.RUNNING for trial in trials) below_trials, above_trials = _split_trials(study,trials,self._gamma(n),self._constraints_func is not None,)# 在below_trials数据上创建Parzen估计器mpe_below = self._build_parzen_estimator(study, search_space, below_trials, handle_below=True)# 在above_trials数据上创建Parzen估计器mpe_above = self._build_parzen_estimator(study, search_space, above_trials, handle_below=False)# 从below_trials数据上创建Parzen估计器中采集_n_ei_candidates个样本samples_below = mpe_below.sample(self._rng.rng, self._n_ei_candidates)acq_func_vals = self._compute_acquisition_func(samples_below, mpe_below, mpe_above)ret = TPESampler._compare(samples_below, acq_func_vals)for param_name, dist in search_space.items():ret[param_name] = dist.to_external_repr(ret[param_name])return ret
mpe_below和mpe_above接收到_build_parzen_estimator()方法返回的Parzen估计器后,就开始采集样本了,mpe_below.sample从精英集训练的Parzen估计期中采集_n_ei_candidates个样本,点击sample跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的sample()方法:
def sample(self, rng: np.random.RandomState, size: int) -> dict[str, np.ndarray]:# 从截断的正态分布中抽样sampled = self._mixture_distribution.sample(rng, size)return self._untransform(sampled)
- 可以看到,
sample()方法内部调用了_mixture_distribution.sample方法,点击sample,跳转到optuna/samplers/_tpe/probability_distributions.py文件,_MixtureOfProductDistribution类中的sample()方法:
def sample(self, rng: np.random.RandomState, batch_size: int) -> np.ndarray:# 根据权重选择batch_size个索引,表示要从哪些分布中采样active_indices = rng.choice(len(self.weights), p=self.weights, size=batch_size)# 创建一个空数组ret,用于存储采样结果ret = np.empty((batch_size, len(self.distributions)), dtype=np.float64)for i, d in enumerate(self.distributions):if isinstance(d, _BatchedCategoricalDistributions):...elif isinstance(d, _BatchedTruncNormDistributions):# 取均值active_mus = d.mu[active_indices]# 取方差active_sigmas = d.sigma[active_indices]# 使用_truncnorm生成截断正态分布的样本,并存入结果数组ret[:, i] = _truncnorm.rvs(a=(d.low - active_mus) / active_sigmas,b=(d.high - active_mus) / active_sigmas,loc=active_mus,scale=active_sigmas,random_state=rng,)elif isinstance(d, _BatchedDiscreteTruncNormDistributions):...else:assert Falsereturn ret
- 因为在上面
optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的_calculate_numerical_distributions()方法中,我们知道返回的是一个_BatchedTruncNormDistributions类,所以运行elif isinstance(d, _BatchedTruncNormDistributions)分支,先取了均值和方差,然后使用_truncnorm中的rvs函数生成截断的正态分布样本,并存入结果数组。点击rvs跳转到optuna/samplers/_tpe/_truncnorm.py文件,rvs()函数:
def rvs(a: np.ndarray,b: np.ndarray,loc: np.ndarray | float = 0,scale: np.ndarray | float = 1,random_state: np.random.RandomState | None = None,
) -> np.ndarray:random_state = random_state or np.random.RandomState()size = np.broadcast(a, b, loc, scale).shapepercentiles = random_state.uniform(low=0, high=1, size=size)return ppf(percentiles, a, b) * scale + loc
rvs()函数最后返回还调用了ppf()函数,ppf()函数就在rvs()函数上方:
def ppf(q: np.ndarray, a: np.ndarray | float, b: np.ndarray | float) -> np.ndarray:q, a, b = np.atleast_1d(q, a, b)q, a, b = np.broadcast_arrays(q, a, b)case_left = a < 0case_right = ~case_leftdef ppf_left(q: np.ndarray, a: np.ndarray, b: np.ndarray) -> np.ndarray:log_Phi_x = _log_sum(_log_ndtr(a), np.log(q) + _log_gauss_mass(a, b))return _ndtri_exp(log_Phi_x)def ppf_right(q: np.ndarray, a: np.ndarray, b: np.ndarray) -> np.ndarray:log_Phi_x = _log_sum(_log_ndtr(-b), np.log1p(-q) + _log_gauss_mass(a, b))return -_ndtri_exp(log_Phi_x)out = np.empty_like(q)q_left = q[case_left]q_right = q[case_right]if q_left.size:out[case_left] = ppf_left(q_left, a[case_left], b[case_left])if q_right.size:out[case_right] = ppf_right(q_right, a[case_right], b[case_right])out[q == 0] = a[q == 0]out[q == 1] = b[q == 1]out[a == b] = math.nanreturn out
- 回到上层方法,
optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的sample()方法:
def sample(self, rng: np.random.RandomState, size: int) -> dict[str, np.ndarray]:# 从截断的正态分布中抽样sampled = self._mixture_distribution.sample(rng, size)return self._untransform(sampled)
sampled拿到抽样的值后,经过_untransform()方法返回结果,点击_untransform,跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzenEstimator类中的_untransform()方法
def _untransform(self, samples_array: np.ndarray) -> dict[str, np.ndarray]:res = {param: (np.exp(samples_array[:, i])if self._is_log(self._search_space[param])else samples_array[:, i])for i, param in enumerate(self._search_space)}return {param: (np.clip(dist.low + np.round((res[param] - dist.low) / dist.step) * dist.step,dist.low,dist.high,)if isinstance(dist, IntDistribution)else res[param])for (param, dist) in self._search_space.items()}
- 该函数的作用是对经过某种变换(如对数变换)的超参数样本进行逆变换,以便将这些样本转换回原始的超参数空间中,但在示例中我们不涉及到浮点对数空间,所以对原值不会有影响。回到上层方法,
optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法:
def _sample(self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]) -> dict[str, Any]:if self._constant_liar:states = [TrialState.COMPLETE, TrialState.PRUNED, TrialState.RUNNING]else:states = [TrialState.COMPLETE, TrialState.PRUNED]use_cache = not self._constant_liartrials = study._get_trials(deepcopy=False, states=states, use_cache=use_cache)# We divide data into below and above.n = sum(trial.state != TrialState.RUNNING for trial in trials) # Ignore running trials.below_trials, above_trials = _split_trials(study,trials,self._gamma(n),self._constraints_func is not None,)# 在below_trials数据上创建Parzen估计器mpe_below = self._build_parzen_estimator(study, search_space, below_trials, handle_below=True)# 在above_trials数据上创建Parzen估计器mpe_above = self._build_parzen_estimator(study, search_space, above_trials, handle_below=False)# 从below_trials数据上创建Parzen估计器中采集_n_ei_candidates个样本samples_below = mpe_below.sample(self._rng.rng, self._n_ei_candidates)acq_func_vals = self._compute_acquisition_func(samples_below, mpe_below, mpe_above)ret = TPESampler._compare(samples_below, acq_func_vals)for param_name, dist in search_space.items():ret[param_name] = dist.to_external_repr(ret[param_name])return ret
samples_below获取了Parzen估计器的采样结果,按照程序执行顺序,点击_compute_acquisition_func,跳转到optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_compute_acquisition_func()方法:
def _compute_acquisition_func(self,samples: dict[str, np.ndarray],mpe_below: _ParzenEstimator,mpe_above: _ParzenEstimator,) -> np.ndarray:log_likelihoods_below = mpe_below.log_pdf(samples)log_likelihoods_above = mpe_above.log_pdf(samples)acq_func_vals = log_likelihoods_below - log_likelihoods_abovereturn acq_func_vals
- 可以看到,该函数计算了从
Parzen估计器的采样的点在mpe_below和mpe_aboveParzen估计器的对数概率密度,点击log_pdf,跳转到optuna/samplers/_tpe/parzen_estimator.py文件,_ParzemEstimator类中的log_pdf()方法:
def log_pdf(self, samples_dict: dict[str, np.ndarray]) -> np.ndarray:transformed_samples = self._transform(samples_dict)return self._mixture_distribution.log_pdf(transformed_samples)
- 先看看
_transform()做了什么,点击_transform,跳转到optuna/samplers/_tpe/parzen_estimator.py文件_ParzemEstimator类中的_transform()方法:
def _transform(self, samples_dict: dict[str, np.ndarray]) -> np.ndarray:return np.array([(np.log(samples_dict[param])if self._is_log(self._search_space[param])else samples_dict[param])for param in self._search_space]).T
- 大家可以理解为该方法对数据进行了结构化整理,回到上层方法,
optuna/samplers/_tpe/parzen_estimator.py文件,_ParzemEstimator类中的log_pdf()方法,发现该方法在返回结果前调用了_mixture_distribution.log_pdf(),点击log_pdf跳转到optuna/samplers/_tpe/probability_distributions.py文件,_MixtureOfProductDistribution类中的log_pdf()方法:
# 计算输入x在混合分布中的对数概率密度def log_pdf(self, x: np.ndarray) -> np.ndarray:# 获取输入x的形状,即样本数量和变量数量batch_size, n_vars = x.shape# 初始化一个空的数组,用于存放各变量在各分布下的对数概率密度log_pdfs = np.empty((batch_size, len(self.weights), n_vars), dtype=np.float64)for i, d in enumerate(self.distributions):xi = x[:, i]if isinstance(d, _BatchedCategoricalDistributions):...elif isinstance(d, _BatchedTruncNormDistributions):# 计算每个样本在当前子分布下的对数概率log_pdfs[:, :, i] = _truncnorm.logpdf(x=xi[:, None],a=(d.low - d.mu[None, :]) / d.sigma[None, :],b=(d.high - d.mu[None, :]) / d.sigma[None, :],loc=d.mu[None, :],scale=d.sigma[None, :],)elif isinstance(d, _BatchedDiscreteTruncNormDistributions):...else:assert Falseweighted_log_pdf = np.sum(log_pdfs, axis=-1) + np.log(self.weights[None, :])max_ = weighted_log_pdf.max(axis=1)max_[np.isneginf(max_)] = 0with np.errstate(divide="ignore"): return np.log(np.exp(weighted_log_pdf - max_[:, None]).sum(axis=1)) + max_
- 该方法我们依然只看
elif isinstance(d, _BatchedTruncNormDistributions):分支下的代码,发现调用了_truncnorm中的logpdf()方法,点击logpdf,进入optuna/samplers/_tpe/_truncnorm.py文件,logpdf()方法:
def logpdf(x: np.ndarray,a: np.ndarray | float,b: np.ndarray | float,loc: np.ndarray | float = 0,scale: np.ndarray | float = 1,
) -> np.ndarray:x = (x - loc) / scalex, a, b = np.atleast_1d(x, a, b)out = _norm_logpdf(x) - _log_gauss_mass(a, b) - np.log(scale)x, a, b = np.broadcast_arrays(x, a, b)out[(x < a) | (b < x)] = -np.infout[a == b] = math.nanreturn out
- 回到上层方法,
optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法:
def _sample(self, study: Study, trial: FrozenTrial, search_space: dict[str, BaseDistribution]) -> dict[str, Any]:if self._constant_liar:states = [TrialState.COMPLETE, TrialState.PRUNED, TrialState.RUNNING]else:states = [TrialState.COMPLETE, TrialState.PRUNED]use_cache = not self._constant_liartrials = study._get_trials(deepcopy=False, states=states, use_cache=use_cache)# We divide data into below and above.n = sum(trial.state != TrialState.RUNNING for trial in trials) # Ignore running trials.below_trials, above_trials = _split_trials(study,trials,self._gamma(n),self._constraints_func is not None,)# 在below_trials数据上创建Parzen估计器mpe_below = self._build_parzen_estimator(study, search_space, below_trials, handle_below=True)# 在above_trials数据上创建Parzen估计器mpe_above = self._build_parzen_estimator(study, search_space, above_trials, handle_below=False)# 从below_trials数据上创建Parzen估计器中采集_n_ei_candidates个样本samples_below = mpe_below.sample(self._rng.rng, self._n_ei_candidates)acq_func_vals = self._compute_acquisition_func(samples_below, mpe_below, mpe_above)ret = TPESampler._compare(samples_below, acq_func_vals)for param_name, dist in search_space.items():ret[param_name] = dist.to_external_repr(ret[param_name])return ret
- 按照代码执行顺序,接下来点击
_compare,跳转到optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_compare()方法:
def _compare(cls, samples: dict[str, np.ndarray], acquisition_func_vals: np.ndarray) -> dict[str, int | float]:sample_size = next(iter(samples.values())).sizeif sample_size == 0:raise ValueError(f"The size of `samples` must be positive, but got {sample_size}.")if sample_size != acquisition_func_vals.size:raise ValueError("The sizes of `samples` and `acquisition_func_vals` must be same, but got ""(samples.size, acquisition_func_vals.size) = "f"({sample_size}, {acquisition_func_vals.size}).")best_idx = np.argmax(acquisition_func_vals)return {k: v[best_idx].item() for k, v in samples.items()}
- 除开数据格式、维度检查外,大家可以简单理解成取了
acquisition_func_vals中最大值的下标,然后从samples中取了出来,构成了一个字典 - 回到上层方法,
optuna/samplers/_tpe/sampler.py文件,TPESampler类中的_sample()方法,返回结果字典,回到上层方法optuna/samplers/_tpe/sampler.py文件,TPESampler类中的sample_independent()方法的return行,继续回到上层方法optuna/trial/_trial.py文件,Trial类中的_suggest()方法的return行,回到最上层,demo_opt.py文件中的objective函数,x变量通过trial.suggest_float()得到了优化后可能的最优点,通过return进行返回,完成了一次优化迭代过程。