资讯 社区 文档
技术能力
语音技术
文字识别
人脸与人体
图像技术
语言与知识
视频技术

Trainer

基类定义

Trainer的基类定义在./wenxin/common/controler.so中。

....
class BaseTrainer(object):
    """BaseTrainer"""
    def __init__(self, params, data_set_reader, model_class):
        """
        1.运行环境初始化 2.program初始化 3.计算图网络导入 4.模型参数导入 5.运行(reader) 6.模型导出
        :param params: 运行的基本参数设置
        :param data_set_reader: 运行的基本参数设置
        :param model_class: 使用的是哪个model
        """
        self.init_env()
        self.build_executor()
        self.init_net()
       
        if self.params["load_checkpoint"] or self.params["load_parameters"]:
            self.load_model_params("net_model")
        elif self.params["pre_train_model"]:
            self.load_model_params("pre_train_model")
        self.meta_dict["is_encryption"] = self.need_encrypt
        self.executor_run()
        ....
        
    def init_env(self):
    		"""
        运行环境初始化,基类已经实现,用户不需要自定义
        """
        ....
        
    def init_net(self):
    		"""
        初始化网络结构,基类已经实现,用户不需要自定义
        """
        ....
        
    def run(self, phase, need_fetch=True, return_numpy=True, fetch_list=None, need_feed=False, feed_dict=None):
        """
        运行网络结构,动态获取运行结果,基类已经实现,用户不需要自定义
        :param phase:
        :param need_fetch
        :param return_numpy
        :return:
        """
        pass
        ....
        
        
    def train_and_eval(self):
        """
        启动模型训练,训练过程中可以进行模型评估,需要用户自定义实现,文心目前已经提供了通用的CustomTrainer,覆盖绝大部分的训练需求。
        :param fetch_list_value:
        :param fetch_list_key:
        :param steps:
        :param phase:
        :return:
        """
        raise NotImplementedError
        
    def evaluate(self, reader, phase, step):
        """
        模型评估方法,需要用户自定义实现,文心目前已经提供了通用的CustomTrainer,覆盖绝大部分的训练需求。
        :param reader:
        :param phase:
        :param program:
        :param step:
        :return: loss
        """
        raise NotImplementedError
        
    def save_models(self, steps, save_checkpoint=True, save_inference=True):
        """
        保存模型,基类已经实现,用户不需要自定义
        :param steps:
        :param save_checkpoint:
        :param save_inference:
        :return:
        """
        ....
    

核心函数

BaseTrainer做为基类,将训练过程中常用的操作(如运行时环境初始化、网络结构初始化、模型保存等)已经统一封装实现,不需要在子类中重新实现。需要用户按自己业务场景在子类中自定义的核心函数为以下两个:

  • train_and_eval(self):模型训练函数
  • evaluate(self, reader, phase, step):模型评估函数

自定义实现示例

以文心目前提供的能够覆盖绝大部分的训练场景的CustomTrainer为例,在子类中实现train_and_eval和evalute函数,详见以下代码及核心部分的注释。

...
 def train_and_eval(self):
        """ 模型训练,训练过程中进行模型评估
        :return:
        """
      	# step1:获取训练集样本总数
        num_train_examples = self.params.get("num_train_examples", 0)
        if num_train_examples == 0:
            num_train_examples = self.data_set_reader.train_reader.get_num_examples()
				
        # step2:启动训练集reader,开始对训练步数进行计数
        self.data_set_reader.train_reader.run()
        steps = 1
        time_begin = time.time()
        try:
            while True:
                try:
                  	# step3:进入reader读取的循环,开始模型训练。
                    if steps % self.params["train_log_step"] != 0:
                				# run方法是指进行一次网络计算,一次是一个batch,如果不需要输出计算结果,则need_fetch参数置为False。
                        self.run(InstanceName.TRAINING, need_fetch=False, 
                            return_numpy=self.return_numpy)
                    else:
                        # step4: 在你需要的某些step,将run方法的need_fetch参数置为True,获取当前网络的计算结果。
                        metrics_tensor_value = self.run(InstanceName.TRAINING, need_fetch=True, return_numpy=self.return_numpy)
                        current_example, current_epoch = self.data_set_reader.train_reader.get_train_progress()
                        logging.info("epoch {0} progress {1}/{2} pyreader queue size {3}".format(current_epoch, current_example, num_train_examples,
                                        self.data_set_reader.train_reader.paddle_data_loader.queue.size()))

                        fetch_output_dict = collections.OrderedDict()
                        for key, value in zip(self.fetch_list_train_key, metrics_tensor_value):
                            if key == InstanceName.LOSS and not self.return_numpy:
                                value = np.array(value)
                            fetch_output_dict[key] = value
                        time_end = time.time()
                        used_time = time_end - time_begin
                        meta_info = collections.OrderedDict()
                        meta_info[InstanceName.STEP] = steps
                        meta_info[InstanceName.GPU_ID] = self.gpu_id
                        meta_info[InstanceName.TIME_COST] = used_time
												
                        # step4: 将神经网络当前的前向计算结果转换成dict格式,传递给model类,进行指标评估
                        metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, InstanceName.TRAINING)
                        # step5:如果需要使用可视化工具,则需要在trainer初始化方法中先初始化好一个VisualManager对象,让后将指标结果传参进去。
                        if self.params.get("visualdl_log", False):
                            assert isinstance(metrics_output, OrderedDict), "metrics_output is must be OrderedDict"
                            self.visualdl_log(metrics_output, np.mean(fetch_output_dict[InstanceName.LOSS]), steps, phase=InstanceName.TRAINING)

                        if self.visual_manager and self.params.get("metrics_visual", False):
                            self.visual_manager.show_metric(metrics_output, steps, tag=InstanceName.TRAINING)

                        time_begin = time.time()
										
                    # step6:在你需要的某些step,调用evaluate方法,在测试集、验证集上进行效果评估。
                    if steps % self.params["eval_step"] == 0:
                        if self.params["is_eval_dev"]:
                            self.evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
                        if self.params["is_eval_test"]:
                            self.evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
                            
                    # step6:在你需要的某些step,调用基类的save_models方法进行模型保存。       
                    if steps % self.params["save_model_step"] == 0:
                            self.save_models(steps) 
                    steps += 1
                except fluid.core.EOFException:
                    self.data_set_reader.train_reader.stop()
                    break
            if self.params["is_eval_dev"]:
                logging.info("Final evaluate result: ")
                self.evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
            if self.params["is_eval_test"]:
                logging.info("Final test result: ")
                self.evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
        except Exception as e:
            logging.error('traceback.format_exc():%s' % traceback.format_exc())
            self.save_models(steps)
            raise e
            
        # step7:训练结束之后可以再进行一次模型保存  
        self.save_models(steps)
       
            

    def evaluate(self, reader, phase, step):
        """
        :param reader: 待评估数据集对应的reader
        :param phase:  数据集tag,取值为test、evaluate
        :param step: 当前是在模型训的哪个step,仅做标记,可不填
        :return: loss
        """
        if not reader:
            raise ValueError("{0} reader is none".format(phase))
            
        # step1: 启动reader   
        reader.run()
        
        # step2: 定义变量保存整个数据集上的模型预测结果
        all_metrics_tensor_value = []
        i = 0
        time_begin = time.time()
        while True:
            try:
              	# step3:调用基类的run方法启动模型计算,并返回计算结果
                metrics_tensor_value = self.run(phase=phase, 
                    return_numpy=self.return_numpy)
                # step4: 计算结果存入all_metrics_tensor_value
                if i == 0:
                    all_metrics_tensor_value = [[tensor] for tensor in metrics_tensor_value]
                else:
                    for j in range(len(metrics_tensor_value)):
                        one_tensor_value = all_metrics_tensor_value[j]
                        all_metrics_tensor_value[j] = one_tensor_value + [metrics_tensor_value[j]]
                i += 1
            except fluid.core.EOFException:
                reader.stop()
                break

        fetch_output_dict = collections.OrderedDict()
        
        # step4:遍历所有的计算结果,将其转为model类中的get_metrics方法可用的dict结构,并调用get_metrics方法计算指标结果。这里是可以自定义的,可以不使用model.get_metrics进行计算,直接在这里写你自己的指标计算代码即可。
        for key, value in zip(self.fetch_list_evaluate_key, all_metrics_tensor_value):
            if key == InstanceName.LOSS and not self.return_numpy:
                value = [np.array(item) for item in value]
            fetch_output_dict[key] = value
        time_end = time.time()
        used_time = time_end - time_begin

        meta_info = collections.OrderedDict()
        meta_info[InstanceName.STEP] = step
        meta_info[InstanceName.GPU_ID] = self.gpu_id
        meta_info[InstanceName.TIME_COST] = used_time
        metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, phase)

...
上一篇
Tokenizer
下一篇
数据蒸馏