1. Background
This is a macOS app built on Baidu's ASR speech recognition service. It records audio locally, sends the recording for recognition, and returns the transcript, which suits scenarios such as keeping written records of customer-service phone calls and meetings.
2. Technical approach
The app runs as a macOS menu-bar application that records audio and transcribes it with Baidu ASR by sending the recorded audio to the cloud short speech recognition API.
Baidu ASR short speech recognition:
https://ai.baidu.com/tech/speech/asr
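For orientation, the request flow against the cloud API boils down to two HTTP calls, sketched below with placeholder keys (a minimal sketch only; the full implementation follows in section 3):
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Minimal sketch of the Baidu short speech recognition request flow.
# AK / SK are placeholders; real keys come from the Baidu AI console.
import base64
import requests

AK, SK = 'your-api-key', 'your-secret-key'   # placeholders

# 1. Exchange the API key / secret key for an access token
token = requests.post(
    'http://aip.baidubce.com/oauth/2.0/token',
    data={'grant_type': 'client_credentials', 'client_id': AK, 'client_secret': SK},
).json()['access_token']

# 2. Send a 16 kHz mono WAV file, base64-encoded, to the recognition endpoint
with open('audio.wav', 'rb') as fp:
    speech = fp.read()
resp = requests.post('http://vop.baidu.com/server_api', json={
    'format': 'wav', 'rate': 16000, 'channel': 1, 'cuid': 'baidu',
    'token': token, 'speech': base64.b64encode(speech).decode(), 'len': len(speech),
})
print(resp.json().get('result'))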
3. Implementation
Code: local_wav.py:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import pyaudio
import wave
import requests
import base64
from loguru import logger
from pydub import AudioSegment
from pydub.utils import make_chunks
class Audio:
    """
    Capture audio from the local microphone.
    CHUNK = 2048              sample frames per buffer
    FORMAT = pyaudio.paInt16  16-bit sample depth
    CHANNELS = 1              mono
    RATE = 16000              16 kHz sample rate
    WAVE_OUTPUT_FILENAME = "Oldboy.wav"  output path
    """
    def __init__(self, CHUNK, FORMAT, CHANNELS, RATE):
        self.CHUNK = CHUNK
        self.FORMAT = FORMAT
        self.CHANNELS = CHANNELS
        self.RATE = RATE
        self.frames = []
        self.active = ''
        self.time_start = ''
        self.save_st = ''
        # Initialize PyAudio and open an input stream; keep the PyAudio
        # instance around so it can be terminated later.
        self.pa = pyaudio.PyAudio()
        self.stream = self.pa.open(format=self.FORMAT,
                                   channels=self.CHANNELS,
                                   rate=self.RATE,
                                   input=True,
                                   frames_per_buffer=self.CHUNK)
    def start_audio(self):
        # Read one buffer of audio data from the sound card
        audio_data = self.stream.read(self.CHUNK, exception_on_overflow=False)
        # Append it to the frame buffer
        self.frames.append(audio_data)

    def stop_audio(self):
        # Stop and release the stream and the PyAudio instance
        self.stream.stop_stream()
        self.stream.close()
        self.pa.terminate()

    def save_audio(self, options):
        self.save_st = 1
        wf = wave.open(options, 'wb')                            # create the output WAV file
        wf.setnchannels(self.CHANNELS)                           # channel count
        wf.setsampwidth(self.pa.get_sample_size(self.FORMAT))    # sample width (16-bit)
        wf.setframerate(self.RATE)                               # sample rate (16000)
        wf.writeframes(b''.join(self.frames))                    # write the buffered frames
        wf.close()
        self.save_st = 2
class GetLogging:
    """
    Loguru configuration: separate ERROR, SUCCESS and default logs,
    rotated daily and kept for 7 days.
    """
    def __init__(self):
        BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        # Error log
        logger.add(
            os.path.join(BASE_DIR, "w_logs/ERROR/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            filter=lambda record: record["level"].name == "ERROR",
            rotation="00:00", retention=7, level='ERROR', encoding='utf-8'
        )
        # Success log
        logger.add(
            os.path.join(BASE_DIR, "w_logs/SUCCESS/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            filter=lambda record: record["level"].name == "SUCCESS",
            rotation="00:00", retention=7, level='SUCCESS', encoding='utf-8',
        )
        # Default log (everything from DEBUG up)
        logger.add(
            os.path.join(BASE_DIR, "w_logs/Default/{time:YYYY-MM-DD}.log"),
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
            rotation="00:00", retention=7, level='DEBUG', encoding='utf-8'
        )
        self.logger = logger

    def get(self):
        return self.logger


globalLog = GetLogging().get()  # initialize logging
class AsrAudio:
    """
    Baidu ASR speech recognition client.
    """
    def __init__(self, Appid, Apiclient, Apisecurity, audio_file, text_file):
        self.Appid = Appid
        self.Apiclient = Apiclient
        self.Apisecurity = Apisecurity
        self.file_path = ''
        self.audio_file = audio_file
        self.text_file = text_file
        self.ss = ""

    # Exchange the API key / secret key for an access token
    def fetch_token(self):
        __token_url = 'http://aip.baidubce.com/oauth/2.0/token'
        params = {
            'grant_type': 'client_credentials',
            'client_id': self.Apiclient,
            'client_secret': self.Apisecurity
        }
        result = requests.post(__token_url, data=params)
        if result.status_code == 200:
            globalLog.success("获取access_token成功")
            return result.json()['access_token']
        else:
            globalLog.error("获取access_token失败:" + str(result.json()))
            print("get access_token failed:", result.json())

    # Read the raw bytes of an audio file
    def get_file_content(self, file_path):
        with open(file_path, 'rb') as fp:
            return fp.read()
    # Split the audio file with pydub; size is in milliseconds (seconds * 1000)
    def split_audio(self, size=59000):
        audios = []
        audio_name = self.audio_file[:-4]
        audio_format = self.audio_file[-3:]
        audio1 = AudioSegment.from_file(self.audio_file, audio_format)
        chunks = make_chunks(audio1, size)
        i = 0
        for chunk in chunks:
            chunk_name = '{}-{}.{}'.format(audio_name, i, 'wav')
            # Resample to 16 kHz mono as required by the ASR API
            chunk = chunk.set_frame_rate(16000).set_channels(1)
            chunk.export(chunk_name, format='wav')
            i += 1
            audios.append(chunk_name)
            globalLog.success("音频切分成功: \n" + str(chunk_name))
        return audios
# def encode(self,s):
# '''将字符串转成二进制'''
# return ' '.join([bin(ord(c)).replace('0b', '') for c in s])
#
# def decode(self,s):
# '''将二进制转换成字符串'''
# return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]])
    # Call the short speech recognition API; the result is appended to
    # text_file if it exists, otherwise it is returned.
    def asr(self, access_token, au_file):
        __asr_url = 'http://vop.baidu.com/server_api'
        speech = self.get_file_content(au_file)
        data = {}
        data['speech'] = base64.b64encode(speech).decode()
        data['len'] = len(speech)
        data['channel'] = 1
        data['format'] = au_file[-3:]
        data['rate'] = 16000
        data['cuid'] = 'baidu'
        data['token'] = access_token
        result = requests.post(__asr_url, json=data)
        if result.status_code == 200:
            self.ss = ''.join(result.json()['result'])
            globalLog.success("音频识别成功:\n" + str(au_file))
            globalLog.success('识别结果:\n' + str(self.ss))
        else:
            globalLog.error("音频获取失败:\n" + str(au_file))
            print("asr failed:", result.json())
        if self.text_file and os.path.isfile(self.text_file):
            # Write with an explicit utf-8 encoding so the packaged app does
            # not fall back to the ascii locale default.
            with open(self.text_file, 'a+', encoding='utf-8') as f:
                f.write(self.ss)
                f.write('\n')
            globalLog.success("音频结果保存成功:\n" + str(au_file))
        else:
            return self.ss
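Before wiring these classes into the menu-bar app, they can be exercised on their own; a minimal usage sketch, with placeholder appid / AK / SK values:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Sketch: record a few seconds with Audio, then recognize it with AsrAudio.
# The appid, AK and SK values below are placeholders.
import time
import pyaudio
import local_wav

rec = local_wav.Audio(1024, pyaudio.paInt16, 1, 16000)
start = time.time()
while time.time() - start < 5:      # record roughly 5 seconds
    rec.start_audio()
rec.save_audio('demo.wav')
rec.stop_audio()

asr = local_wav.AsrAudio(1111, 'your-api-key', 'your-secret-key', 'demo.wav', '')
token = asr.fetch_token()
for chunk in asr.split_audio():
    print(asr.asr(token, chunk))    # text_file is empty, so asr() returns the text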
Code: main.py:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import threading
import time
import pyaudio
import rumps
import local_wav

globalLog = local_wav.GetLogging().get()  # initialize logging

audio_dir = './audio_file'
audio_file = './audio_file/audio.wav'     # recording output path
text_dir = './audio_result'
text_file = './audio_result/text.txt'     # recognition result path
class TasAudio:
    def __init__(self, tas, task):
        self.tas = tas
        self.task = task

    def task_audio(self):
        """
        CHUNK = 1024              sample frames per buffer
        FORMAT = pyaudio.paInt16  16-bit sample depth
        CHANNELS = 1              mono
        RATE = 16000              16 kHz sample rate
        """
        my_audio = local_wav.Audio(1024, pyaudio.paInt16, 1, 16000)
        st_time = time.time()
        # Keep reading from the microphone until the menu item flips self.tas
        while self.tas:
            try:
                my_audio.start_audio()
                globalLog.success("音频录取中>>>>>>>>>>\n")
            except Exception as e:
                globalLog.error("音频录取过程中异常>>>>>>>>>>>>:\n" + str(e))
        # task == 9 means "stop recording, save the file and run recognition"
        if self.task == 9:
            en_time = time.time()
            ent = int(en_time) - int(st_time)
            globalLog.success("音频录取中成功>>>>>>>>>>:\n录音结束,总录音时间:" + str(ent) + "秒")
            my_audio.save_audio(audio_file)
            time.sleep(5)
            if my_audio.save_st == 2:
                globalLog.success("录音文件save成功>>>>>>>>>>\n")
                thread_asr()
                time.sleep(2)
            else:
                globalLog.error("录音文件save失败>>>>>>>>>>\n")
            my_audio.stop_audio()


me_au = TasAudio(True, 8)
class WatermelonApp(rumps.App):
    """
    Menu-bar app configuration.
    """
    @rumps.clicked("Audio start")
    def start(self, sender):
        global globalLog
        timr_audio()
        try:
            rumps.notification(title="录音小助手", subtitle="开始录音", message="")
            self.title = 'audio running'
            globalLog.success("录音开启成功")
        except Exception as e:
            globalLog.error("录音开启异常:" + str(e))

    @rumps.clicked("Audio done and asr start")
    def stop(self, sender):
        me_au.tas = False
        me_au.task = 9
        try:
            rumps.notification(title="录音小助手", subtitle="停止录音", message="")
            globalLog.success("停止录音成功")
            self.title = 'Asr done'
            time.sleep(1)
        except Exception as e:
            globalLog.error("停止录音异常:" + str(e))
# @rumps.clicked("Res")
# def Res(self, sender):
# global globalLog
# me_au.tas = True
# me_au.task = 7
# try:
# rumps.notification(title="录音小助手", subtitle="状态重置", message="")
# self.title = 'Res'
# globalLog.success("状态重置成功")
# except Exception as e:
# globalLog.error("状态重置失败:", str(e))
# Timer trigger: start recording on a background thread after 1 second
def timr_audio():
    t = threading.Timer(1, me_au.task_audio, args=[])
    me_au.tas = True
    me_au.task = 7
    t.start()
    now = time.localtime()
    print('开始录音', '\n开始时间:', time.strftime("%Y-%m-%d-%H:%M:%S", now))
    globalLog.success("开始录音成功>>>>>>>>>>>>>")


# Split the recording and run recognition chunk by chunk
def asr_asr():
    ak = 'QVGK---------'
    sk = 'aADm-----------'
    my_asr = local_wav.AsrAudio(1111, ak, sk, audio_file, text_file)
    audios = my_asr.split_audio()
    # access = my_asr.fetch_token()
    access_token = '24.3eeba5d4405d130e34227---------------------9'
    for audio in audios:
        my_asr.asr(access_token, audio)
        rumps.notification(title="", subtitle="识别结果:", message=my_asr.ss)


# Run recognition on a worker thread so the menu bar stays responsive
def thread_asr():
    th = threading.Thread(target=asr_asr, args=())
    th.start()
def mkdir(path):
    # Strip surrounding whitespace and a trailing path separator,
    # then create the directory only if it does not exist yet.
    path = path.strip().rstrip("\\")
    if not os.path.exists(path):
        os.makedirs(path)
if __name__ == "__main__":
    mkdir(audio_dir)
    mkdir(text_dir)
    # Create / truncate the result file before the app starts
    t_file = open(text_file, 'w')
    t_file.close()
    app = WatermelonApp(icon="/Users/py/电话会议语音转写记录/e1.png", name="watermelon")
    app.run()
Build and package the app with py2app (run on the command line):
python3 setup.py py2app
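py2app expects a setup.py next to main.py; the exact one used for this app is not shown, but a minimal sketch might look like the following (the icon path, package list and plist entries are assumptions to adapt):
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Minimal py2app setup.py sketch; adjust APP / iconfile to the project layout.
from setuptools import setup

APP = ['main.py']
OPTIONS = {
    'iconfile': 'e1.png',                      # menu-bar icon (assumed location)
    'packages': ['rumps', 'pyaudio', 'pydub', 'loguru', 'requests'],
    'plist': {
        'LSUIElement': True,                   # menu-bar only, no Dock icon
        'NSMicrophoneUsageDescription': 'Record audio for ASR transcription',
    },
}

setup(
    app=APP,
    options={'py2app': OPTIONS},
    setup_requires=['py2app'],
)
Running the command above then produces the .app bundle under dist/.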
4. Results
Running the script from PyCharm:
Running the macOS app:
Start recording
Stop recording and start recognition
Recognition result:
Summary:
Development tool: PyCharm
Libraries used:
PyAudio: record and play audio from Python
rumps: macOS menu-bar / notification helper
Loguru: logging
pydub: splitting long audio files
py2app: packaging the Python code into a macOS app
Possible extension: combine the app with a translation API to get a record-recognize-translate workflow (see the sketch below).
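As a rough illustration of that extension, the recognized text could be passed to a translation endpoint right after AsrAudio.asr() returns; the URL and response fields below are hypothetical placeholders, not a specific provider's API:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Sketch of the extension idea: translate the recognized text after ASR.
# TRANSLATE_URL and its parameters are hypothetical placeholders; substitute
# the real translation service being used.
import requests

TRANSLATE_URL = 'https://example.com/translate'   # hypothetical endpoint

def translate(text, source='zh', target='en'):
    resp = requests.post(TRANSLATE_URL, json={'q': text, 'from': source, 'to': target})
    resp.raise_for_status()
    return resp.json().get('result', '')

# After my_asr.asr(access_token, audio) in asr_asr(), one could add:
#     translated = translate(my_asr.ss)
#     rumps.notification(title="", subtitle="翻译结果:", message=translated)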
Known issues:
1. The app currently prints the recognition result and records it in the logs, but the saved txt file ended up holding the un-decoded recognition result. This happens because the packaged macOS app runs with an ascii default encoding while Python saves the text as utf-8; pointers from more experienced readers are welcome. A possible workaround is sketched after this list.
2. The macOS app needs the microphone permission configured separately; see: https://blog.csdn.net/qq_34029469/article/details/107284443
Configuration (run on the command line):
sqlite3 ~/Library/Application\ Support/com.apple.TCC/TCC.db "INSERT or REPLACE INTO access VALUES('kTCCServiceMicrophone','org.pythonmac.unspecified.asr小助手',0,1,1,NULL,NULL,NULL,'UNUSED',NULL,0,1577993260);"
3. The macOS app (with a bundled access_token) is provided for trial purposes only:
Enterprise cloud drive: https://ecloud.baidu.com?t=4c66dc76f04ecbd5954d010aca7929bb
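Regarding issue 1, a possible workaround (a sketch, assuming the root cause is the packaged app's ascii locale) is to open the result file with an explicit encoding and, optionally, force UTF-8 mode for the whole bundle:
# In local_wav.AsrAudio.asr(): pass an explicit encoding so the packaged app
# does not fall back to the ascii locale default when writing the result.
with open(self.text_file, 'a+', encoding='utf-8') as f:
    f.write(self.ss + '\n')

# Optionally, enable Python's UTF-8 mode for the whole .app through the py2app
# plist option (LSEnvironment sets environment variables for the launched app):
# OPTIONS = {'plist': {'LSEnvironment': {'PYTHONUTF8': '1'}}}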