stream_echo.py
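"""Real-time audio effect loop.

Captures microphone input with PyAudio, analyses each chunk with librosa/NumPy
(dominant frequency, optional dB level), and plays it back through the output
device with an effect applied: a pitch shift driven by the detected dominant
frequency, or (commented out below) an echo or low-pass filter.
"""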
import sys
import time

import numpy as np
import pyaudio
import librosa
import scipy.signal as signal
RECORD_SECONDS = 5
CHUNK = 1024
RATE = 44100
DELAY = 0.1 # Delay time in seconds
GAIN = 1 # Echo gain (0 to 1)
MAX_FREQ = 3000
input_device_index = 1
output_device_index = 4
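# NOTE: device indices are machine-specific; list the available devices with
# p.get_device_count() / p.get_device_info_by_index(i) and adjust these values.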
# Create buffer for delayed audio data
buffer_size = int(RATE * DELAY)
buffer = np.zeros(buffer_size, dtype=np.int16)
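# PyAudio calls do_process from its own thread for every CHUNK of captured audio.
# The callback must return a (bytes, flag) tuple, where the bytes hold exactly
# frame_count output frames and the flag is pyaudio.paContinue to keep streaming.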
def do_process(in_data, frame_count, time_info, status_flags):
    global buffer
    data = np.frombuffer(in_data, dtype=np.int16)
    def get_max_average_db(data):
        data_float = data.astype(np.float32)
        # Compute the power spectrogram of the chunk
        S = librosa.stft(data_float, n_fft=256, hop_length=512)
        S_power = np.abs(S) ** 2
        # Convert the power spectrogram to the dB scale
        # (power_to_db, not amplitude_to_db, since S_power is already squared)
        S_dB = librosa.power_to_db(S_power, ref=np.max)
        # Average and peak dB levels over the chunk
        avg_dB = np.mean(S_dB)
        max_dB = np.max(S_dB)
        return avg_dB, max_dB
    def get_dominant_freq(data):
        data = data.astype(np.float32) / 32768.0
        # Compute the power spectrogram on the mel scale
        n_mels = 64
        S = librosa.feature.melspectrogram(y=data, sr=RATE, n_fft=256,
                                           hop_length=1024, n_mels=n_mels)
        # Center frequency of each mel band (the band edges span n_mels + 2 points)
        mel_freqs = librosa.mel_frequencies(n_mels=n_mels + 2, fmin=0.0, fmax=RATE / 2)[1:-1]
        # Find the mel band with the maximum energy in each frame
        max_bin = np.argmax(S, axis=0)
        # Map band indices to frequencies and take the median across frames
        # to get the overall dominant frequency
        dominant_freq = np.median(mel_freqs[max_bin])
        return dominant_freq
    def add_echo(gain):
        global buffer
        # Shift the delay line left by one chunk and append the new chunk at the end;
        # the samples at the front of the buffer are then roughly DELAY seconds old.
        buffer = np.roll(buffer, -len(data))
        buffer[-len(data):] = data
        # Mix the delayed signal back in (float math avoids int16 overflow before
        # the final cast in the callback)
        return data.astype(np.float32) + gain * buffer[:len(data)]
    def shift_pitch(pitch_shift_factor):
        audio_array = data.astype(np.float32)
        # Crude per-chunk pitch shift: read the chunk pitch_shift_factor times faster
        # via linear interpolation.  The output keeps the original length so the
        # callback still returns frame_count frames; reads past the end are silence.
        n = len(audio_array)
        read_idx = np.arange(n) * pitch_shift_factor
        resampled_array = np.interp(read_idx, np.arange(n), audio_array, right=0.0)
        return resampled_array
    def high_filter(param):
        audio_data = data
        # Define the filter parameters
        cutoff_freq = param * MAX_FREQ  # Frequency cutoff for the low-pass filter (in Hz)
        nyquist_freq = 0.5 * RATE  # Nyquist frequency (half of the sampling rate)
        normalized_cutoff = cutoff_freq / nyquist_freq  # Normalized cutoff frequency
        # Design the low-pass filter
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False, output='ba')
        # Apply the low-pass filter to the audio data
        filtered_audio = signal.lfilter(b, a, audio_data)
        return filtered_audio
    try:
        freq = get_dominant_freq(data)
        # avg_db, max_db = get_max_average_db(data)
        # temp_gain = freq / MAX_FREQ
        # output = add_echo(temp_gain)
        output = shift_pitch(0.5 + freq / MAX_FREQ)
        # output = high_filter(0.5)
        # print(int(freq), int(avg_db), int(max_db))
        return (output.astype(np.int16).tobytes(), pyaudio.paContinue)
    except Exception as exc:
        print("exception occurred:", exc)
        # Fall back to passing the input through unchanged
        return (in_data, pyaudio.paContinue)
p = pyaudio.PyAudio()
stream = p.open(format=p.get_format_from_width(2),
                channels=1 if sys.platform == 'darwin' else 2,
                rate=RATE,
                input=True,
                input_device_index=input_device_index,
                output_device_index=output_device_index,
                output=True,
                frames_per_buffer=CHUNK,
                stream_callback=do_process)
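# In callback mode PyAudio drives do_process on its own internal thread, so the
# main thread below only needs to keep the process alive while the stream runs.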
print('* recording')
stream.start_stream()
while stream.is_active():
    # Do other processing here if necessary; sleep so the loop does not busy-wait
    time.sleep(0.1)
stream.stop_stream()
stream.close()
p.terminate()