macOS Recording and Speech Recognition App
Posted by zhaowo345291, 2021-12 (last edited 2022-04)

1. Background

This is a macOS app built on Baidu's ASR speech recognition technology. It recognizes recorded audio and returns the transcription, which makes it suitable for producing written records of customer-service phone calls and of meetings.

2. Technical Solution Overview

The tool is a macOS app that records audio and recognizes it with Baidu ASR: it calls the cloud short speech recognition API to transcribe the recorded audio automatically.

Baidu ASR short speech recognition:

https://ai.baidu.com/tech/speech/asr
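
At its core the app makes two REST calls: it exchanges the API key/secret pair for an access token, then POSTs the base64-encoded 16 kHz mono WAV data to the short speech recognition endpoint. A minimal sketch of that flow (API_KEY, SECRET_KEY, and audio.wav are placeholders; the full implementation is in local_wav.py below):

import base64
import requests

# placeholders: fill in the API key / secret key from the Baidu AI console
API_KEY = 'your-api-key'
SECRET_KEY = 'your-secret-key'

# 1. Exchange the key pair for an access token
token = requests.post('http://aip.baidubce.com/oauth/2.0/token', data={
    'grant_type': 'client_credentials',
    'client_id': API_KEY,
    'client_secret': SECRET_KEY,
}).json()['access_token']

# 2. Send a 16 kHz, mono, 16-bit WAV clip (up to roughly 60 s) for recognition
with open('audio.wav', 'rb') as fp:
    speech = fp.read()
resp = requests.post('http://vop.baidu.com/server_api', json={
    'format': 'wav', 'rate': 16000, 'channel': 1,
    'cuid': 'baidu', 'token': token,
    'speech': base64.b64encode(speech).decode(), 'len': len(speech),
}).json()
print(resp.get('result'))   # list of recognized text segments on success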

3. Implementation Steps

Code: local_wav.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import pyaudio
import wave
import requests
import base64
from loguru import logger
from pydub import AudioSegment
from pydub.utils import make_chunks


class Audio:
    """
    Audio capture from the local microphone.

    CHUNK = 2048                           buffer size in samples
    FORMAT = pyaudio.paInt16               16-bit sample depth
    CHANNELS = 1                           mono
    RATE = 16000                           16 kHz sample rate
    WAVE_OUTPUT_FILENAME = "Oldboy.wav"    output path
    """
    def __init__(self, CHUNK, FORMAT, CHANNELS, RATE):
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.frames = []
        self.active = ''
        self.time_start = ''
        self.save_st = ''
        # Initialize PyAudio and keep the instance so it can be terminated later
        self.pa = pyaudio.PyAudio()
        self.stream = self.pa.open(format=self.FORMAT,
                                   channels=self.CHANNELS,
                                   rate=self.RATE,
                                   input=True,
                                   frames_per_buffer=self.CHUNK)

    def start_audio(self):
        audio_data = self.stream.read(self.CHUNK, exception_on_overflow=False)   # read one chunk from the sound card buffer
        self.frames.append(audio_data)              # append the chunk to the frame buffer

    def stop_audio(self):                           # stop recording and release PyAudio
        self.stream.stop_stream()
        self.stream.close()
        self.pa.terminate()

    def save_audio(self, options):
        self.save_st = 1
        wf = wave.open(options, 'wb')               # create the output WAV file at the given path
        wf.setnchannels(self.CHANNELS)              # number of channels
        wf.setsampwidth(self.pa.get_sample_size(self.FORMAT))  # sample width (2 bytes for paInt16)
        wf.setframerate(self.RATE)                  # sample rate, e.g. 16000
        wf.writeframes(b''.join(self.frames))       # write the recorded frames
        wf.close()                                  # close the file
        self.save_st = 2

class GetLogging:
    """
    Logging configuration (loguru).
    """

    def __init__(self):
        BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        # Error log
        logger.add(
            os.path.join(BASE_DIR, "w_logs/ERROR/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            filter=lambda x: x["level"].name == "ERROR",
            rotation="00:00", retention=7, level='ERROR', encoding='utf-8'
        )
        # Success log
        logger.add(
            os.path.join(BASE_DIR, "w_logs/SUCCESS/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            filter=lambda x: x["level"].name == "SUCCESS",
            rotation="00:00", retention=7, level='SUCCESS', encoding='utf-8',
        )
        # Default log (DEBUG and above, including recognition results)
        logger.add(
            os.path.join(BASE_DIR, "w_logs/Default/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            rotation="00:00", retention=7, level='DEBUG', encoding='utf-8'
        )
        self.logger = logger

    def get(self):
        return self.logger


globalLog = GetLogging().get()   # initialize logging



class AsrAudio:
    """
    ASR speech recognition via the Baidu short speech API.
    """
    def __init__(self, Appid, Apiclient, Apisecurity, audio_file, text_file):
        self.Appid = Appid
        self.Apiclient = Apiclient
        self.Apisecurity = Apisecurity
        self.file_path = ''
        self.audio_file = audio_file
        self.text_file = text_file
        self.ss = ""

    # Fetch the OAuth access token
    def fetch_token(self):
        __token_url = 'http://aip.baidubce.com/oauth/2.0/token'
        params = {
            'grant_type': 'client_credentials',
            'client_id': self.Apiclient,
            'client_secret': self.Apisecurity
        }
        result = requests.post(__token_url, data=params)
        if result.status_code == 200:
            globalLog.success("access_token fetched successfully")
            return result.json()['access_token']
        else:
            globalLog.error("failed to fetch access_token: " + str(result.json()))
            print("get access_token failed:", result.json())

    # Read an audio file as raw bytes
    def get_file_content(self, file_path):
        with open(file_path, 'rb') as fp:
            return fp.read()

    # Split the audio file with pydub; size is in milliseconds (seconds * 1000)
    def split_audio(self, size=59000):
        audios = []
        audio_name = self.audio_file[:-4]
        audio_format = self.audio_file[-3:]
        audio1 = AudioSegment.from_file(self.audio_file, audio_format)
        chunks = make_chunks(audio1, size)
        i = 0
        for chunk in chunks:
            chunk_name = '{}-{}.{}'.format(audio_name, i, 'wav')
            chunk = chunk.set_frame_rate(16000).set_channels(1)
            chunk.export(chunk_name, format='wav')
            i += 1
            audios.append(chunk_name)
            globalLog.success("audio chunk exported: \n" + str(chunk_name))
        return audios

    # def encode(self, s):
    #     '''Convert a string to its binary representation'''
    #     return ' '.join([bin(ord(c)).replace('0b', '') for c in s])
    #
    # def decode(self, s):
    #     '''Convert a binary representation back to a string'''
    #     return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]])


    # Call the short speech recognition API and append the result to text_file
    def asr(self, access_token, au_file):
        __asr_url = 'http://vop.baidu.com/server_api'
        speech = self.get_file_content(au_file)
        data = {}
        data['speech'] = base64.b64encode(speech).decode()
        data['len'] = len(speech)
        data['channel'] = 1
        data['format'] = au_file[-3:]
        data['rate'] = 16000
        data['cuid'] = 'baidu'
        data['token'] = access_token
        result = requests.post(__asr_url, json=data)
        if result.status_code == 200:
            self.ss = ''.join(result.json()['result'])
            globalLog.success("audio recognized successfully:\n" + str(au_file))
            globalLog.success("recognition result:\n" + str(self.ss))
        else:
            globalLog.error("recognition request failed:\n" + str(au_file))
            print("asr failed:", result.json())
        if self.text_file and os.path.isfile(self.text_file):
            with open(self.text_file, 'a+') as f:
                # NOTE: this writes the repr of the UTF-8 bytes rather than readable
                # text (see problem 1 in section 4 for a possible fix)
                f.write(str(self.ss.encode(encoding='utf-8')))
                f.write('\n')
            globalLog.success("recognition result saved:\n" + str(au_file))
        else:
            return self.ss
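
As a usage reference, here is a minimal sketch that exercises these classes directly, outside the menu-bar app. The file name demo.wav, the app id 1111, and the key/secret placeholders are illustrative; the short speech API accepts clips of roughly 60 seconds, which is why split_audio cuts recordings into 59-second chunks.

# sketch: record about 5 seconds, save the WAV, then recognize each chunk
import time
import pyaudio
import local_wav

rec = local_wav.Audio(1024, pyaudio.paInt16, 1, 16000)
end = time.time() + 5
while time.time() < end:
    rec.start_audio()
rec.save_audio('demo.wav')
rec.stop_audio()

asr = local_wav.AsrAudio(1111, 'your-api-key', 'your-secret-key', 'demo.wav', '')
token = asr.fetch_token()
for chunk in asr.split_audio():
    print(asr.asr(token, chunk))   # with an empty text_file the result is returned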

Code: main.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import threading
import time
import pyaudio
import rumps
import local_wav


globalLog = local_wav.GetLogging().get()   # initialize logging
audio_dir = './audio_file'
audio_file = './audio_file/audio.wav'    # recording output path
text_dir = './audio_result'
text_file = './audio_result/text.txt'     # recognition result output path


class TasAudio:
    def __init__(self, tas, task):
        self.tas = tas
        self.task = task

    def task_audio(self):
        """
        CHUNK = 1024                           buffer size in samples
        FORMAT = pyaudio.paInt16               16-bit sample depth
        CHANNELS = 1                           mono
        RATE = 16000                           16 kHz sample rate
        """
        my_audio = local_wav.Audio(1024, pyaudio.paInt16, 1, 16000)
        st_time = time.time()
        while self.tas:
            try:
                my_audio.start_audio()
                globalLog.success("recording audio >>>>>>>>>>\n")
            except Exception as e:
                globalLog.error("exception while recording audio >>>>>>>>>>>>:\n" + str(e))

        if self.task == 9:
            en_time = time.time()
            ent = int(en_time) - int(st_time)
            globalLog.success("recording finished >>>>>>>>>>:\ntotal recording time: " + str(ent) + " s")

        my_audio.save_audio(audio_file)
        time.sleep(5)
        if my_audio.save_st == 2:
            globalLog.success("recording file saved successfully >>>>>>>>>>\n")
            thread_asr()
            time.sleep(2)
        else:
            globalLog.error("failed to save recording file >>>>>>>>>>\n")
        my_audio.stop_audio()


me_au = TasAudio(True, 8)

class WatermelonApp(rumps.App):
    """
    Menu-bar app configuration.
    """
    @rumps.clicked("Audio start")
    def start(self, sender):
        global globalLog
        timr_audio()
        try:
            rumps.notification(title="Recording assistant", subtitle="Recording started", message="")
            self.title = 'audio running'
            globalLog.success("recording started successfully")
        except Exception as e:
            globalLog.error("failed to start recording: " + str(e))

    @rumps.clicked("Audio done and asr start")
    def stop(self, sender):
        me_au.tas = False
        me_au.task = 9
        try:
            rumps.notification(title="Recording assistant", subtitle="Recording stopped", message="")
            globalLog.success("recording stopped successfully")
            self.title = 'Asr done'
            time.sleep(1)
        except Exception as e:
            globalLog.error("failed to stop recording: " + str(e))

    # @rumps.clicked("Res")
    # def Res(self, sender):
    #     global globalLog
    #     me_au.tas = True
    #     me_au.task = 7
    #     try:
    #         rumps.notification(title="Recording assistant", subtitle="State reset", message="")
    #         self.title = 'Res'
    #         globalLog.success("state reset successfully")
    #     except Exception as e:
    #         globalLog.error("state reset failed: " + str(e))

# Timer trigger: start the recording task on a background thread after 1 second
def timr_audio():
    t = threading.Timer(1, me_au.task_audio, args=[])
    me_au.tas = True
    me_au.task = 7
    t.start()
    now = time.localtime()
    print('recording started', '\nstart time:', time.strftime("%Y-%m-%d-%H:%M:%S", now))
    globalLog.success("recording started successfully >>>>>>>>>>>>>")

# Run recognition
def asr_asr():
    ak = 'QVGK---------'
    sk = 'aADm-----------'
    my_asr = local_wav.AsrAudio(1111, ak, sk, audio_file, text_file)
    audios = my_asr.split_audio()
    # access = my_asr.fetch_token()
    access_token = '24.3eeba5d4405d130e34227---------------------9'
    for audio in audios:
        my_asr.asr(access_token, audio)
        rumps.notification(title="", subtitle="Recognition result:", message=my_asr.ss)


# Run recognition on a separate thread
def thread_asr():
    th = threading.Thread(target=asr_asr, args=())
    th.start()


def mkdir(path):
    # Strip leading/trailing whitespace and any trailing backslash
    path = path.strip()
    path = path.rstrip("\\")
    # Create the directory only if it does not already exist
    if not os.path.exists(path):
        os.makedirs(path)


if __name__ == "__main__":
    mkdir(audio_dir)
    mkdir(text_dir)
    t_file = open(text_file, 'w')
    t_file.close()
    app = WatermelonApp(icon="/Users/py/电话会议语音转写记录/e1.png", name="watermelon")
    app.run()
    time.sleep(10)




Build and package the app with py2app (run on the command line):

 python3 setup.py py2app
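
py2app needs a setup.py in the project directory describing the app. A minimal sketch, assuming the icon file e1.png referenced in main.py (the exact setup.py used by the original project is not shown in the post):

# setup.py -- minimal py2app configuration (illustrative sketch)
from setuptools import setup

APP = ['main.py']
DATA_FILES = ['e1.png']              # assumed: the menu-bar icon shipped with the app
OPTIONS = {
    'iconfile': 'e1.png',            # assumed icon file name
    'plist': {'LSUIElement': True},  # menu-bar only app, no Dock icon
}

setup(
    app=APP,
    data_files=DATA_FILES,
    options={'py2app': OPTIONS},
    setup_requires=['py2app'],
)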

 

4. Results

Running the script in PyCharm:

Running the packaged macOS app:

Start recording

Stop recording and run recognition

Recognition result:

 

Summary:

Development tool: PyCharm

Libraries used (an environment-setup sketch follows the list):

PyAudio: plays and records audio from a Python program

rumps: macOS status-bar app and notification helper

loguru: logging module

pydub: splitting long audio files

py2app: Python-to-macOS-app packaging tool
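
A possible environment setup, assumed rather than taken from the original post (run on the command line; portaudio and ffmpeg are the native dependencies of PyAudio and pydub, respectively):

 brew install portaudio ffmpeg
 pip3 install pyaudio rumps loguru pydub requests py2app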

 

Possible extension: combine this with a translation API to build a record-recognize-translate app; a sketch of the idea follows.
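
A minimal sketch of that extension, assuming the Baidu general text translation API (fanyi-api.baidu.com); the app id / key placeholders and the zh-to-en direction are illustrative and not part of the app above:

# sketch: translate a recognition result with the Baidu general translation API
import hashlib
import random
import requests

def translate(text, app_id='your-app-id', app_key='your-app-key', src='zh', dst='en'):
    salt = str(random.randint(32768, 65536))
    # sign = MD5(appid + query + salt + key), per the translation API's signing rule
    sign = hashlib.md5((app_id + text + salt + app_key).encode('utf-8')).hexdigest()
    resp = requests.get('https://fanyi-api.baidu.com/api/trans/vip/translate', params={
        'q': text, 'from': src, 'to': dst,
        'appid': app_id, 'salt': salt, 'sign': sign,
    }).json()
    return ' '.join(item['dst'] for item in resp.get('trans_result', []))

# e.g. feed it the ASR output: translate(my_asr.ss)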

Known issues:

1. The app currently prints the recognition result and records it in the logs, but the txt file saves the result as undecoded bytes rather than readable text (this happens because the packaged macOS app runs with an ASCII default encoding while Python writes the result as UTF-8). Advice from experts is welcome; one possible fix is sketched below.
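
A likely fix is to open the result file with an explicit UTF-8 encoding and write the string itself, instead of writing the repr of its encoded bytes, so the packaged app's ASCII default locale no longer matters. A sketch against the asr() method in local_wav.py:

# in AsrAudio.asr(), the file-writing block could become:
with open(self.text_file, 'a+', encoding='utf-8') as f:
    f.write(self.ss + '\n')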

2. The macOS app needs microphone permission configured separately; see: https://blog.csdn.net/qq_34029469/article/details/107284443

Configuration (run on the command line):

sqlite3 ~/Library/Application\ Support/com.apple.TCC/TCC.db "INSERT or REPLACE INTO access VALUES('kTCCServiceMicrophone','org.pythonmac.unspecified.asr小助手',0,1,1,NULL,NULL,NULL,'UNUSED',NULL,0,1577993260);"

3. The packaged macOS app (with a bundled access_token) is for trial use only:

Enterprise cloud drive: https://ecloud.baidu.com?t=4c66dc76f04ecbd5954d010aca7929bb
