@
在1.2官网注册后拿到APISecret和APIKey,直接复制文章2.5demo代码,保存为real_time_audio_recognition.py,在命令行执行
python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000
使用中有任何问题,欢迎留言提问。
Python 3
标贝科技 https://ai.data-baker.com/#/index
填写邀请码fwwqgs,每日免费调用量还可以翻倍
点击产品地址进行登录,支持短信、密码、微信三种方式登录。
登录后进入【首页概览】,各位开发者可以进行创建多个应用。包括一句话识别、长语音识别、录音文件识别;在线合成、离线合成、长文本合成。
进入【已创建的应用】,左侧选择您需调用的AI技术服务,右侧展示对应服务页面概览(您可查询用量、管理套餐、购买服务量、自主获取授权、预警管理)。
通过服务 / 授权管理,获取对应参数,进行开发配置(获取访问令牌token)
拿到Key和Secret就可以正式使用啦!
在拿到Key和Secret后,我们还需要调用授权接口获取access_token,这个access_token有效时长是24小时。
# 获取access_token用于鉴权
def get_access_token(client_secret, client_id):
grant_type = "client_credentials"
url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" .format(grant_type, client_secret, client_id)
try:
response = requests.post(url)
response.raise_for_status()
except Exception as e:
print(response.text)
raise Exception
else:
access_token = json.loads(response.text).get(‘access_token‘)
return access_token
需要根据接口要求设置参数,并且对音频数据进行分割
# 准备数据
def prepare_data(args, access_token):
# 读取音频文件
with open(args.file_path, ‘rb‘) as f:
file = f.read()
# 填写Header信息
audio_format = args.audio_format
sample_rate = args.sample_rate
splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding=‘utf-8‘) for i in range(0, len(file), 5120)]
asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1}
json_list = []
for i in range(len(splited_data)):
if i != len(splited_data) - 1:
asr_params[‘req_idx‘] = i
else:
asr_params[‘req_idx‘] = -len(splited_data) + 1
asr_params["audio_data"] = splited_data[i]
data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params}
json_list.append(json.dumps(data))
return json_list
client_secret和client_id:在文章1.2的官网获取,必填
file_save_path:文件保存路径,必填
audio_format:音频类型,默认wav格式
sample_rate:采样率,默认16000Hz
# 获取命令行输入参数
def get_args():
parser = argparse.ArgumentParser(description=‘ASR‘)
parser.add_argument(‘-client_secret‘, type=str, required=True)
parser.add_argument(‘-client_id‘, type=str, required=True)
parser.add_argument(‘-file_path‘, type=str, required=True)
parser.add_argument(‘--audio_format‘, type=str, default=‘wav‘)
parser.add_argument(‘--sample_rate‘, type=str, default=‘16000‘)
args = parser.parse_args()
return args
class Client:
def __init__(self, data, uri):
self.data = data
self.uri = uri
#建立连接
def connect(self):
ws_app = websocket.WebSocketApp(uri,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
ws_app.run_forever()
# 建立连接后发送消息
def on_open(self, ws):
print("sending..")
for i in range(len(self.data)):
ws.send(self.data[i])
# 接收消息
def on_message(self, ws, message):
code = json.loads(message).get("code")
if code != 90000:
# 打印接口错误
print(message)
if json.loads(message).get(‘end_flag‘) == 1:
print(json.loads(message).get(‘asr_text‘))
# 打印错误
def on_error(slef, ws, error):
print("error: ", str(error))
# 关闭连接
def on_close(ws):
print("client closed.")
import argparse
import json
import base64
import requests
import websocket
class Client:
def __init__(self, data, uri):
self.data = data
self.uri = uri
#建立连接
def connect(self):
ws_app = websocket.WebSocketApp(uri,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
ws_app.run_forever()
# 建立连接后发送消息
def on_open(self, ws):
print("sending..")
for i in range(len(self.data)):
ws.send(self.data[i])
# 接收消息
def on_message(self, ws, message):
code = json.loads(message).get("code")
if code != 90000:
# 打印接口错误
print(message)
if json.loads(message).get(‘end_flag‘) == 1:
print(json.loads(message).get(‘asr_text‘))
# 打印错误
def on_error(slef, ws, error):
print("error: ", str(error))
# 关闭连接
def on_close(ws):
print("client closed.")
# 准备数据
def prepare_data(args, access_token):
# 读取音频文件
with open(args.file_path, ‘rb‘) as f:
file = f.read()
# 填写Header信息
audio_format = args.audio_format
sample_rate = args.sample_rate
splited_data = [str(base64.b64encode(file[i:i + 5120]), encoding=‘utf-8‘) for i in range(0, len(file), 5120)]
asr_params = {"audio_format": audio_format, "sample_rate": int(sample_rate), "speech_type": 1}
json_list = []
for i in range(len(splited_data)):
if i != len(splited_data) - 1:
asr_params[‘req_idx‘] = i
else:
asr_params[‘req_idx‘] = -len(splited_data) + 1
asr_params["audio_data"] = splited_data[i]
data = {"access_token": access_token, "version": "1.0", "asr_params": asr_params}
json_list.append(json.dumps(data))
return json_list
# 获取命令行输入参数
def get_args():
parser = argparse.ArgumentParser(description=‘ASR‘)
parser.add_argument(‘-client_secret‘, type=str, required=True)
parser.add_argument(‘-client_id‘, type=str, required=True)
parser.add_argument(‘-file_path‘, type=str, required=True)
parser.add_argument(‘--audio_format‘, type=str, default=‘wav‘)
parser.add_argument(‘--sample_rate‘, type=str, default=‘16000‘)
args = parser.parse_args()
return args
# 获取access_token用于鉴权
def get_access_token(client_secret, client_id):
grant_type = "client_credentials"
url = "https://openapi.data-baker.com/oauth/2.0/token?grant_type={}&client_secret={}&client_id={}" .format(grant_type, client_secret, client_id)
try:
response = requests.post(url)
response.raise_for_status()
except Exception as e:
print(response.text)
raise Exception
else:
access_token = json.loads(response.text).get(‘access_token‘)
return access_token
if __name__ == ‘__main__‘:
try:
args = get_args()
# 获取access_token
client_secret = args.client_secret
client_id = args.client_id
access_token = get_access_token(client_secret, client_id)
# 准备数据
data = prepare_data(args, access_token)
uri = "wss://openapi.data-baker.com/asr/realtime"
# 建立Websocket连接
client = Client(data, uri)
client.connect()
except Exception as e:
print(e)
复制所有代码,确定音频为wav格式,采样率为16K,在命令行执行
python real_time_audio_recognition.py -client_secret=您的client_secret -client_id=您的client_id -file_path=test.wav --audio_format=wav --sample_rate=16000
填写邀请码fwwqgs,每日免费调用量还可以翻倍
原文:https://www.cnblogs.com/DataBaker/p/14914455.html