Finished LAS Paper
This commit is contained in:
@ -0,0 +1,4 @@
|
||||
# __init__.py
|
||||
# Names pulled in by a star-import of this package; they appear to be the
# package's submodules (two of them are imported relatively elsewhere).
__all__ = ['noise_suppressor', 'noise_estimator', 'suppression_gain']
|
||||
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,177 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
'''
|
||||
Constants
|
||||
'''
|
||||
# ---------------------------------------------------------------------------
# Tuning constants of the noise estimator
# ---------------------------------------------------------------------------

# 1) Short-time Fourier analysis
Fs_ref = 8e3    # reference sampling frequency
M_ref = 512     # analysis window size
Mo_ref = 352    # samples shared by consecutive frames (a 0.75*M_ref
                # alternative was left disabled by the author)

# 2) Noise spectrum estimate
w = 1               # frequency smoothing window has 2*w+1 taps
alpha_s_ref = 0.9   # recursive averaging factor of the smoothing operation
Nwin = 8            # resolution of the local minima search (columns of SW/SWt)
Vwin = 15           # frames counted before the minima windows are shifted
delta_s = 1.67      # local minimum factor applied to the smoothed spectrum
Bmin = 1.66         # factor scaling the tracked minimum in the threshold tests
delta_y = 4.6       # local minimum factor applied to the instantaneous power
delta_yt = 3        # upper threshold for gamma_mint
alpha_d_ref = 0.85  # recursive averaging factor for the noise

# 3) A priori probability for signal absence
alpha_xi_ref = 0.7  # recursive averaging factor

# 4) "Decision-directed" a priori SNR estimate
alpha_eta_ref = 0.95    # recursive averaging factor
eta_min_dB = -18        # lower limit constraint, in dB

# 5) Flags
nonstat = 'medium'  # non-stationarity; 'low' and 'high' get special handling

# Working values: the reference settings are taken over unchanged.
Fs = Fs_ref
M, Mo = int(M_ref), int(Mo_ref)
Mno = int(M - Mo)
alpha_s, alpha_d = alpha_s_ref, alpha_d_ref
alpha_eta, alpha_xi = alpha_eta_ref, alpha_xi_ref

alpha_d_long = 0.99
eta_min = 10**(eta_min_dB/10)   # lower limit converted from dB to linear

# Frequency smoothing kernel. A normalised Hanning window of length 2*w+1 was
# disabled by the author in favour of this pass-through kernel.
b = np.array([0, 1, 0])

M21 = M // 2 + 1    # number of bins kept from the one-sided spectrum
|
||||
|
||||
class NoiseEstimator(object):
    """Common interface of the noise estimators defined in this module."""

    def update(self, features):
        """Consume the features of one frame.

        The base implementation ignores ``features`` and returns ``None``;
        subclasses are expected to override it.
        """
        return None
|
||||
|
||||
class ImcraNoiseEstimator(NoiseEstimator):
    """Noise power spectrum estimator driven by tracked spectral minima.

    The noise estimate is a recursive average of the input power spectrum
    whose per-bin smoothing factor grows with an estimated speech presence
    probability. That probability comes from comparing the current frame
    against running minima of two smoothed spectra.

    NOTE(review): the class name suggests Cohen's IMCRA algorithm; the
    constants and control flow have not been checked against the paper here.

    All state arrays own their data. Binding several attributes to one array
    and then writing through one of them would silently change the others,
    so every hand-over below is an explicit copy or a freshly computed array.
    """

    # Frames (counted from 1) below this index are treated as start-up: the
    # minima trackers then simply follow the smoothed spectra.
    _INIT_FRAMES = 15

    def __init__(self):
        self.l = 0                          # number of frames seen so far
        self.l_mod_lswitch = 0              # frames since the last window shift
        self.S = np.zeros(M21)              # spectrum smoothed over frequency and time
        self.St = np.zeros(M21)             # second smoothed spectrum (masked input)
        self.Sy = np.zeros(M21)             # power spectrum of the first frame
        self.Smin = np.zeros(M21)           # minimum of S over the stored windows
        self.Smint = np.zeros(M21)          # minimum of St over the stored windows
        self.SMact = np.zeros(M21)          # running minimum of S in the current window
        self.SMactt = np.zeros(M21)         # running minimum of St in the current window
        self.SW = np.zeros((M21, Nwin))     # per-window minima of S
        self.SWt = np.zeros((M21, Nwin))    # per-window minima of St
        self.lambda_d = np.zeros(M21)       # noise estimate returned by update()
        self.lambda_dav = np.zeros(M21)     # recursive noise average
        self.lambda_dav_long = np.zeros(M21)    # slower recursive noise average

    def _masked_spectrum(self, Ya2):
        """Return a copy of ``St`` refreshed in the bins that pass the minima test."""
        # 1.0 where both the instantaneous and the smoothed power stay below
        # their thresholds relative to the tracked minimum, else 0.0.
        I_f = ((Ya2 < delta_y * Bmin * self.Smin)
               & (self.S < delta_s * Bmin * self.Smin)).astype(float)
        conv_I = np.convolve(b, I_f)[w:M21 + w]

        Sft = self.St.copy()    # must not write through to self.St
        hit = conv_I > 0
        if hit.any():
            if w:
                conv_Y = np.convolve(b, I_f * Ya2)[w:M21 + w]
                Sft[hit] = conv_Y[hit] / conv_I[hit]
            else:
                Sft[hit] = Ya2[hit]
        return Sft

    def update(self, features):
        """Process one frame and return the updated noise power estimate.

        Args:
            features: mapping with the keys
                ``'signal_power'`` - power spectrum of the frame (M21 bins);
                ``'eta_2term'`` - term fed into the a priori SNR estimate.

        Returns:
            ``self.lambda_d``, the array stored on the instance (not a copy).
        """
        Ya2 = np.asarray(features['signal_power'], dtype=float)
        self.eta_2term = features['eta_2term']

        self.l += 1
        starting_up = self.l < self._INIT_FRAMES

        gamma = Ya2 / np.maximum(self.lambda_d, 1e-10)     # a posteriori SNR
        eta = alpha_eta * self.eta_2term + (1 - alpha_eta) * np.maximum(gamma - 1, 0)
        eta = np.maximum(eta, eta_min)                     # a priori SNR
        v = gamma * eta / (1 + eta)

        # Smooth over frequency, then over time.
        Sf = np.convolve(b, Ya2)[w:M21 + w]
        if self.l == 1:
            self.Sy = Ya2.copy()
            self.S = Sf.copy()
            self.St = Sf.copy()
            self.lambda_dav = Ya2.copy()
        else:
            self.S = alpha_s * self.S + (1 - alpha_s) * Sf

        if starting_up:
            self.Smin = self.S.copy()
            self.SMact = self.S.copy()
        else:
            self.Smin = np.minimum(self.Smin, self.S)
            self.SMact = np.minimum(self.SMact, self.S)

        # Local minima search and second smoothing stage.
        if starting_up:
            self.St = self.S.copy()
            self.Smint = self.St.copy()
            self.SMactt = self.St.copy()
        else:
            Sft = self._masked_spectrum(Ya2)
            self.St = alpha_s * self.St + (1 - alpha_s) * Sft
            self.Smint = np.minimum(self.Smint, self.St)
            self.SMactt = np.minimum(self.SMactt, self.St)

        # Speech presence probability per bin.
        tracked_min = self.Smin if nonstat == 'low' else self.Smint
        tracked_min = np.maximum(tracked_min, 1e-10)
        gamma_mint = Ya2 / Bmin / tracked_min
        zetat = self.S / Bmin / tracked_min

        qhat = np.ones(M21)
        phat = np.zeros(M21)
        soft = (gamma_mint > 1) & (gamma_mint < delta_yt) & (zetat < delta_s)
        qhat[soft] = (delta_yt - gamma_mint[soft]) / (delta_yt - 1)
        phat[soft] = 1 / (1 + qhat[soft] / (1 - qhat[soft])
                          * (1 + eta[soft]) * np.exp(-v[soft]))
        # ">=" so that a value sitting exactly on the threshold counts as speech
        # instead of falling between the two cases.
        phat[(gamma_mint >= delta_yt) | (zetat >= delta_s)] = 1

        # Every Vwin frames: shift the windows of stored minima.
        self.l_mod_lswitch += 1
        if self.l_mod_lswitch == Vwin:
            self.l_mod_lswitch = 0
            if self.l == Vwin:
                self.SW = np.repeat(self.S[:, np.newaxis], Nwin, axis=1)
                self.SWt = np.repeat(self.St[:, np.newaxis], Nwin, axis=1)
            else:
                self.SW = np.column_stack((self.SW[:, 1:], self.SMact))
                self.Smin = self.SW.min(axis=1)
                self.SMact = self.S.copy()
                self.SWt = np.column_stack((self.SWt[:, 1:], self.SMactt))
                self.Smint = self.SWt.min(axis=1)
                self.SMactt = self.St.copy()

        # Recursive averaging with a presence-dependent smoothing factor.
        alpha_dt = alpha_d + (1 - alpha_d) * phat
        self.lambda_dav = alpha_dt * self.lambda_dav + (1 - alpha_dt) * Ya2
        if starting_up:
            self.lambda_dav_long = self.lambda_dav.copy()
        else:
            alpha_dt_long = alpha_d_long + (1 - alpha_d_long) * phat
            self.lambda_dav_long = (alpha_dt_long * self.lambda_dav_long
                                    + (1 - alpha_dt_long) * Ya2)

        # Noise spectrum estimate: scaled recursive average.
        scale = 2 if nonstat == 'high' else 1.4685
        self.lambda_d = scale * self.lambda_dav

        return self.lambda_d
|
||||
|
||||
|
@ -0,0 +1,98 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import numpy as np
|
||||
from .noise_estimator import ImcraNoiseEstimator
|
||||
from .suppression_gain import OmlsaGain
|
||||
|
||||
'''
|
||||
Constants
|
||||
'''
|
||||
# 1) Short-time Fourier analysis
Fs_ref = 8e3                # reference sampling frequency
M_ref = 512                 # analysis window size
Mo_ref = 352                # samples shared by consecutive frames (a
                            # 0.75*M_ref alternative was left disabled)
Mno_ref = M_ref - Mo_ref    # new samples per frame: 160

# Samples whose magnitude does not exceed this value count as zero.
zero_thres = 1e-10
|
||||
|
||||
|
||||
'''
|
||||
Class
|
||||
'''
|
||||
class NoiseSuppressor(object):
    """Frame-by-frame noise suppressor: STFT analysis, gain, overlap-add.

    Each call to :meth:`process_frame` takes ``get_frame_size()`` new samples
    and returns the same number of output samples.

    NOTE(review): the Hamming window is applied at analysis and again at
    synthesis without any overlap-add normalisation factor - confirm that
    the resulting output level is intended.
    """

    def __init__(self, sample_rate):
        """Set up buffers, the noise estimator and the gain rule.

        Args:
            sample_rate: stored on the instance and forwarded to the gain
                rule; frame sizes come from the module constants.
        """
        self.sample_rate = sample_rate
        self.frame_size = Mno_ref
        self.overlap_size = Mo_ref
        self.fft_size = M_ref
        self.win = np.hamming(self.fft_size)
        self.in_buffer = np.zeros(self.fft_size)
        self.out_buffer = np.zeros(self.fft_size)
        self.noise_estimator = ImcraNoiseEstimator()
        self.suppression_gain = OmlsaGain(sample_rate, self.fft_size)
        self.fnz_flag = 0   # becomes 1 at the first frame considered non-zero

    def get_frame_size(self):
        """Return the number of new samples expected per frame."""
        return self.frame_size

    def get_fft_size(self):
        """Return the FFT / analysis window length."""
        return self.fft_size

    def stft_analyze(self, audio):
        """Shift ``audio`` into the input buffer and analyse the buffer.

        Args:
            audio: the new samples; must fit the tail of the buffer
                (``fft_size - overlap_size`` values).

        Returns:
            ``(signal_spec, signal_power)``: the full-length FFT of the
            windowed buffer and the squared magnitude of its first
            ``fft_size/2 + 1`` bins. Both are all zeros while the frame is
            considered silent.
        """
        M = self.fft_size
        M21 = int(M/2+1)
        Mno = int(M - self.overlap_size)

        # Drop the oldest Mno samples and append the new ones.
        self.in_buffer[:M-Mno] = self.in_buffer[Mno:M]
        self.in_buffer[M-Mno:M] = audio

        magnitude = np.abs(self.in_buffer)
        if self.fnz_flag == 0:
            # Start-up: wait until the oldest sample in the buffer (index 0,
            # the first sample) is non-zero, i.e. the buffer is full of signal.
            active = bool(magnitude[0] > zero_thres)
        else:
            active = bool(np.any(magnitude > zero_thres))

        if not active:
            # Same dtype as the FFT branch so callers always get complex data.
            return np.zeros(M, dtype=complex), np.zeros(M21)

        self.fnz_flag = 1
        signal_spec = np.fft.fft(self.win * self.in_buffer)
        signal_power = np.abs(signal_spec[:M21])**2
        return signal_spec, signal_power

    def process_frame(self, frame_data):
        """Suppress noise in one frame.

        Args:
            frame_data: the new input samples of this frame.

        Returns:
            Array of ``fft_size - overlap_size`` output samples; all zeros
            until the first non-zero frame has been seen.
        """
        M = self.fft_size
        M21 = int(M/2+1)
        Mno = int(M - self.overlap_size)

        # 0. STFT analysis
        signal_spec, signal_power = self.stft_analyze(frame_data)
        if self.fnz_flag != 1:
            return np.zeros(Mno)

        # 1.-5. Noise estimation (uses the gain rule's a priori SNR term)
        noise_power = self.noise_estimator.update({
            'signal_power': signal_power,
            'eta_2term': self.suppression_gain.get_eta(),
        })

        # 6. Suppression gain
        gain = self.suppression_gain.update({
            'signal_power': signal_power,
            'noise_power': noise_power,
        })

        # 7. STFT synthesis with overlap-add
        X = gain * signal_spec[:M21]
        x = self.win * np.fft.irfft(X)
        self.out_buffer = self.out_buffer + x

        yout = self.out_buffer[:Mno].copy()
        # Shift the output buffer and clear the freed tail.
        self.out_buffer[:M-Mno] = self.out_buffer[Mno:M]
        self.out_buffer[M-Mno:M] = 0.0

        return yout
|
@ -0,0 +1,190 @@
|
||||
import numpy as np
|
||||
from numpy import matlib
|
||||
from scipy.special import expn
|
||||
|
||||
'''
|
||||
Constants
|
||||
'''
|
||||
# 1) Short-time Fourier analysis
Fs_ref = 8e3    # reference sampling frequency
M_ref = 512     # analysis window size
Mo_ref = 352    # samples shared by consecutive frames (a 0.75*M_ref
                # alternative was left disabled by the author)

# 3) A priori probability for signal absence
alpha_xi_ref = 0.7  # recursive averaging factor
w_xi_local = 1      # half-width of the local frequency smoothing window
w_xi_global = 15    # half-width of the global frequency smoothing window
f_u = 10e3          # upper frequency threshold for the global decision
f_l = 50            # lower frequency threshold for the global decision
P_min = 0.005       # lower bound constraint
xi_lu_dB = -5       # upper threshold, local decision
xi_ll_dB = -10      # lower threshold, local decision
xi_gu_dB = -5       # upper threshold, global decision
xi_gl_dB = -10      # lower threshold, global decision
xi_fu_dB = -5       # upper threshold, frame decision
xi_fl_dB = -10      # lower threshold, frame decision
xi_mu_dB = 10       # upper threshold for xi_m
xi_ml_dB = 0        # lower threshold for xi_m
q_max = 0.998       # upper limit constraint

# 4) "Decision-directed" a priori SNR estimate
alpha_eta_ref = 0.95    # recursive averaging factor
eta_min_dB = -18        # lower limit constraint, in dB

# 5) Flags
broad_flag = 1      # broad band flag
tone_flag = 0       # pure tone flag
nonstat = 'medium'  # non-stationarity

# Working values: the reference settings are taken over unchanged.
Fs = Fs_ref
M, Mo = int(M_ref), int(Mo_ref)
Mno = int(M - Mo)
alpha_eta, alpha_xi = alpha_eta_ref, alpha_xi_ref

alpha_d_long = 0.99
eta_min = 10**(eta_min_dB/10)   # lower limit converted from dB to linear
G_f = eta_min**0.5              # gain floor

# Smoothing kernels. Normalised Hanning windows of length 2*w+1 were disabled
# by the author in favour of these literal tables.
b_xi_local = np.array([0, 1, 0])

# Rising half of the symmetric 31-tap global kernel, peak included.
_b_xi_global_rising = [
    0, 0.000728, 0.002882, 0.006366, 0.011029, 0.016667, 0.023033, 0.029849,
    0.036818, 0.043634, 0.050000, 0.055638, 0.060301, 0.063785, 0.065938,
    0.066667,
]
b_xi_global = np.array(_b_xi_global_rising + _b_xi_global_rising[-2::-1])

M21 = M // 2 + 1    # number of bins kept from the one-sided spectrum

# Frequency bin limits. The "+1" suggests 1-based bin numbers (bin 1 = 0 Hz).
k_l = round(f_l/Fs*M+1)                 # lower bin for the global decision
k_u = min(round(f_u/Fs*M+1), M21)       # upper bin, capped at the last bin
k2_local = round(500/Fs*M+1)            # bin at 500 Hz
k3_local = round(3500/Fs*M+1)           # bin at 3500 Hz
|
||||
|
||||
class SuppressionGain(object):
    """Common interface of the suppression gain rules in this module."""

    def update(self, features):
        """Consume the features of one frame.

        The base implementation ignores ``features`` and returns ``None``;
        subclasses are expected to override it and return a gain.
        """
        return None
|
||||
|
||||
class WienerGain(SuppressionGain):
    """Gain rule ``ksi / (1 + ksi)`` computed from the a priori SNR."""

    def update(self, features):
        """Return the gain for one frame.

        Args:
            features: mapping with the key ``'ksi'`` (a priori SNR), like the
                feature dictionaries used by the other ``update`` methods in
                this file. An object exposing a ``ksi`` attribute is still
                accepted for backward compatibility.

        Returns:
            ``ksi / (1 + ksi)``.
        """
        try:
            ksi = features['ksi']
        except (TypeError, KeyError):
            # Not subscriptable, or no such key: fall back to attribute access.
            ksi = features.ksi
        return ksi / (1 + ksi)
|
||||
|
||||
class OmlsaGain(SuppressionGain):
    """Spectral gain weighted by an estimated speech presence probability.

    The gain per bin is ``GH1**PH1 * G_f**(1 - PH1)``: a geometric mix of the
    gain used under speech presence (``GH1``) and the gain floor, weighted by
    the conditional speech presence probability ``PH1``.

    NOTE(review): the class name suggests Cohen's OM-LSA estimator; the
    constants have not been checked against the paper here.
    """

    def __init__(self, sample_rate, fft_size):
        """Create the gain rule.

        Args:
            sample_rate: stored as ``self.fs``.
            fft_size: FFT length; ``fft_size/2 + 1`` bins are processed.
        """
        self.fs = sample_rate
        self.fft_size = fft_size
        self.M21 = int(fft_size/2+1)
        # Sized from this instance's FFT length, not from the module default.
        self.eta_2term = np.ones(self.M21)
        self.xi = np.ones(self.M21)     # recursively averaged a priori SNR
        self.xi_frame = 0               # band average of xi, previous frame
        self.xi_m_dB = 0                # clamped xi_frame in dB, updated while rising

    @staticmethod
    def _to_dB(values):
        """Return ``10*log10(values)`` per element, with -100 for values <= 0."""
        values = np.asarray(values, dtype=float)
        out = np.full(values.shape, -100.0)
        positive = values > 0
        out[positive] = 10 * np.log10(values[positive])
        return out

    @staticmethod
    def _presence_ramp(level_dB, lower_dB, upper_dB):
        """Map dB levels to a likelihood: P_min at/below ``lower_dB``, 1 at/above
        ``upper_dB``, linear in between."""
        P = np.ones(level_dB.shape)
        P[level_dB <= lower_dB] = P_min
        between = (level_dB > lower_dB) & (level_dB < upper_dB)
        P[between] = (P_min + (level_dB[between] - lower_dB)
                      / (upper_dB - lower_dB) * (1 - P_min))
        return P

    def update(self, features):
        """Compute the spectral gain for one frame.

        Args:
            features: mapping with the keys
                ``'signal_power'`` - power spectrum of the frame;
                ``'noise_power'`` - noise power estimate for the same bins.

        Returns:
            Array with one gain value per bin.
        """
        Ya2 = features['signal_power']
        lambda_d = features['noise_power']
        n_bins = self.M21

        gamma = Ya2 / np.maximum(lambda_d, 1e-10)      # a posteriori SNR
        eta = alpha_eta * self.eta_2term + (1 - alpha_eta) * np.maximum(gamma - 1, 0)
        eta = np.maximum(eta, eta_min)                 # a priori SNR
        v = gamma * eta / (1 + eta)

        # A priori probability for signal absence.
        self.xi = alpha_xi * self.xi + (1 - alpha_xi) * eta
        xi_local = np.convolve(self.xi, b_xi_local)[w_xi_local:n_bins + w_xi_local]
        xi_global = np.convolve(self.xi, b_xi_global)[w_xi_global:n_bins + w_xi_global]

        previous_xi_frame = self.xi_frame
        # k_l and k_u are 1-based bin numbers (computed with "+1"); the 0-based
        # slice covering bins k_l..k_u inclusive therefore starts at k_l - 1.
        self.xi_frame = np.mean(self.xi[k_l - 1:k_u])
        dxi_frame = self.xi_frame - previous_xi_frame

        xi_local_dB = self._to_dB(xi_local)
        xi_global_dB = self._to_dB(xi_global)
        if self.xi_frame > 0:
            xi_frame_dB = 10 * np.log10(self.xi_frame)
        else:
            xi_frame_dB = -100

        P_local = self._presence_ramp(xi_local_dB, xi_ll_dB, xi_lu_dB)
        P_global = self._presence_ramp(xi_global_dB, xi_gl_dB, xi_gu_dB)

        # Average probability of speech presence; if it is low, reset P_local
        # between 500 Hz and 3500 Hz (1-based bins k2_local..k3_local).
        m_P_local = np.mean(P_local[2:(k2_local + k3_local - 3)])
        if m_P_local < 0.25:
            P_local[k2_local - 1:k3_local] = P_min

        if xi_frame_dB <= xi_fl_dB:
            P_frame = P_min
        elif dxi_frame >= 0:
            self.xi_m_dB = min(max(xi_frame_dB, xi_ml_dB), xi_mu_dB)
            P_frame = 1
        elif xi_frame_dB >= self.xi_m_dB + xi_fu_dB:
            P_frame = 1
        elif xi_frame_dB <= self.xi_m_dB + xi_fl_dB:
            P_frame = P_min
        else:
            P_frame = (P_min + (xi_frame_dB - self.xi_m_dB - xi_fl_dB)
                       / (xi_fu_dB - xi_fl_dB) * (1 - P_min))

        if broad_flag:
            q = 1 - P_global * P_local * P_frame
        else:
            q = 1 - P_local * P_frame
        q = np.minimum(q, q_max)

        # Conditional speech presence probability; stays 0 where q >= 0.9.
        PH1 = np.zeros(n_bins)
        likely = q < 0.9
        PH1[likely] = 1 / (1 + q[likely] / (1 - q[likely])
                           * (1 + eta[likely]) * np.exp(-v[likely]))

        # Spectral gain under speech presence; stays 1 where v <= 0.
        GH1 = np.ones(n_bins)
        large = v > 5
        GH1[large] = eta[large] / (1 + eta[large])
        small = (v <= 5) & (v > 0)
        GH1[small] = eta[small] / (1 + eta[small]) * np.exp(0.5 * expn(1, v[small]))

        GH0 = G_f
        G = GH1**PH1 * GH0**(1 - PH1)

        # Kept for the a priori SNR estimate of the next frame.
        self.eta_2term = GH1**2 * gamma
        return G

    def get_eta(self):
        """Return the term stored by the last ``update`` for the next a priori
        SNR estimate (all ones before the first update)."""
        return self.eta_2term
|
Reference in New Issue
Block a user