@@ -0,0 +1,115 @@
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import matplotlib
|
||||
from telegram import Update, Message
|
||||
from telegram.ext import Updater, CallbackContext, Dispatcher, CommandHandler, MessageHandler, \
|
||||
Filters
|
||||
|
||||
from ina_main import *
|
||||
|
||||
warnings.filterwarnings("ignore")
|
||||
matplotlib.use('agg')
|
||||
|
||||
|
||||
def r(u: Update, msg: str, md=True):
    """Send a reply into the chat the update *u* came from.

    :param u: Incoming telegram update
    :param msg: Text to send
    :param md: Parse the message as Markdown when True
    """
    mode = 'Markdown' if md else None
    updater.bot.sendMessage(chat_id=u.effective_chat.id, text=msg, parse_mode=mode)
|
||||
|
||||
|
||||
def cmd_start(u: Update, c: CallbackContext):
    """Handle the /start command with a short welcome message."""
    welcome = '欢迎! 点下面的录音按钮就可以开始啦w'
    r(u, welcome)
|
||||
|
||||
|
||||
def process_audio(message: Message):
    """Analyze the voice/audio attached to *message* and reply with the result.

    Downloads the media into the temp directory, runs gender segmentation on
    it, and replies with a rendered waveform plot plus a percentage summary.
    Returns silently when the message carries no audio/voice, and reports
    failure to the chat when segmentation yields nothing.

    :param message: Telegram message expected to contain audio or voice
    """
    # Only when replying to voice or audio
    audio = message.audio or message.voice
    if not audio:
        return

    # Download audio file, named by timestamp and sender
    date = datetime.now().strftime('%Y-%m-%d %H-%M')
    try:
        downloader = bot.getFile(audio.file_id)
    except Exception:
        # getFile occasionally fails transiently; retry once before giving up.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
        downloader = bot.getFile(audio.file_id)
    # from_user.name starts with '@'; [1:] strips it for the file name
    file = Path(tmpdir).joinpath(f'{date} {message.from_user.name[1:]}.mp3')
    print(downloader, '->', file)
    downloader.download(file)

    # Segment file
    result = segment(file)

    # Null case: nothing detected (e.g. too quiet or too short)
    print(result)
    if not result:
        bot.send_message(message.chat_id, '分析失败, 大概是音量太小或者时长太短吧, 再试试w')
        return

    # Draw results and reply with the plot plus percentage caption
    with draw_result(str(file), result) as buf:
        f, m, o, pf = get_result_percentages(result)
        msg = f"分析结果: {f*100:.0f}% 🙋♀️ | {m*100:.0f}% 🙋♂️ | {o*100:.0f}% 🚫\n" \
              f"(结果仅供参考, 如果结果不是你想要的,那就是模型的问题,欢迎反馈)\n" \
              f"" \
              f"(因为这个模型基于法语数据, 和中文发音习惯有差异, 所以这个识别结果可能不准)"
        bot.send_photo(message.chat_id, photo=buf, caption=msg,
                       reply_to_message_id=message.message_id)
|
||||
|
||||
|
||||
def cmd_analyze(u: Update, c: CallbackContext):
    """Handle analyze commands sent as a reply to an audio/voice message.

    Accepts '/analyze', '!analyze', '/analyze-raw' and '!analyze-raw'.
    Only the author of the replied-to message may trigger its analysis.
    """
    reply = u.effective_message.reply_to_message
    if reply is None:
        # Not actually a reply — nothing to analyze (previously crashed with
        # AttributeError on reply.from_user below)
        return

    # Parse command word
    text = u.effective_message.text
    if not text:
        return
    parts = text.lower().split()
    if not parts:
        # Whitespace-only message — split()[0] would raise IndexError
        return
    cmd = parts[0]

    if cmd[0] not in '!/':
        return
    cmd = cmd[1:]

    if cmd not in ['analyze', 'analyze-raw']:
        return

    # NOTE(review): 'analyze-raw' currently behaves exactly like 'analyze';
    # the original assigned an unused `raw = True` flag here.

    if u.effective_user.id == reply.from_user.id:
        process_audio(reply)
    else:
        r(u, '只有自己能分析自己的音频哦 👀')
|
||||
|
||||
|
||||
def on_audio(u: Update, c: CallbackContext):
    """Forward any received private voice/audio message straight to analysis."""
    message = u.effective_message
    process_audio(message)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Working directory for downloaded audio files
    tmpdir = Path('audio_tmp')
    tmpdir.mkdir(exist_ok=True, parents=True)

    # Find telegram token: the tg_token env var takes priority over the token
    # file next to this script. (`os` arrives via the star import from ina_main.)
    path = Path(os.path.abspath(__file__)).parent
    db_path = path.joinpath('voice-bot-db.json')  # NOTE(review): currently unused here
    if 'tg_token' in os.environ:
        tg_token = os.environ['tg_token']
    else:
        with open(path.joinpath('voice-bot-token.txt'), 'r', encoding='utf-8') as f:
            tg_token = f.read().strip()

    # Telegram login
    updater = Updater(token=tg_token, use_context=True)
    dispatcher: Dispatcher = updater.dispatcher
    bot = updater.bot

    # Command handlers; MessageHandler(Filters.reply, ...) lets cmd_analyze
    # also catch '!analyze'-style prefixes that CommandHandler would miss.
    dispatcher.add_handler(CommandHandler('start', cmd_start, filters=Filters.chat_type.private))
    dispatcher.add_handler(CommandHandler('analyze', cmd_analyze, filters=Filters.reply))
    dispatcher.add_handler(MessageHandler(Filters.reply, cmd_analyze))
    dispatcher.add_handler(MessageHandler(Filters.voice & Filters.chat_type.private, on_audio))
    dispatcher.add_handler(MessageHandler(Filters.audio & Filters.chat_type.private, on_audio))

    print('Starting bot...')
    updater.start_polling()
    # Block the main thread until SIGINT/SIGTERM so the bot shuts down cleanly
    # (recommended after start_polling() in python-telegram-bot v13)
    updater.idle()
|
||||
@@ -0,0 +1,191 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import warnings
|
||||
from subprocess import Popen, PIPE
|
||||
from typing import NamedTuple, Callable
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import scipy.io.wavfile
|
||||
from PIL import Image
|
||||
from inaSpeechSegmenter import *
|
||||
from matplotlib.axes import Axes
|
||||
from matplotlib.figure import Figure
|
||||
import tensorflow as tf
|
||||
|
||||
|
||||
# Enable on-demand GPU memory growth so TensorFlow does not reserve all VRAM
# up front; must be configured before any GPU operation runs.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Shared segmenter instance — model loading is expensive, so do it once at
# import time and reuse it for every call to segment().
seg = Segmenter()
|
||||
|
||||
|
||||
class ResultFrame(NamedTuple):
    """One contiguous labeled segment from the speech segmenter."""
    gender: str  # segment label, e.g. 'female' / 'male' / non-speech labels
    start: float  # segment start time in seconds
    end: float  # segment end time in seconds
    prob: float  # NOTE(review): presumably the model's confidence for the label — confirm against inaSpeechSegmenter output
|
||||
|
||||
|
||||
class Result(NamedTuple):
    """Segmentation result for a single input file."""
    frames: list[ResultFrame]  # ordered segments produced by the segmenter
    file: str  # path of the analyzed file
|
||||
|
||||
|
||||
def segment(file) -> list[ResultFrame]:
    """Run the shared segmenter on *file*, wrapping each segment as a ResultFrame."""
    frames = []
    for raw in seg(file):
        frames.append(ResultFrame(*raw))
    return frames
|
||||
|
||||
|
||||
def to_wav(file: str, callback: Callable, start_sec: float = 0, stop_sec: float = 0):
    """
    Convert media to a temporary mono 16 kHz wav file and run *callback* on it.

    :param file: Input media path (anything ffmpeg can decode)
    :param callback: Called with the temp wav path; its return value is returned.
        The temp directory is deleted on exit, so the callback must consume the
        wav file before returning.
    :param start_sec: Optional start offset in seconds (0 = from the beginning)
    :param stop_sec: Optional stop time in seconds (0 = until the end)
    :return: Whatever *callback* returns
    :raises RuntimeError: If ffmpeg exits with a non-zero status
    """
    base, _ = os.path.splitext(os.path.basename(file))

    with tempfile.TemporaryDirectory() as tmpdir_name:
        # Build ffmpeg command line: overwrite, resample to mono 16 kHz
        tmp_wav = os.path.join(tmpdir_name, base + '.wav')
        args = ['ffmpeg', '-y', '-i', file, '-ar', '16000', '-ac', '1']

        if start_sec != 0:
            args += ['-ss', '%f' % start_sec]
        if stop_sec != 0:
            args += ['-to', '%f' % stop_sec]

        args += [tmp_wav]

        # Launch ffmpeg and wait for completion
        p = Popen(args, stdout=PIPE, stderr=PIPE)
        output, error = p.communicate()
        if p.returncode != 0:
            # Explicit raise instead of `assert`: asserts are stripped under
            # `python -O`, which would silently hand a missing file to callback
            raise RuntimeError(error)

        return callback(tmp_wav)
|
||||
|
||||
|
||||
def show_image_buffer(buf):
    """Display the image contained in *buf* in an external viewer, then close the buffer."""
    image = Image.open(buf)
    image.show()
    buf.close()
|
||||
|
||||
|
||||
def draw_result(file: str, result: list[ResultFrame]):
    """
    Draw segmentation result

    Renders the waveform of *file* with each segment shaded by its gender
    label (via the module-level pyplot state), and returns the rendered image.

    :param file: Audio file
    :param result: Segmentation result
    :return: Result image in bytes (please close it after use)
    """
    def wav_callback(wavfile: str):
        # Load samples and build a matching time axis in seconds
        sample_rate, audio = scipy.io.wavfile.read(wavfile)
        _time = np.linspace(0, len(audio) / sample_rate, num=len(audio))

        # Uses pyplot's implicit current figure/axes; cleaned up at the end
        fig: Figure = plt.gcf()
        ax: Axes = plt.gca()

        # Plot audio waveform (white, so the colored spans below show through)
        plt.plot(_time, audio, color='white')

        # Set size
        # fig.set_dpi(400)
        fig.set_size_inches(18, 6)

        # Cutoff frequency so that the plot looks centered
        cutoff = min(abs(min(audio)), abs(max(audio)))
        ax.set_ylim([-cutoff, cutoff])
        ax.set_xlim([result[0].start, result[-1].end])

        # Draw segmentation areas; gray for any label other than female/male
        colors = {'female': '#F5A9B8', 'male': '#5BCEFA', 'default': 'gray'}
        for r in result:
            color = colors[r.gender] if r.gender in colors else colors['default']
            # end - 0.01 leaves a thin gap between adjacent spans
            ax.axvspan(r.start, r.end - 0.01, alpha=.5, color=color)

        # Savefig to bytes, then reset pyplot's global state for the next call
        buf = io.BytesIO()
        plt.axis('off')
        plt.savefig(buf, bbox_inches='tight', pad_inches=0, transparent=False)
        buf.seek(0)
        plt.clf()
        plt.close()
        return buf

    return to_wav(file, wav_callback)
|
||||
|
||||
|
||||
def get_result_percentages(result: list[ResultFrame]) -> tuple[float, float, float, float]:
    """
    Compute per-gender speaking-time ratios from segmentation frames.

    :param result: Segmentation frames (only .gender, .start, .end are read)
    :return: %female, %male, %other, %female-vs-female+male
    """
    # Count total and categorical durations
    total_dur = 0.0
    durations: dict[str, float] = {f.gender: 0 for f in result}
    for f in result:
        dur = f.end - f.start
        durations[f.gender] += dur
        total_dur += dur

    # Guard against zero total duration (empty input, or frames that all have
    # zero length) — previously this raised ZeroDivisionError for the latter.
    if total_dur == 0:
        return 0, 0, 1, 0

    # Convert durations to ratios
    for d in durations:
        durations[d] /= total_dur

    # Return results
    f = durations.get('female', 0)
    m = durations.get('male', 0)

    # Female share among voiced (female + male) time only
    fm_total = f + m
    pf = 0 if fm_total == 0 else f / fm_total

    return f, m, 1 - f - m, pf
|
||||
|
||||
|
||||
def test():
    """Manual smoke test / benchmark scaffold — run this module directly.

    Requires a local fixture file '../test.flac'; most of the body is
    commented-out benchmarking code kept for reference.
    """
    # results: BatchResults = BatchResults(
    #     [Result([ResultFrame('female', 0.0, 10.48), ResultFrame('male', 10.48, 12.780000000000001)],
    #             '../test.csv')],
    #     1.7032792568206787, 1.7032792568206787, 1,
    #     [('../test.csv', 0)])

    warnings.filterwarnings("ignore")
    audio_file = '../test.flac'

    # Warmup run
    results = segment(audio_file)
    print(results)

    # # Actual run
    # results = process(seg, ['../test.flac'])
    # print(results)

    # Benchmark
    # iterations = 60
    # total_time = 0
    # audio_len = float(subprocess.getoutput(f'ffprobe -i {audio_file} -show_entries format=duration -v quiet -of csv="p=0"'))
    # print(f'Audio length: {audio_len}')
    #
    # for i in range(iterations):
    #     results = process(seg, [audio_file])
    #     total_time += results.time_full
    #
    # time_per_second = total_time / iterations / audio_len
    # print(f'Benchmark result: {total_time}s / {iterations} iterations = {time_per_second} seconds of processing per second in audio')
    # print(f'Score: {1 / time_per_second}')

    # Draw results
    # with draw_result(audio_file, results.results[0]) as buf:
    #     show_image_buffer(buf)
    # print(get_result_percentages(results.results[0]))
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Run the manual smoke test when executed directly
    # (removed a redundant trailing `pass`)
    test()
|
||||
@@ -0,0 +1,49 @@
|
||||
def ansi_rgb(r: int, g: int, b: int, foreground: bool = True) -> str:
    """
    Build a 24-bit ANSI color escape code from an RGB triple.

    :param r: Red component (0-255)
    :param g: Green component (0-255)
    :param b: Blue component (0-255)
    :param foreground: Whether the color applies to the foreground
    :return: Escape code
    """
    if foreground:
        layer = '38'
    else:
        layer = '48'
    return f'\033[{layer};2;{r};{g};{b}m'
|
||||
|
||||
|
||||
def color(msg: str) -> str:
    """
    Replace extended minecraft color codes in string

    Supports the classic &0-&f codes, &r (reset), &n (newline), and RGB codes
    of the form &gf(#rrggbb) / &gf(r, g, b) for foreground and &gb(...) for
    background.

    :param msg: Message with minecraft color codes
    :return: Message with escape codes
    """
    replacements = ["&0/\033[0;30m", "&1/\033[0;34m", "&2/\033[0;32m", "&3/\033[0;36m", "&4/\033[0;31m", "&5/\033[0;35m", "&6/\033[0;33m", "&7/\033[0;37m", "&8/\033[1;30m", "&9/\033[1;34m", "&a/\033[1;32m", "&b/\033[1;36m", "&c/\033[1;31m", "&d/\033[1;35m", "&e/\033[1;33m", "&f/\033[1;37m", "&r/\033[0m", "&n/\n"]
    for rep in replacements:
        msg = msg.replace(rep[:2], rep[3:])

    # Expand RGB codes; each iteration consumes the leftmost remaining one
    while '&gf(' in msg or '&gb(' in msg:
        i = msg.index('&gf(') if '&gf(' in msg else msg.index('&gb(')
        end = msg.index(')', i)
        code = msg[i + 4:end]
        fore = msg[i + 2] == 'f'

        if code.startswith('#'):
            # Hex form '#rrggbb' -> three component ints
            # (renamed loop var: the original's `i` shadowed the outer index)
            rgb = tuple(int(code.lstrip('#')[j:j + 2], 16) for j in (0, 2, 4))
        else:
            # Decimal form: accept ',', ';' or whitespace as separators.
            # split() with no argument also collapses repeated spaces — the
            # original chained a no-op replace(' ', ' ') and then split(' '),
            # which produced empty tokens (and int('') crashes) on '1  2  3'.
            code = code.replace(',', ' ').replace(';', ' ')
            rgb = tuple(int(c) for c in code.split())

        msg = msg[:i] + ansi_rgb(*rgb, foreground=fore) + msg[end + 1:]

    return msg
|
||||
|
||||
|
||||
def printc(msg: str):
    """
    Print with color

    Appends a reset code so styling never leaks past this line.

    :param msg: Message with minecraft color codes
    """
    rendered = color(msg + '&r')
    print(rendered)
|
||||
Reference in New Issue
Block a user