资讯 文档
技术能力
语音技术
文字识别
人脸与人体
图像技术
语言与知识
视频技术

8.Trainer

简介

在文心中,我们把模型训练过程中常用的操作进行了统一封装、统一调度,形成一套标准流程,这套标准流程的调度就是Trainer(训练器)。一个Trainer实例中的标准操作有初始化运行环境、初始化各项参数、构造神经网络(通过调用Model实现)、数据读取(通过调用Reader实现)、模型训练、模型效果评估、模型保存等。

基本结构

一个Trainer实例中包含一个Reader和Model的实例,Reader实例负责读取明文数据集并将其处理为飞桨可用的Tensor结构;Model实例负责构建神经网络、确定模型指标计算。Trainer实例在train方法(模型训练)中启动一个循环,不断的调用Reader获取数据、然后将数据传递到Model的网络结构中进行计算,并根据用户对模型评估频率和保存频率参数的设置,在指定的时机进行模型评估和模型保存,直到整个训练任务结束。其结构如下所示:

bj-42693c6852ee6216a784f2cf518ae902c43d4cb7.png

基础操作

Trainer的基类为BaseStaticTrainer和BaseDynamicTrainer,分别定义在 ./wenxin/controller/static_trainer.py 和 ./wenxin/controller/dynamic_trainer.py中。BaseDynamicTrainer为动态图模式的Trainer基类,BaseStaticTrainer为静态图模式的Trainer基类。

BaseStaticTrainer

  • 运行环境初始化:根据当前用户的安装环境,初始化飞桨的运行时环境。这部分的代码在基类中已经实现,用户无需自己编码实现,仅需要配置好对应的参数即可,参数定义详见实战演练:使用文心进行模型训练
  • 构造神经网络:调用Model实例的forward和set_optimizer完成神经网络的构造,这部分的代码在基类中已经实现,部分代码如下所示。用户无需自定义编码实现。关于Model介绍请移步Model

    def init_static_model_net(self):
        """
        初始化神经网络
        """
          ....
        self.data_set_reader.train_reader.dataset.create_reader()
              fields_dict = self.data_set_reader.train_reader.dataset.instance_fields_dict()
        ## 构造神经网络
        self.model_class.structure()
              self.forward_train_output = self.model_class.forward(fields_dict, phase=InstanceName.TRAINING)
        loss = self.forward_train_output[InstanceName.LOSS]
        self.model_class.set_optimizer()
        if self.use_fleet:
          self.optimizer = fleet.distributed_optimizer(self.model_class.optimizer)
        else:
          self.optimizer = self.model_class.optimizer
        self.optimizer.minimize(loss)
        ....
          
  • 模型训练:启动训练任务,不断通过Reader读取数据集,送入Model中进行计算,并得到评估结果,直到循环结束。这部分的代码在基类中只是定义了虚方法,需要用户继承来自定义实现。文心目前在每个task中提供了通用的CustomTrainer类(wenxin_appzoo/tasks/xxx/trainer/custom_trainer.py),能够覆盖大部分训练评估流程,用户如果对自定义训练评估要求不高,可以直接使用,部分代码如下所示:

    def do_train(self):
        """ 启动数据集循环,开始训练
        :return:
        """
        ....
        dg = self.data_set_reader.train_reader
        steps = 1
        time_begin = time.time()
        # 开始迭代训练集数据
        for batch_id, data in enumerate(dg()):
            feed_dict = self.data_set_reader.train_reader.dataset.convert_input_list_to_dict(data)
            if steps % self.params["train_log_step"] != 0:
                if self.use_fleet:
                    self.train_exe.run(program=self.train_program, feed=feed_dict, fetch_list=[], return_numpy=True)
                else:
                    self.train_exe.run(feed=feed_dict, fetch_list=[], return_numpy=True)
            else:
                if self.use_fleet:
                    fetch_output = self.train_exe.run(program=self.train_program,
                                                      feed=feed_dict,
                                                      fetch_list=self.fetch_list_train,
                                                      return_numpy=True)
                else:
                    fetch_output = self.train_exe.run(feed=feed_dict,
                                                      fetch_list=self.fetch_list_train,
                                                      return_numpy=True)
                fetch_output_dict = collections.OrderedDict()
                for key, value in zip(self.fetch_list_train_key, fetch_output):
                    if key == InstanceName.LOSS and not self.return_numpy:
                        value = np.array(value)
                    fetch_output_dict[key] = value
                time_end = time.time()
                used_time = time_end - time_begin
                meta_info = collections.OrderedDict()
                meta_info[InstanceName.STEP] = steps
                meta_info[InstanceName.GPU_ID] = self.gpu_id
                meta_info[InstanceName.TIME_COST] = used_time
                # 调用model中的get_metrics方法进行效果评估计算。
                metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, InstanceName.TRAINING)
            if self.model_class.lr_scheduler:
                # 这一步一定要有,没有的话lr_scheduler不会生效,学习率一直为0
                self.model_class.lr_scheduler.step()
                    # 在合适的step对验证集和测试集进行评估
            if steps % self.params["eval_step"] == 0:
                if self.params["is_eval_dev"]:
                    self.do_evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
                if self.params["is_eval_test"]:
                    self.do_evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
            if self.trainer_id == 0:
                  # 在合适的时候进行模型保存
                if steps % self.params["save_model_step"] == 0:
                    self.save_model(steps)
            steps += 1
        if self.params["is_eval_dev"]:
            logging.info("Final evaluate result: ")
            metrics_output = self.do_evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
            self.eval_metrics = metrics_output
        if self.params["is_eval_test"]:
            logging.info("Final test result: ")
            self.do_evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)
        if self.trainer_id == 0:
            self.save_model(steps)
            
  • 模型评估:启动待测试数据集对应的Reader,从Reader中读取数据集,送入Model中进行计算,得到每个batch的计算结果并保存,直到循环结束。将所有batch的计算结果汇总起来传入Model的get_metrics方法进行整个评估数据集上的指标计算。这部分的代码在基类中只是定义了虚方法,需要用户继承来自定义实现。文心目前提供了通用的CustomTrainer类(wenxin_appzoo/tasks/xxx/trainer/custom_trainer.py),能够覆盖大部分训练评估流程,用户如果对自定义训练评估要求不高,可以直接使用,部分代码如下所示:

    def do_evaluate(self, reader, phase, step):
        """在当前的训练状态下,对某个测试集进行评估
        :param reader:待评估数据集
        :param phase:当前的运行阶段
        :param step:当前的运行步数
        """
        all_metrics_tensor_value = []
        i = 0
        time_begin = time.time()
        for batch_id, data in enumerate(reader()):
            feed_dict = reader.dataset.convert_input_list_to_dict(data)
            metrics_tensor_value = self.executor.run(program=self.test_program,
                                             feed=feed_dict,
                                             fetch_list=self.fetch_list_evaluate,
                                             return_numpy=True)
            if i == 0:
                all_metrics_tensor_value = [[tensor] for tensor in metrics_tensor_value]
            else:
                for j in range(len(metrics_tensor_value)):
                    one_tensor_value = all_metrics_tensor_value[j]
                    all_metrics_tensor_value[j] = one_tensor_value + [metrics_tensor_value[j]]
            i += 1
        fetch_output_dict = collections.OrderedDict()
        for key, value in zip(self.fetch_list_evaluate_key, all_metrics_tensor_value):
            if key == InstanceName.LOSS and not self.return_numpy:
                value = [np.array(item) for item in value]
            fetch_output_dict[key] = value
        time_end = time.time()
        used_time = time_end - time_begin
        meta_info = collections.OrderedDict()
        meta_info[InstanceName.STEP] = step
        meta_info[InstanceName.GPU_ID] = self.gpu_id
        meta_info[InstanceName.TIME_COST] = used_time
        metrics_output = self.model_class.get_metrics(fetch_output_dict, meta_info, phase)
        return metrics_output
            
  • 模型保存:可分为checkpoints和inference model两种形式。这部分代码由基类实现,用户无需自定义实现,直接调用save_models方法即可,其基类中的部分代码实现如下所示:

    def save_model(self, steps, save_checkpoint=True, save_inference=True):
        logging.info("save model on static....")
        if save_checkpoint:
         self.save_checkpoint(self.executor, self.train_program, steps)
        if save_inference:
         self.save_inference(self.executor, self.feed_target_tensor, self.inference_output,
                                    self.save_inference_program, steps, self.infer_dict)

BaseDynamicTrainer

BaseDynamicTrainer为动态图模式的Trainer基类,其主要的方法和能力为:

  • 运行环境初始化:根据当前用户的安装环境,初始化飞桨的运行时环境。这部分的代码在基类中已经实现,用户无需自己编码实现,仅需要配置好对应的参数即可,参数定义详见实战演练:使用文心进行模型训练
  • 构造神经网络:调用Model实例的structure和set_optimizer方法完成神经网络的构造,调用load_pretrain_model方法加载热启动或者预训练模型的参数文件,这部分的代码在基类中已经实现,部分代码如下所示。用户无需自定义编码实现。关于Model介绍请移步Model
  • 模型训练:启动训练任务,不断通过Reader读取数据集,送入Model中进行计算,并得到评估结果,直到循环结束。这部分的代码在基类中只是定义了虚方法,需要用户继承来自定义实现。文心目前提供了通用的CustomDynamicTrainer类(wenxin_appzoo/tasks/xxx/custom_dynamic_trainer.py),能够覆盖大部分训练评估流程,用户如果对自定义训练评估要求不高,可以直接使用该类,部分代码如下所示:
def do_train(self):
          """
          :return
          """
          dg = self.data_set_reader.train_reader
          steps = 1
          opt_params = self.original_model.model_params.get('optimization', None)
          # 设置混合精度相关的参数,可以没有,
          init_loss_scaling = opt_params.get("init_loss_scaling", 1.0)
          incr_every_n_steps = opt_params.get("incr_every_n_steps", 1000)
          decr_every_n_nan_or_inf = opt_params.get("decr_every_n_nan_or_inf", 2)
          incr_ratio = opt_params.get("incr_ratio", 2.0)
          decr_ratio = opt_params.get("decr_ratio", 0.8)
          if self.use_amp:
              self.scaler = paddle.amp.GradScaler(enable=self.use_amp,
                                             init_loss_scaling=init_loss_scaling,
                                             incr_ratio=incr_ratio,
                                             decr_ratio=decr_ratio,
                                             incr_every_n_steps=incr_every_n_steps,                                          decr_every_n_nan_or_inf=decr_every_n_nan_or_inf)
              if self.multi_devices:
                  self.scaler = fleet.distributed_scaler(self.scaler)
          time_begin = time.time()
          # 启动训练集数据读取器
          for batch_id, data in enumerate(dg()):
              self.model_class.train()
              with paddle.amp.auto_cast(enable=self.use_amp):
                  example = self.data_set_reader.train_reader.dataset.convert_fields_to_dict(data, need_emb=False)
                  forward_out = self.model_class(example, phase=InstanceName.TRAINING)
                  loss = forward_out[InstanceName.LOSS]
              if self.use_amp:
                  loss = self.scaler.scale(loss)
                  loss.backward()
                  self.scaler.minimize(self.optimizer, loss)
              else:
                  loss.backward()
                  self.optimizer.minimize(loss)
                  self.optimizer.step()
              self.model_class.clear_gradients()
              if self.original_model.lr_scheduler:
                  cur_lr = self.original_model.lr_scheduler.get_lr()
                  self.original_model.lr_scheduler.step()
              else:
                  cur_lr = self.original_model.lr
              self.optimizer.clear_grad()
                          # 在合适的时机进行当前step的数据评估
              if steps % self.params["train_log_step"] == 0:
                  metrics_output = self.original_model.get_metrics(forward_out, meta_info, InstanceName.TRAINING)
                  time_begin = time.time()
                          # 在合适的时机对测试集、验证集上的数据进行评估
              if steps % self.params["eval_step"] == 0:
                  if self.params["is_eval_dev"]:
                      self.do_evaluate(self.data_set_reader.dev_reader, InstanceName.EVALUATE, steps)
                  if self.params["is_eval_test"]:
                      self.do_evaluate(self.data_set_reader.test_reader, InstanceName.TEST, steps)        
                          # 适当的时候进行模型保存
              if steps % self.params["save_model_step"] == 0 and self.worker_index == 0:
                  self.save_models(steps, example)
              steps += 1
  ....
  • 模型评估:启动待测试数据集对应的Reader,从Reader中读取数据集,送入Model中进行计算,得到每个batch的计算结果并保存,直到循环结束。将所有batch的计算结果汇总起来传入Model的get_metrics方法进行整个评估数据集上的指标计算。这部分的代码在基类中只是定义了虚方法,需要用户继承来自定义实现。文心目前提供了通用的CustomDynamicTrainer类(wenxin_appzoo/tasks/xxx/custom_dynamic_trainer.py),能够覆盖大部分训练评估流程,用户如果对自定义训练评估要求不高,可以直接使用,部分代码如下所示:
  def do_evaluate(self, reader, phase, step):
      """
      :param reader:
      :param phase:
      :param step:
      :return loss
      """
      step = 0
      with paddle.no_grad():
          time_begin = time.time()
          # 先切换到eval模式
          self.model_class.eval()
          fetch_output_dict = collections.OrderedDict()
          for batch_id, data in enumerate(reader()):
              step += 1
              example = reader.dataset.convert_fields_to_dict(data, need_emb=False)
              forward_out = self.model_class(example, phase=phase)
              for key, value in forward_out.items():
                  fetch_output_dict.setdefault(key, []).append(value)
          time_end = time.time()
          used_time = time_end - time_begin
          meta_info = collections.OrderedDict()
          meta_info[InstanceName.STEP] = step
          meta_info[InstanceName.TIME_COST] = used_time
          metrics_output = self.original_model.get_metrics(fetch_output_dict, meta_info, phase)
          self.model_class.train()
      logging.info("eval step = {0}".format(step))
  • 模型保存:可分为checkpoints和inference model两种形式。这部分代码由基类实现,用户无需自定义实现,直接调用save_models方法即可,其基类中的部分代码实现如下所示:
def save_models(self, step, fields_dict, save_checkpoint=True, save_inference=True):
      """模型保存:checkpoints文件用来热启动,inference文件用来预测推理
      :param step:
      :param fields_dict:
      :param save_checkpoint
      :param save_inference
      :return
      """
      path_dict = get_model_paths(self.save_checkpoints_path, self.save_inference_model_path, step)
      if save_checkpoint:
          save_path = os.path.join(path_dict["checkpoints_model_path"], "wenxin")
          paddle.save(self.original_model.state_dict(), "{0}.pdparams".format(save_path))
          paddle.save(self.original_model.optimizer.state_dict(), "{0}.pdopt".format(save_path))
          meta_path = path_dict["checkpoints_meta_path"]
          save_meta_data(self.meta_dict, meta_path)
      if save_inference:
          save_path = os.path.join(path_dict["inference_model_path"], "wenxin")
          logging.info("save path: {0}".format(save_path))
          # 通过飞桨提供的TraceLayer方式讲动态图模式模型保存成静态图类型,方便做预测加速。
          output, static_wenxin = WenxinTracedLayer.trace(self.original_model,
                                                          inputs=fields_dict,
                                                          phase="save_inference")
          static_wenxin.save_inference_model(save_path)
          meta_path = path_dict["inference_meta_path"]
          save_meta_data(self.meta_dict, meta_path)
          infer_dict = {"fields": output[InstanceName.TARGET_FEED_NAMES]}
          infer_meta_path = path_dict["inference_infer_meta_path"]
          save_meta_data(infer_dict, infer_meta_path)

文心中的预置Trainer

文心目前为所有的Task都提供了2个通用的Trainer,一个静态图Trainer,一个动态图Trainer,覆盖文本分类、文本匹配、序列标注、信息抽取、term重要性、情感分析等,所有的预置Trainer文件位于./wenxin_appzoo/tasks/xxx/training/目录下。

├── __init__.py
├── custom_trainer.py           ## 通用trainer,支持文本分类、匹配、序列标注、信息抽取等常见任务。
├── mrc_trainer.py              ## 阅读理解任务使用的trainer
├── trainer_config.py           ## 定义trainer所需参数的类

进阶使用

文心中提供了通用的Trainer流程,如果用户需要针对自己的业务场景进行自定义优化使用的话,请参考详细的接口设计与自定义Trainer

上一篇
7.Optimizer
下一篇
9.Inference