added testsuite

Jan Buethe 2023-07-22 13:10:54 -07:00
parent 0e5c103d1a
commit ba44bac435
No known key found for this signature in database
GPG key ID: 9E32027A35B36314
11 changed files with 673 additions and 0 deletions

View file

@@ -0,0 +1,46 @@
# lpcnet-testsuite
## setup
The test script is written for Linux only. It requires sox to be installed and available in the path.
Setup is done as usual via
```
pip install -r requirements.txt
```
The test script run_warpq_test.py requires a setup file in YAML format, which specifies, as a list
of shell commands, how to generate a wave file OUTPUT from a wave file INPUT resampled to the
specified sampling rate. This makes it easy to test other neural vocoders with it as well. Two
examples are given in the examples folder. The placeholders INPUT and OUTPUT are replaced using
the string.format(INPUT=input, OUTPUT=output) method (see the sketch after the example below).
Here is one example:
```
test: "LPCNet reference test"
processing:
- "sox {INPUT} {INPUT}.raw"
- "/local/code/LPCNet/lpcnet_demo -features {INPUT}.raw {INPUT}.features.f32"
- "/local/code/LPCNet/lpcnet_demo -synthesis {INPUT}.features.f32 {INPUT}.decoded.raw"
- "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT}"
```
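Internally, each command line is expanded with Python's str.format before it is passed to the
shell; commands containing a {PLCFILE} placeholder (see the PLC example) are expanded the same
way. A minimal sketch of the expansion, with illustrative file names:
```
command = "sox {INPUT} {INPUT}.raw"
print(command.format(INPUT="item1.wav", OUTPUT="item1.output.wav"))
# prints: sox item1.wav item1.wav.raw
```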
The structure of the output folder is as follows:
```
output_folder
+-- html
    +-- <metric>
        +-- index.html
        +-- items
+-- processing
+-- setup.yml
+-- stats_<metric>.txt
+-- scores_<metric>.txt
```
scores_<metric>.txt contains the scores for the given metric, sorted from worst to best
stats_<metric>.txt contains mean values over all items, over the 20 best items and over the 20 worst items
setup.yml contains all information needed to repeat the run
html contains, for each metric, a self-contained website displaying the 20 best, 20 median and 20 worst items
processing contains the intermediate processing output
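A typical invocation might look as follows (INPUT_FOLDER and OUTPUT_FOLDER are placeholders; run
python run_warpq_test.py --help for the full option list):
```
python run_warpq_test.py setup.yml INPUT_FOLDER OUTPUT_FOLDER --num-testitems 100 --metrics warpq,pesq
```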

View file

@@ -0,0 +1,6 @@
test: "LPCNet reference test"
processing:
- "sox {INPUT} {INPUT}.raw"
- "/local/code/LPCNet/lpcnet_demo -features {INPUT}.raw {INPUT}.features.f32"
- "/local/code/LPCNet/lpcnet_demo -synthesis {INPUT}.features.f32 {INPUT}.decoded.raw"
- "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT} trim 0.015"

View file

@@ -0,0 +1,5 @@
test: "LPCNet reference test"
processing:
- "sox {INPUT} {INPUT}.raw"
- "/local/code/LPCNet/lpcnet_demo -plc_file causal {PLCFILE} {INPUT}.raw {INPUT}.decoded.raw"
- "sox -r 16000 -L -e signed-integer -b 16 -c 1 {INPUT}.decoded.raw {OUTPUT}"

View file

@@ -0,0 +1,5 @@
test: "no noise test"
processing:
- "sox {INPUT} {INPUT}.raw"
- "/home/ubuntu/bin/lpcnet_dump_data_v2 -test {INPUT}.raw {INPUT}.features.f32"
- "/home/ubuntu/opt/miniconda3/envs/torch/bin/python /local/code/lpcnext/test_lpcnet.py {INPUT}.features.f32 /local/experiments/noise_augmentation/output/lpcnet_384_2/checkpoints/checkpoint_epoch_20.pth {OUTPUT}"

View file

@@ -0,0 +1,10 @@
scipy
librosa
numpy
scikit-image
pyvad
speechpy
soundfile
pyyaml
pesq
AMFM_decompy

View file

@@ -0,0 +1,353 @@
import os
import sys
import multiprocessing
import random
import subprocess
import argparse
import shutil
import yaml
from utils.files import get_wave_file_list
from utils.warpq import compute_WAPRQ
from utils.pesq import compute_PESQ
from utils.pitch import compute_pitch_error
parser = argparse.ArgumentParser()
parser.add_argument('setup', type=str, help='setup yaml specifying end to end processing with model under test')
parser.add_argument('input_folder', type=str, help='input folder path')
parser.add_argument('output_folder', type=str, help='output folder path')
parser.add_argument('--num-testitems', type=int, help="number of testitems to be processed (default 100)", default=100)
parser.add_argument('--seed', type=int, help='seed for random item selection', default=None)
parser.add_argument('--fs', type=int, help="sampling rate at which input is presented as wave file (defaults to 16000)", default=16000)
parser.add_argument('--num-workers', type=int, help="number of subprocesses to be used (default=4)", default=4)
parser.add_argument('--plc-suffix', type=str, default="_is_lost.txt", help="suffix of plc error pattern file: only relevant if command chain uses PLCFILE (default=_is_lost.txt)")
parser.add_argument('--metrics', type=str, default='warpq', help='comma separated list of metrics, supported: warpq, pesq, pitch_error, voicing_error (default: warpq)')
def check_for_sox_in_path():
r = subprocess.run("sox -h", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return r.returncode == 0
def run_save_sh(command, verbose=False):
if verbose:
print(f"[run_save_sh] running command {command}...")
r = subprocess.run(command, shell=True)
if r.returncode != 0:
raise RuntimeError(f"command '{command}' failed with exit code {r.returncode}")
def run_processing_chain(input_path, output_path, model_commands, fs, metrics={'warpq'}, plc_suffix="_is_lost.txt", verbose=False):
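    # resamples the input to fs, runs the model commands on it, and scores the model
    # output against the reference input; returns (output_path, {metric: score})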
# prepare model input
model_input = output_path + ".resamp.wav"
run_save_sh(f"sox {input_path} -r {fs} {model_input}", verbose=verbose)
plcfile = os.path.splitext(input_path)[0] + plc_suffix
if os.path.isfile(plcfile):
run_save_sh(f"cp {plcfile} {os.path.dirname(output_path)}")
# generate model output
for command in model_commands:
run_save_sh(command.format(INPUT=model_input, OUTPUT=output_path, PLCFILE=plcfile), verbose=verbose)
scores = dict()
cache = dict()
for metric in metrics:
if metric == 'warpq':
# run warpq
score = compute_WAPRQ(input_path, output_path, sr=fs)
elif metric == 'pesq':
# run pesq
score = compute_PESQ(input_path, output_path, fs=fs)
elif metric == 'pitch_error':
if metric in cache:
score = cache[metric]
else:
rval = compute_pitch_error(input_path, output_path, fs=fs)
score = rval[metric]
cache['voicing_error'] = rval['voicing_error']
elif metric == 'voicing_error':
if metric in cache:
score = cache[metric]
else:
rval = compute_pitch_error(input_path, output_path, fs=fs)
score = rval[metric]
cache['pitch_error'] = rval['pitch_error']
else:
            raise ValueError(f'error: unknown metric {metric}')
scores[metric] = score
return (output_path, scores)
def get_output_path(root_folder, input, output_folder):
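    # mirrors the input folder structure under output_folder/processing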
input_relpath = os.path.relpath(input, root_folder)
os.makedirs(os.path.join(output_folder, 'processing', os.path.dirname(input_relpath)), exist_ok=True)
output_path = os.path.join(output_folder, 'processing', input_relpath + '.output.wav')
return output_path
def add_audio_table(f, html_folder, results, title, metric):
item_folder = os.path.join(html_folder, 'items')
os.makedirs(item_folder, exist_ok=True)
# table with results
f.write(f"""
<div>
<h2> {title} </h2>
<table>
<tr>
<th> Rank </th>
<th> Name </th>
<th> {metric.upper()} </th>
<th> Audio (out) </th>
<th> Audio (orig) </th>
</tr>
""")
for i, r in enumerate(results):
item, score = r
item_name = os.path.basename(item)
new_item_path = os.path.join(item_folder, item_name)
shutil.copyfile(item, new_item_path)
shutil.copyfile(item + '.resamp.wav', os.path.join(item_folder, item_name + '.orig.wav'))
f.write(f"""
<tr>
<td> {i + 1} </td>
<td> {item_name.split('.')[0]} </td>
<td> {score:.3f} </td>
<td>
<audio controls>
<source src="items/{item_name}">
</audio>
</td>
<td>
<audio controls>
<source src="items/{item_name + '.orig.wav'}">
</audio>
</td>
</tr>
""")
# footer
f.write("""
</table>
</div>
""")
def create_html(output_folder, results, title, metric):
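    # writes a self-contained index.html with audio tables for the best, median,
    # and worst items of the run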
html_folder = output_folder
items_folder = os.path.join(html_folder, 'items')
os.makedirs(html_folder, exist_ok=True)
os.makedirs(items_folder, exist_ok=True)
with open(os.path.join(html_folder, 'index.html'), 'w') as f:
# header and title
f.write(f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>{title}</title>
<style>
article {{
align-items: flex-start;
display: flex;
flex-wrap: wrap;
gap: 4em;
}}
html {{
box-sizing: border-box;
font-family: "Amazon Ember", "Source Sans", "Verdana", "Calibri", sans-serif;
padding: 2em;
}}
td {{
padding: 3px 7px;
text-align: center;
}}
td:first-child {{
text-align: end;
}}
th {{
background: #ff9900;
color: #000;
font-size: 1.2em;
padding: 7px 7px;
}}
</style>
</head>
<body>
<h1>{title}</h1>
<article>
""")
# top 20
add_audio_table(f, html_folder, results[:-21: -1], "Top 20", metric)
# 20 around median
N = len(results) // 2
add_audio_table(f, html_folder, results[N + 10 : N - 10: -1], "Median 20", metric)
# flop 20
add_audio_table(f, html_folder, results[:20], "Flop 20", metric)
# footer
f.write("""
</article>
</body>
</html>
""")
metric_sorting_signs = {
'warpq' : -1,
'pesq' : 1,
'pitch_error' : -1,
'voicing_error' : -1
}
def is_valid_result(data, metrics):
if not isinstance(data, dict):
return False
for metric in metrics:
if not metric in data:
return False
return True
def evaluate_results(output_folder, results, metric):
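    # sorts results by quality, writes per-metric score and stats files,
    # and renders the html overview page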
results = sorted(results, key=lambda x : metric_sorting_signs[metric] * x[1])
    with open(os.path.join(output_folder, f'scores_{metric}.txt'), 'w') as f:
        for result in results:
            f.write(f"{os.path.relpath(result[0], output_folder)} {result[1]}\n")
# some statistics
mean = sum([r[1] for r in results]) / len(results)
top_mean = sum([r[1] for r in results[-20:]]) / 20
bottom_mean = sum([r[1] for r in results[:20]]) / 20
    with open(os.path.join(output_folder, f'stats_{metric}.txt'), 'w') as f:
f.write(f"mean score: {mean}\n")
f.write(f"bottom mean score: {bottom_mean}\n")
f.write(f"top mean score: {top_mean}\n")
print(f"\nmean score: {mean}")
print(f"bottom mean score: {bottom_mean}")
print(f"top mean score: {top_mean}\n")
# create output html
create_html(os.path.join(output_folder, 'html', metric), results, setup['test'], metric)
if __name__ == "__main__":
args = parser.parse_args()
# check for sox
if not check_for_sox_in_path():
raise RuntimeError("script requires sox")
# prepare output folder
if os.path.exists(args.output_folder):
print("warning: output folder exists")
reply = input('continue? (y/n): ')
while reply not in {'y', 'n'}:
reply = input('continue? (y/n): ')
if reply == 'n':
            sys.exit(0)
else:
        # start with a clean slate
shutil.rmtree(args.output_folder)
os.makedirs(args.output_folder, exist_ok=True)
# extract metrics
metrics = args.metrics.split(",")
for metric in metrics:
        if metric not in metric_sorting_signs:
            parser.error(f"unknown metric {metric}")
# read setup
print(f"loading {args.setup}...")
with open(args.setup, "r") as f:
setup = yaml.load(f.read(), yaml.FullLoader)
model_commands = setup['processing']
print("\nfound the following model commands:")
for command in model_commands:
print(command.format(INPUT='input.wav', OUTPUT='output.wav', PLCFILE='input_is_lost.txt'))
# store setup to output folder
setup['input'] = os.path.abspath(args.input_folder)
setup['output'] = os.path.abspath(args.output_folder)
setup['seed'] = args.seed
with open(os.path.join(args.output_folder, 'setup.yml'), 'w') as f:
yaml.dump(setup, f)
# get input
print(f"\nCollecting audio files from {args.input_folder}...")
file_list = get_wave_file_list(args.input_folder, check_for_features=False)
print(f"...{len(file_list)} files found\n")
# sample from file list
file_list = sorted(file_list)
random.seed(args.seed)
random.shuffle(file_list)
num_testitems = min(args.num_testitems, len(file_list))
file_list = file_list[:num_testitems]
print(f"\nlaunching test on {num_testitems} items...")
    # helper function for parallel processing (requires Linux: a locally defined
    # function only works with multiprocessing's fork start method)
def func(input_path):
output_path = get_output_path(args.input_folder, input_path, args.output_folder)
try:
rval = run_processing_chain(input_path, output_path, model_commands, args.fs, metrics=metrics, plc_suffix=args.plc_suffix, verbose=False)
        except Exception:
rval = (input_path, -1)
return rval
with multiprocessing.Pool(args.num_workers) as p:
results = p.map(func, file_list)
results_dict = dict()
for name, values in results:
if is_valid_result(values, metrics):
results_dict[name] = values
print(results_dict)
# evaluating results
num_failures = num_testitems - len(results_dict)
print(f"\nprocessing of {num_failures} items failed\n")
for metric in metrics:
print(metric)
evaluate_results(
args.output_folder,
[(name, value[metric]) for name, value in results_dict.items()],
metric
)

View file

View file

@@ -0,0 +1,25 @@
import os
def get_wave_file_list(parent_folder, extensions=[".wav", ".flac"], check_for_features=False):
""" traverses subfolders of parent_folder in search for files that match the given extension """
file_list = []
for root, dirs, files in os.walk(parent_folder, topdown=True):
for file in files:
stem, ext = os.path.splitext(file)
#check for extension
if not ext in extensions:
continue
# check if feature file exists
if check_for_features and not os.path.isfile(os.path.join(root, stem + "_features.f32")):
continue
file_list.append(os.path.join(root, file))
return file_list

View file

@@ -0,0 +1,14 @@
import pesq
import librosa
def compute_PESQ(ref, test, fs=16000):
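    # loads both files at the given rate and computes PESQ with the pesq package;
    # the mode argument is left at the package default (wideband in recent versions)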
if not ref.endswith('.wav') or not test.endswith('.wav'):
raise ValueError('error: expecting .wav as file extension')
ref_item, _ = librosa.load(ref, sr=fs)
test_item, _ = librosa.load(test, sr=fs)
score = pesq.pesq(fs, ref_item, test_item)
return score

View file

@@ -0,0 +1,32 @@
import numpy as np
from scipy.io import wavfile
import amfm_decompy.pYAAPT as pYAAPT
import amfm_decompy.basic_tools as basic
def get_voicing_info(x, sr=16000):
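    # runs YAAPT pitch tracking with 20 ms frames and returns per-frame
    # pitch values and voiced/unvoiced flags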
signal = basic.SignalObj(x, sr)
pitch = pYAAPT.yaapt(signal, **{'frame_length' : 20.0, 'tda_frame_length' : 20.0})
pitch_values = pitch.samp_values
voiced_flags = pitch.vuv.astype('float')
return pitch_values, voiced_flags
def compute_pitch_error(ref_path, test_path, fs=16000):
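    # compares YAAPT pitch tracks of reference and test signal; returns the mean
    # absolute pitch error and the fraction of frames with differing voicing decisions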
fs_orig, x_orig = wavfile.read(ref_path)
fs_test, x_test = wavfile.read(test_path)
min_length = min(len(x_orig), len(x_test))
x_orig = x_orig[:min_length]
x_test = x_test[:min_length]
assert fs_orig == fs_test == fs
    pitch_contour_orig, voicing_orig = get_voicing_info(x_orig.astype(np.float32), sr=fs)
    pitch_contour_test, voicing_test = get_voicing_info(x_test.astype(np.float32), sr=fs)
return {
'pitch_error' : np.mean(np.abs(pitch_contour_orig - pitch_contour_test)).item(),
'voicing_error' : np.sum(np.abs(voicing_orig - voicing_test)).item() / len(voicing_orig)
}

View file

@@ -0,0 +1,177 @@
"""
WARP-Q: Quality Prediction For Generative Neural Speech Codecs
This is the WARP-Q version used in the ICASSP 2021 Paper:
W. A. Jassim, J. Skoglund, M. Chinen, and A. Hines, WARP-Q: Quality prediction
for generative neural speech codecs, paper accepted for presentation at the 2021 IEEE
International Conference on Acoustics, Speech and Signal Processing (ICASSP 2021).
Date of acceptance: 30 Jan 2021. Preprint: https://arxiv.org/pdf/2102.10449
Run using python 3.x and include these package dependencies in your virtual environment:
- pandas
- librosa
- numpy
- pyvad
- skimage
- speechpy
- soundfile
- scipy (optional)
- seaborn (optional, for plotting only)
- multiprocessing (optional, for parallel computing mode only)
- joblib (optional, for parallel computing mode only)
Input:
- The main_test function calls a csv file that contains paths of audio files.
- The csv file consists of four columns:
- Ref_Wave: reference speech
- Test_Wave: test speech
- MOS: subjective score (optional, for plotting only)
- Codec: type of speech codec for the test speech (optional, for plotting only)
Output:
- Code will compute the WARP-Q quality scores between Ref_Wave and Test_Wave,
and will store the obtained results in a new column in the same csv file.
Releases:
Warning: While this code has been tested and commented, giving invalid input
files may cause unexpected results that will not be caught by robust exception
handling or validation checking. It will just fail or give you the wrong answer.
In this simple and basic demo, we compute WARP-Q scores for 8 speech samples only.
More data should be provided to have better score distributions.
(c) Dr Wissam Jassim
University College Dublin
wissam.a.jassim@gmail.com
wissam.jassim@ucd.ie
November 28, 2020
"""
# Load libraries
import librosa, librosa.core, librosa.display
import numpy as np
from pyvad import vad
from skimage.util.shape import view_as_windows
import speechpy
import soundfile as sf
################################ WARP-Q #######################################
def compute_WAPRQ(ref_path,test_path,sr=16000,n_mfcc=12,fmax=5000,patch_size=0.4,
sigma=np.array([[1,1],[3,2],[1,3]])):
# Inputs:
# refPath: path of reference speech
    # disPath: path of degraded speech
# sr: sampling frequency, Hz
# n_mfcc: number of MFCCs
# fmax: cutoff frequency
# patch_size: size of each patch in s
    # sigma: step size condition for DTW
# Output:
# WARP-Q quality score between refPath and disPath
####################### Load speech files #################################
# Load Ref Speech
if ref_path[-4:] == '.wav':
speech_Ref, sr_Ref = librosa.load(ref_path,sr=sr)
else:
if ref_path[-4:] == '.SRC': #For ITUT database if applicable
speech_Ref, sr_Ref = sf.read(ref_path, format='RAW', channels=1, samplerate=16000,
subtype='PCM_16', endian='LITTLE')
if sr_Ref != sr:
                speech_Ref = librosa.resample(speech_Ref, orig_sr=sr_Ref, target_sr=sr)
sr_Ref = sr
# Load Coded Speech
if test_path[-4:] == '.wav':
speech_Coded, sr_Coded = librosa.load(test_path,sr=sr)
else:
if test_path[-4:] == '.OUT': #For ITUT database if applicable
speech_Coded, sr_Coded = sf.read(test_path, format='RAW', channels=1, samplerate=16000,
subtype='PCM_16', endian='LITTLE')
if sr_Coded != sr:
                speech_Coded = librosa.resample(speech_Coded, orig_sr=sr_Coded, target_sr=sr)
sr_Coded = sr
if sr_Ref != sr_Coded:
raise ValueError("Reference and degraded signals should have same sampling rate!")
    # Make sure amplitudes are in the range [-1, 1]; otherwise clip to [-1, 1]
    # after resampling (if applicable). We experienced this issue with the TCD-VOIP database only
speech_Ref[speech_Ref>1]=1.0
speech_Ref[speech_Ref<-1]=-1.0
speech_Coded[speech_Coded>1]=1.0
speech_Coded[speech_Coded<-1]=-1.0
###########################################################################
win_length = int(0.032*sr) #32 ms frame
    hop_length = int(0.004*sr) #4 ms hop
#hop_length = int(0.016*sr)
n_fft = 2*win_length
lifter = 3
# DTW Parameters
Metric = 'euclidean'
# VAD Parameters
hop_size_vad = 30
sr_vad = sr
    aggressive = 0
    # VAD for Ref speech
    vact1 = vad(speech_Ref, sr, fs_vad = sr_vad, hop_length = hop_size_vad, vad_mode=aggressive)
    speech_Ref_vad = speech_Ref[vact1==1]
    # VAD for Coded speech
    vact2 = vad(speech_Coded, sr, fs_vad = sr_vad, hop_length = hop_size_vad, vad_mode=aggressive)
speech_Coded_vad = speech_Coded[vact2==1]
# Compute MFCC features for the two signals
mfcc_Ref = librosa.feature.mfcc(y=speech_Ref_vad,sr=sr,n_mfcc=n_mfcc,fmax=fmax,
n_fft=n_fft,win_length=win_length,hop_length=hop_length,lifter=lifter)
mfcc_Coded = librosa.feature.mfcc(y=speech_Coded_vad,sr=sr,n_mfcc=n_mfcc,fmax=fmax,
n_fft=n_fft,win_length=win_length,hop_length=hop_length,lifter=lifter)
# Feature Normalisation using CMVNW method
mfcc_Ref = speechpy.processing.cmvnw(mfcc_Ref.T,win_size=201,variance_normalization=True).T
mfcc_Coded = speechpy.processing.cmvnw(mfcc_Coded.T,win_size=201,variance_normalization=True).T
    # Divide MFCC features of Coded speech into patches
cols = int(patch_size/(hop_length/sr))
window_shape = (np.size(mfcc_Ref,0), cols)
step = int(cols/2)
mfcc_Coded_patch = view_as_windows(mfcc_Coded, window_shape, step)
Acc =[]
band_rad = 0.25
weights_mul=np.array([1, 1, 1])
    # Compute alignment cost between each patch and Ref MFCC
for i in range(mfcc_Coded_patch.shape[1]):
patch = mfcc_Coded_patch[0][i]
D, P = librosa.sequence.dtw(X=patch, Y=mfcc_Ref, metric=Metric,
step_sizes_sigma=sigma, weights_mul=weights_mul,
band_rad=band_rad, subseq=True, backtrack=True)
P_librosa = P[::-1, :]
b_ast = P_librosa[-1, 1]
Acc.append(D[-1, b_ast] / D.shape[0])
# Final score
return np.median(Acc).item()