Trainer
更新时间:2021-05-13
基类定义
Trainer的基类定义在./wenxin/common/controler.so中。
....
class BaseTrainer(object):
"""BaseTrainer"""
def __init__(self, params, data_set_reader, model_class):
"""
1.运行环境初始化 2.program初始化 3.计算图网络导入 4.模型参数导入 5.运行(reader) 6.模型导出
:param params: 运行的基本参数设置
:param data_set_reader: 运行的基本参数设置
:param model_class: 使用的是哪个model
"""
self.init_env()
self.build_executor()
self.init_net()
if self.params["load_checkpoint"] or self.params["load_parameters"]:
self.load_model_params("net_model")
elif self.params["pre_train_model"]:
self.load_model_params("pre_train_model")
self.meta_dict["is_encryption"] = self.need_encrypt
self.executor_run()
....
def init_env(self):
"""
运行环境初始化,基类已经实现,用户不需要自定义
"""
....
def init_net(self):
"""
初始化网络结构,基类已经实现,用户不需要自定义
"""
....
def run(self, phase, need_fetch=True, return_numpy=True, fetch_list=None, need_feed=False, feed_dict=None):
"""
运行网络结构,动态获取运行结果,基类已经实现,用户不需要自定义
:param phase:
:param need_fetch
:param return_numpy
:return:
"""
pass
....
def train_and_eval(self):
"""
启动模型训练,训练过程中可以进行模型评估,需要用户自定义实现,文心目前已经提供了通用的CustomTrainer,覆盖绝大部分的训练需求。
:param fetch_list_value:
:param fetch_list_key:
:param steps:
:param phase:
:return:
"""
raise NotImplementedError
def evaluate(self, reader, phase, step):
"""
模型评估方法,需要用户自定义实现,文心目前已经提供了通用的CustomTrainer,覆盖绝大部分的训练需求。
:param reader:
:param phase:
:param program:
:param step:
:return: loss
"""
raise NotImplementedError
def save_models(self, steps, save_checkpoint=True, save_inference=True):
"""
保存模型,基类已经实现,用户不需要自定义
:param steps:
:param save_checkpoint:
:param save_inference:
:return:
"""
....
核心函数
BaseTrainer做为基类,将训练过程中常用的操作(如运行时环境初始化、网络结构初始化、模型保存等)已经统一封装实现,不需要在子类中重新实现。需要用户按自己业务场景在子类中自定义的核心函数为以下两个:
- train_and_eval(self):模型训练函数
- evaluate(self, reader, phase, step):模型评估函数
自定义实现示例
以文心目前提供的能够覆盖绝大部分的训练场景的CustomTrainer为例,在子类中实现train_and_eval和evalute函数,详见以下代码及核心部分的注释。
...
def train_and_eval(self):
""" 模型训练,训练过程中进行模型评估
:return:
"""
# step1:获取训练集样本总数
num_train_examples = self.params.get("num_train_examples", 0)
if num_train_examples == 0:
num_train_examples = self.data_set_reader.train_reader.get_num_examples()
# step2:启动训练集reader,开始对训练步数进行计数
self.data_set_reader.train_reader.run()
steps = 1
time_begin = time.time()
try:
while True:
try:
# step3:进入reader读取的循环,开始模型训练。
if steps % self.params["train_log_step"] != 0:
# run方法是指进行一次网络计算,一次是一个batch,如果不需要输出计算结果,则need_fetch参数置为False。
self.run(InstanceName.TRAINING, need_fetch=False,
return_numpy=self.return_numpy)
else:
# step4: 在你需要的某些step,将run方法的need_fetch参数置为True,获取当前网络的计算结果。
metrics_tensor_value = self.run(InstanceName.TRAINING, need_fetch=True, return_numpy=self.return_numpy)
current_example, current_epoch = self.data_set_reader.train_reader.get_train_progress()
logging.info("epoch {0} progress {1}/{2} pyreader queue size {3}".format(current_epoch, current_example, num_train_examples,
self.data_set_reader.train_reader.paddle_data_loader.queue.size()))
fetch_output_dict = collections.OrderedDict()
for key, value in zip(self.fetch_list_train_key, metrics_tensor_value):
if key == InstanceName.LOSS and not self.return_numpy:
value = np.array(value)
fetch_output_dict[key] = value
time_end = time.time()
used_time = time_end - time_begin
meta_info = collections.OrderedDict()
meta_info[InstanceName.STEP] = steps
meta_info[InstanceName.GPU_ID] = self.gpu_id
meta_info[InstanceName.TIME_COST] = used_time
# step4: 将神经网络当前的前向计算结果转换成dict格式,传递给model类,进行指标评估
metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, InstanceName.TRAINING)
# step5:如果需要使用可视化工具,则需要在trainer初始化方法中先初始化好一个VisualManager对象,让后将指标结果传参进去。
if self.params.get("visualdl_log", False):
assert isinstance(metrics_output, OrderedDict), "metrics_output is must be OrderedDict"
self.visualdl_log(metrics_output, np.mean(fetch_output_dict[InstanceName.LOSS]), steps, phase=InstanceName.TRAINING)
if self.visual_manager and self.params.get("metrics_visual", False):
self.visual_manager.show_metric(metrics_output, steps, tag=InstanceName.TRAINING)
time_begin = time.time()
# step6:在你需要的某些step,调用evaluate方法,在测试集、验证集上进行效果评估。
if steps % self.params["eval_step"] == 0:
if self.params["is_eval_dev"]:
self.evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
if self.params["is_eval_test"]:
self.evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
# step6:在你需要的某些step,调用基类的save_models方法进行模型保存。
if steps % self.params["save_model_step"] == 0:
self.save_models(steps)
steps += 1
except fluid.core.EOFException:
self.data_set_reader.train_reader.stop()
break
if self.params["is_eval_dev"]:
logging.info("Final evaluate result: ")
self.evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
if self.params["is_eval_test"]:
logging.info("Final test result: ")
self.evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
except Exception as e:
logging.error('traceback.format_exc():%s' % traceback.format_exc())
self.save_models(steps)
raise e
# step7:训练结束之后可以再进行一次模型保存
self.save_models(steps)
def evaluate(self, reader, phase, step):
"""
:param reader: 待评估数据集对应的reader
:param phase: 数据集tag,取值为test、evaluate
:param step: 当前是在模型训的哪个step,仅做标记,可不填
:return: loss
"""
if not reader:
raise ValueError("{0} reader is none".format(phase))
# step1: 启动reader
reader.run()
# step2: 定义变量保存整个数据集上的模型预测结果
all_metrics_tensor_value = []
i = 0
time_begin = time.time()
while True:
try:
# step3:调用基类的run方法启动模型计算,并返回计算结果
metrics_tensor_value = self.run(phase=phase,
return_numpy=self.return_numpy)
# step4: 计算结果存入all_metrics_tensor_value
if i == 0:
all_metrics_tensor_value = [[tensor] for tensor in metrics_tensor_value]
else:
for j in range(len(metrics_tensor_value)):
one_tensor_value = all_metrics_tensor_value[j]
all_metrics_tensor_value[j] = one_tensor_value + [metrics_tensor_value[j]]
i += 1
except fluid.core.EOFException:
reader.stop()
break
fetch_output_dict = collections.OrderedDict()
# step4:遍历所有的计算结果,将其转为model类中的get_metrics方法可用的dict结构,并调用get_metrics方法计算指标结果。这里是可以自定义的,可以不使用model.get_metrics进行计算,直接在这里写你自己的指标计算代码即可。
for key, value in zip(self.fetch_list_evaluate_key, all_metrics_tensor_value):
if key == InstanceName.LOSS and not self.return_numpy:
value = [np.array(item) for item in value]
fetch_output_dict[key] = value
time_end = time.time()
used_time = time_end - time_begin
meta_info = collections.OrderedDict()
meta_info[InstanceName.STEP] = step
meta_info[InstanceName.GPU_ID] = self.gpu_id
meta_info[InstanceName.TIME_COST] = used_time
metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, phase)
...