low_latency.py
7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import pyaudio
import numpy as np
import librosa
import queue
import pydub
from scipy import signal
from scipy.fft import fft, fftfreq
import math
import time
# --- Audio stream settings -------------------------------------------------
RATE = 44100  # sample rate in Hz
CHUNK = 1024  # samples per stream buffer / per queue entry
INPUT_DEVICE_INDEX = 0
OUTPUT_DEVICE_INDEX = 1
CUSTOM_AUDIO_DURATION = 0.5  # seconds

# --- Normalisation ceilings ------------------------------------------------
# Each measured feature is capped just below its ceiling and divided by it to
# obtain the effect strength.
MAX_FREQ = 40  # ceiling for the measured main-frequency value
MAX_AMPLITUDE = 10000  # ceiling for the measured spectral amplitude
MAX_DECIBEL = 50  # ceiling for the measured level in dB
EFFECT_LEVEL = 20  # number of effect levels

# Playback queue: every entry is one CHUNK-sized block of int16 samples.
# It is seeded with a single block of silence.
Q = queue.Queue()
Q.put(np.zeros(CHUNK, np.int16))

sound_idx = 0
# Record the user's voice for `duration` seconds.
def get_user_audio(duration):
    """Record mono 16-bit audio from the input device after a 3-second countdown.

    Args:
        duration: Requested recording length in seconds. Capture stops once
            at least ``RATE / CHUNK * duration`` buffers have been received,
            so the result is rounded up to a whole number of CHUNK buffers
            (and is never shorter than one buffer).

    Returns:
        A 1-D ``np.int16`` array with the recorded samples (empty if the
        stream delivered no buffers).
    """
    frames = []
    frames_needed = RATE / CHUNK * duration
    p = pyaudio.PyAudio()

    # Countdown before recording starts.
    print("ready for recording...")
    for remaining in range(3, 0, -1):
        print(remaining)
        time.sleep(1)
    print("start...")

    # Stream callback: collect each input buffer until enough were captured.
    def add_to_frame(in_data, frame_count, time_info, status):
        frames.append(np.frombuffer(in_data, dtype=np.int16))
        if len(frames) < frames_needed:
            return (in_data, pyaudio.paContinue)
        return (in_data, pyaudio.paComplete)

    try:
        # start=False so that capture begins exactly at start_stream() below.
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=RATE,
            input=True,
            frames_per_buffer=CHUNK,
            input_device_index=INPUT_DEVICE_INDEX,
            start=False,
            stream_callback=add_to_frame,
        )
        try:
            stream.start_stream()
            # Wait until the callback reports paComplete instead of sleeping a
            # fixed second, so durations longer than 1 s are not truncated.
            while stream.is_active():
                time.sleep(0.01)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        # Always release PortAudio, even if opening or recording failed.
        p.terminate()

    if not frames:
        return np.zeros(0, dtype=np.int16)
    return np.concatenate(frames)
class Effector:
    """Pre-render effect variants of a clip and queue one per trigger.

    On construction the clip is padded to a whole number of CHUNK-sized
    blocks and rendered at ``EFFECT_LEVEL`` strengths for each of the three
    effects ("echo", "pitch_shift", "low_filter"). ``add_to_queue`` measures
    a triggering sound, maps the selected feature (``target``) to a strength
    in [0, 1), and pushes the matching pre-rendered variant to ``Q`` one
    block at a time.
    """

    def __init__(self, sound):
        """Store a padded copy of ``sound`` and pre-render every effect level.

        Args:
            sound: 1-D array of int16 samples.
        """
        self.sound = sound.copy()
        np.set_printoptions(threshold=np.inf)

        # Pad with silence up to the next CHUNK boundary. An already aligned
        # clip needs no padding; an empty clip still gets one silent block so
        # the effect renderers always have samples to work on.
        pad = (-len(self.sound)) % CHUNK
        if len(self.sound) == 0:
            pad = CHUNK
        if pad:
            self.sound = np.concatenate(
                (self.sound, np.zeros(pad, dtype=np.int16))
            )

        self.target = "frequency"
        self.effect = "pitch_shift"

        levels = [i / EFFECT_LEVEL for i in range(EFFECT_LEVEL)]
        self.echoed_sounds = [self.make_echo(level) for level in levels]
        self.pitch_shifted_sounds = [self.pitch_shift(level) for level in levels]
        self.low_filtered_sounds = [self.low_filter(level) for level in levels]
        self.augumented_sound = {
            "echo": self.echoed_sounds,
            "pitch_shift": self.pitch_shifted_sounds,
            "low_filter": self.low_filtered_sounds,
        }

    @staticmethod
    def _to_int16(samples):
        """Convert samples to int16, saturating instead of wrapping around."""
        return np.clip(samples, -32768, 32767).astype(np.int16)

    def change_target(self, target):
        """Select which measured feature drives the effect strength.

        Raises:
            ValueError: if ``target`` is not "decibel", "frequency" or
                "amplitude".
        """
        if target not in ["decibel", "frequency", "amplitude"]:
            raise ValueError("Invalid target")
        self.target = target

    def change_effect(self, effect):
        """Select which pre-rendered effect is queued.

        Raises:
            ValueError: if ``effect`` is not "echo", "pitch_shift" or
                "low_filter".
        """
        if effect not in ["echo", "pitch_shift", "low_filter"]:
            raise ValueError("Invalid effect")
        self.effect = effect

    def make_echo(self, decay):
        """Return the clip mixed with one delayed copy of itself.

        Args:
            decay: Gain applied to the delayed copy.

        Returns:
            int16 array of the same length as the stored clip.
        """
        delay = int(0.01 * RATE)  # Echo delay in samples
        dry = self.sound.astype(np.float64)
        wet = dry.copy()
        if 0 < delay < len(dry):
            # The echo must trail the direct sound: sample n receives the
            # attenuated sample n - delay (not n + delay).
            wet[delay:] += decay * dry[:-delay]
        return self._to_int16(wet)

    def pitch_shift(self, shift):
        """Return the clip pitch-shifted via librosa.

        Args:
            shift: Effect level; librosa receives ``n_steps = shift * 3``.

        Returns:
            int16 array with the shifted audio.
        """
        # NOTE(review): with bins_per_octave=1 librosa counts one step as one
        # octave, so the top level shifts by almost three octaves - confirm
        # this range is intended.
        sound = librosa.effects.pitch_shift(
            y=np.array(self.sound, np.float32),
            sr=RATE,
            n_steps=shift * 3,
            bins_per_octave=1,
        )
        return self._to_int16(sound)

    def low_filter(self, param):
        """Return the clip after a 4th-order Butterworth low-pass filter.

        Args:
            param: Effect level; values below 0.1 are raised to 0.1. The
                cutoff frequency is ``param * RATE / 8`` Hz.

        Returns:
            int16 array of the same length as the stored clip.
        """
        param = max(param, 0.1)
        audio_data = np.array(self.sound.copy(), dtype=np.int16)
        # Define the filter parameters
        cutoff_freq = param * RATE / 8  # cutoff of the low-pass filter (Hz)
        nyquist_freq = 0.5 * RATE  # half of the sampling rate
        normalized_cutoff = cutoff_freq / nyquist_freq
        # Design the low-pass filter
        b, a = signal.butter(
            4, normalized_cutoff, btype='low', analog=False, output='ba'
        )
        # Apply the low-pass filter to the audio data
        filtered_audio = signal.lfilter(b, a, audio_data)
        return self._to_int16(filtered_audio)

    def get_distortion_rate(self, db, main_frequency, amplitude):
        """Map the currently selected feature to an effect strength.

        The selected value is capped at ``MAX_* - 1``, divided by ``MAX_*``
        and floored at 0, so the result lies in [0, 1).

        Args:
            db: Level in decibels.
            main_frequency: Main-frequency value as produced by
                ``get_decibel_freq_amplitude``.
            amplitude: Spectral amplitude of the main component.

        Returns:
            The effect strength.
        """
        print("current target is ", self.target)
        print(MAX_FREQ, main_frequency)
        if self.target == "frequency":
            param = min(MAX_FREQ - 1, main_frequency) / MAX_FREQ
        elif self.target == "amplitude":
            param = min(MAX_AMPLITUDE - 1, amplitude) / MAX_AMPLITUDE
        elif self.target == "decibel":
            param = min(MAX_DECIBEL - 1, db) / MAX_DECIBEL
        else:
            param = 0
        return max(param, 0)

    def get_decibel_freq_amplitude(self, active_sound):
        """Measure the strongest non-DC spectral component of a sound.

        Args:
            active_sound: 1-D array of samples.

        Returns:
            Tuple ``(db, main_frequency, amplitude)``:
            ``main_frequency`` is the peak's normalised frequency (cycles per
            sample) multiplied by 1000 - the scale ``MAX_FREQ`` is compared
            against, not Hz; ``amplitude`` is the FFT magnitude at the peak;
            ``db`` is ``20 * log10(amplitude)``, or ``-inf`` when the
            amplitude is zero. Buffers too short to contain a non-DC
            positive-frequency bin yield ``(-inf, 0.0, 0.0)``.
        """
        samples = np.asarray(active_sound, dtype=np.float64)
        half = len(samples) // 2
        if half <= 1:
            # No positive-frequency bin besides DC to inspect.
            return float("-inf"), 0.0, 0.0

        magnitudes = np.abs(fft(samples))
        frequencies = fftfreq(len(samples))

        # The search skips bin 0 (DC), so shift the argmax back by one to get
        # an index into the full-length arrays.
        main_freq_index = int(np.argmax(magnitudes[1:half])) + 1

        main_frequency = float(abs(frequencies[main_freq_index]) * 1000)
        amplitude = float(magnitudes[main_freq_index])
        # log10(0) is undefined; report silence as -inf dB instead of raising.
        db = 20 * math.log10(amplitude) if amplitude > 0 else float("-inf")
        return db, main_frequency, amplitude

    def add_to_queue(self, active_sound):
        """Queue the effect variant that matches ``active_sound``.

        Measures ``active_sound``, converts the selected feature to an effect
        level and puts the corresponding pre-rendered clip on ``Q`` in
        CHUNK-sized blocks.
        """
        db, main_frequency, amplitude = self.get_decibel_freq_amplitude(active_sound)
        print("touched, db {}, main_frequency {}, amplitude {}".format(db, main_frequency, amplitude))
        param = self.get_distortion_rate(db, main_frequency, amplitude)
        print("param : ", param)

        # Pick the variant once; clamp so a strength of exactly 1.0 can never
        # index past the last level.
        level = min(int(param * EFFECT_LEVEL), EFFECT_LEVEL - 1)
        rendered = self.augumented_sound[self.effect][level]
        for start in range(0, len(self.sound), CHUNK):
            Q.put(rendered[start:start + CHUNK])
# Choose the source clip: a fresh microphone recording or the bundled MP3.
audio_file_path = "./sounds/short_wooAk.mp3"
if "y" == input("직접 녹음을 하시겠습니까? (y/n) : "):
    audio = get_user_audio(CUSTOM_AUDIO_DURATION)
else:
    # Decode the file only when it is actually used, and normalise it to the
    # stream format (mono, 16-bit, RATE Hz) so the raw bytes really are the
    # int16 samples the rest of the script assumes.
    segment = pydub.AudioSegment.from_mp3(audio_file_path)
    segment = segment.set_channels(1).set_frame_rate(RATE).set_sample_width(2)
    audio = np.frombuffer(segment.raw_data, dtype=np.int16)

# Pre-render every effect level for the chosen clip.
effector = Effector(audio)

# State shared with the stream callback.
idx = 0
prev_touched = False  # True while the input is above the trigger threshold
pa = pyaudio.PyAudio()
acitve_sound_buffer = np.zeros(0, dtype=np.int16)  # samples of the current trigger
def callback(in_data, frame_count, time_info, status):
    """Duplex stream callback: detect triggers on input, play queued audio.

    While the summed absolute input level stays above ``2**20`` the incoming
    buffers are accumulated. On the first buffer that falls back to or below
    the threshold, the accumulated sound is handed to
    ``effector.add_to_queue``. Every call returns one block taken from ``Q``
    as output.

    Args:
        in_data: Raw input bytes, interpreted as int16 samples.
        frame_count, time_info, status: Supplied by the stream; unused.

    Returns:
        ``(output_block, pyaudio.paContinue)``.
    """
    global prev_touched, acitve_sound_buffer

    audio_data = np.frombuffer(in_data, dtype=np.int16)
    # Widen before summing: accumulating int16 values can overflow long
    # before reaching the 2**20 threshold, and abs(-32768) does not fit in
    # int16. The vectorised sum also avoids a per-sample Python loop here.
    raw_power = int(np.abs(audio_data.astype(np.int64)).sum())
    touched = raw_power > 2**20

    if touched:
        if prev_touched:
            acitve_sound_buffer = np.concatenate((acitve_sound_buffer, audio_data))
        else:
            acitve_sound_buffer = audio_data
        prev_touched = True
    else:
        if prev_touched:
            # The trigger just ended: queue the effect for what was captured.
            effector.add_to_queue(acitve_sound_buffer)
        acitve_sound_buffer = np.zeros(0, dtype=np.int16)
        prev_touched = False

    # Top the queue up with silence when it is about to run dry, so that
    # Q.get() below can never block inside the audio callback.
    if Q.qsize() <= 1:
        Q.put(np.zeros(CHUNK, dtype=np.int16))
    else:
        print(Q.qsize())
    return (Q.get(), pyaudio.paContinue)
# Open a duplex (input + output) stream that is driven entirely by `callback`.
stream = pa.open(
    format=pa.get_format_from_width(2),
    channels=1,
    rate=RATE,
    input_device_index=INPUT_DEVICE_INDEX,
    output_device_index=OUTPUT_DEVICE_INDEX,
    input=True,
    output=True,
    frames_per_buffer=CHUNK,
    stream_callback=callback,
)
try:
    stream.start_stream()
    # Keep the main thread alive for as long as the stream is active. Sleep
    # instead of spinning so this loop does not compete with the audio
    # callback for CPU time.
    while stream.is_active():
        time.sleep(0.1)
finally:
    # Release the audio resources even when interrupted (e.g. Ctrl+C).
    stream.stop_stream()
    stream.close()
    pa.terminate()