# low_latency.py
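#
# Real-time, low-latency audio effector: listens on the microphone, detects
# bursts of loud input, analyzes each burst with an FFT, and plays back a
# pre-recorded (or user-recorded) sound with an effect whose strength is
# driven by the measured decibel level, dominant frequency, or amplitude.
# Every effect is precomputed at EFFECT_LEVEL strengths so the audio
# callback only does a table lookup, keeping latency low.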
import pyaudio
import numpy as np
import librosa
import queue
import pydub
from scipy import signal
from scipy.fft import fft, fftfreq
import math
import time

TARGET = "decibel" # one of: "frequency", "decibel", "amplitude"
EFFECT = "pitch_shift" # one of: "echo", "pitch_shift", "low_filter"

RATE = 44100
CHUNK = 1024
MAX_FREQ = 40 # max dominant frequency (Hz) considered when TARGET is "frequency"
MAX_AMPLITUDE = 1440735 # max FFT magnitude considered when TARGET is "amplitude"
MAX_DECIBEL = 150 # max decibel level considered when TARGET is "decibel"
MIN_DECIBEL = 50
EFFECT_LEVEL = 20 # number of precomputed effect strength levels

INPUT_DEVICE_INDEX = 0 # adjust these to match your system's audio devices
OUTPUT_DEVICE_INDEX = 1

CUSTOM_AUDIO_DURATION = 0.5 # seconds

# Output queue; the playback callback consumes audio from it one CHUNK at a time.
Q = queue.Queue()
Q.put(np.zeros(CHUNK, dtype=np.int16))  # seed with silence so the first callback has data



# Record the user's voice for `duration` seconds and return it as an int16 array.
def get_user_audio(duration):

    frames = []
    p = pyaudio.PyAudio()

    # Countdown before recording starts.
    print("ready for recording...")
    for i in range(3, 0, -1):
        print(i)
        time.sleep(1)
    print("start...")

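    # PyAudio delivers captured audio to the callback below one CHUNK at a
    # time; returning paContinue keeps recording, and paComplete stops the
    # stream once RATE / CHUNK * duration chunks (`duration` seconds) are in.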
    # Recording callback: appends each captured chunk to frames.
    def add_to_frame(in_data, frame_count, time_info, status):
        frames.append(np.frombuffer(in_data, dtype=np.int16))
        if len(frames) < RATE / CHUNK * duration:
            return (in_data, pyaudio.paContinue)
        return (in_data, pyaudio.paComplete)

    # Open the input stream; it starts recording immediately (start=True is
    # the default) and the callback ends it after `duration` seconds.
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=RATE, input=True,
                    frames_per_buffer=CHUNK, input_device_index=INPUT_DEVICE_INDEX,
                    stream_callback=add_to_frame)

    # Wait for the callback to signal completion.
    while stream.is_active():
        time.sleep(0.1)

    sound = np.concatenate(frames)

    # Clean up the stream and the PyAudio instance.
    stream.stop_stream()
    stream.close()
    p.terminate()

    return sound

class Effector:
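    """Applies a chosen effect to a sound at a strength derived from input analysis.

    All EFFECT_LEVEL strength variants of every effect are precomputed in
    __init__ so that add_to_queue() only has to index into a table, which
    keeps the real-time path cheap.
    """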
    def __init__(self, sound):
        self.sound = sound.copy()
        # Zero-pad so the length is an exact multiple of CHUNK.
        pad = (-len(self.sound)) % CHUNK
        self.sound = np.concatenate((self.sound, np.zeros(pad, dtype=np.int16)))


        self.target = TARGET
        self.effect = EFFECT

        self.echoed_sounds = list()
        for i in range(EFFECT_LEVEL):
            self.echoed_sounds.append(self.make_echo(i/EFFECT_LEVEL))

        self.pitch_shifted_sounds = list()
        for i in range(EFFECT_LEVEL):
            self.pitch_shifted_sounds.append(self.pitch_shift(i/EFFECT_LEVEL))

        self.low_filtered_sounds = list()
        for i in range(EFFECT_LEVEL):
            self.low_filtered_sounds.append(self.low_filter(i/EFFECT_LEVEL))

        self.augmented_sound = {
            "echo": self.echoed_sounds,
            "pitch_shift": self.pitch_shifted_sounds,
            "low_filter": self.low_filtered_sounds
        }

    def change_target(self, target):
        if target not in ["decibel", "frequency", "amplitude"]:
            raise ValueError("Invalid target")
        self.target = target

    def change_effect(self, effect):
        if effect not in ["echo", "pitch_shift", "low_filter"]:
            raise ValueError("Invalid effect")
        self.effect = effect

    def make_echo(self, decay):
        # Single-tap echo: mix a copy of the signal, delayed by 10 ms and
        # scaled by `decay`, back into the original.
        delay = int(0.01 * RATE)  # echo delay in samples
        sound = self.sound.astype(np.float32)
        echoed_audio = sound.copy()
        echoed_audio[delay:] += decay * sound[:-delay]
        return echoed_audio.astype(np.int16)

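    # With bins_per_octave=1, n_steps is measured in whole octaves, so
    # shift in [0, 1) maps to a pitch change of (shift*3 - 1) octaves,
    # i.e. from one octave down to nearly two octaves up.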
    def pitch_shift(self, shift):
        sound = librosa.effects.pitch_shift(y=self.sound.astype(np.float32), sr=RATE,
                                            n_steps=shift * 3 - 1, bins_per_octave=1)
        return sound.astype(np.int16)

    def low_filter(self, param):
        param = max(param, 0.1)  # keep the cutoff strictly positive
        audio_data = self.sound.astype(np.float64)
        # Define the filter parameters
        cutoff_freq = param * RATE / 8  # low-pass cutoff frequency (Hz)
        nyquist_freq = 0.5 * RATE  # Nyquist frequency (half the sampling rate)
        normalized_cutoff = cutoff_freq / nyquist_freq  # cutoff as a fraction of Nyquist

        # Design a 4th-order Butterworth low-pass filter.
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False, output='ba')

        # Apply the low-pass filter to the audio data.
        filtered_audio = signal.lfilter(b, a, audio_data)

        return filtered_audio.astype(np.int16)
    

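    # Map the analyzed feature selected by self.target onto a distortion
    # parameter in [0, 1]; the parameter then indexes the precomputed tables.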
    def get_distortion_rate(self, db, main_frequency, amplitude):
        print("current target is", self.target)
        param = 0
        if self.target == "frequency":
            param = min(MAX_FREQ - 1, main_frequency) / MAX_FREQ
        elif self.target == "amplitude":
            param = min(MAX_AMPLITUDE - 1, amplitude) / MAX_AMPLITUDE
        elif self.target == "decibel":
            param = min(MAX_DECIBEL - MIN_DECIBEL, db - MIN_DECIBEL) / (MAX_DECIBEL - MIN_DECIBEL)

        # Clamp to [0, 1] so downstream lookups stay in range.
        return min(max(param, 0), 1)

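    # FFT-based analysis of a captured burst: returns its level in dB,
    # its dominant frequency in Hz, and that component's FFT magnitude.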
    def get_decibel_freq_amplitude(self, active_sound):
        samples = active_sound.copy()
        fft_data = fft(samples)
        frequencies = fftfreq(len(samples), d=1 / RATE)  # bin frequencies in Hz

        # Index of the main frequency component, excluding the DC bin
        # (+1 compensates for the slice starting at 1).
        main_freq_index = np.argmax(np.abs(fft_data[1:len(samples) // 2])) + 1

        # Main frequency in Hz
        main_frequency = abs(frequencies[main_freq_index])

        # Amplitude (FFT magnitude) of the main frequency component
        amplitude = abs(fft_data[main_freq_index])

        # Convert the amplitude to decibels (dB); guard against log10(0).
        db = 20 * math.log10(max(amplitude, 1e-12))

        return db, main_frequency, amplitude


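    # Analyze a captured burst, pick the matching precomputed effect variant,
    # and enqueue it CHUNK by CHUNK for the playback callback to drain.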
    def add_to_queue(self, active_sound):
        global Q

        db, main_frequency, amplitude = self.get_decibel_freq_amplitude(active_sound)
        print("touched, db {}, main_frequency {}, amplitude {}".format(db, main_frequency, amplitude))
        param = self.get_distortion_rate(db, main_frequency, amplitude)
        if self.effect == "echo":
            print("echo amount:", param)
        if self.effect == "pitch_shift":
            print("pitch shift:", param * 3 - 1, "octaves")
        if self.effect == "low_filter":
            print("low-pass cutoff:", param * RATE / 8, "Hz")
        # Clamp the level index so param == 1.0 cannot run past the table end.
        level = min(EFFECT_LEVEL - 1, int(param * EFFECT_LEVEL))
        processed = self.augmented_sound[self.effect][level]
        for i in range(0, len(self.sound), CHUNK):
            Q.put(processed[i:i + CHUNK])

        

# Load the fallback sound effect (assumed to be mono, 16-bit, 44.1 kHz).
audio_file_path = "./sounds/short_wooAk.mp3"
audio = np.frombuffer(pydub.AudioSegment.from_mp3(audio_file_path).raw_data, dtype=np.int16)


if "y" == input("직접 녹음을 하시겠습니까? (y/n) : "):
    audio = get_user_audio(CUSTOM_AUDIO_DURATION)

# Build the effector and precompute all effect variants.
effector = Effector(audio)

prev_touched = False
pa = pyaudio.PyAudio()

active_sound_buffer = np.zeros(0, dtype=np.int16)

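# Duplex stream callback: a simple activity gate. While the incoming chunk's
# total absolute amplitude is above the threshold we accumulate it into
# active_sound_buffer; on the falling edge the whole burst is analyzed and
# the processed sound is queued. The callback always answers with the next
# CHUNK from Q, so playback never blocks recording.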
POWER_THRESHOLD = 2 ** 20  # empirical activity threshold for one CHUNK

def callback(in_data, frame_count, time_info, status):
    global effector, Q, prev_touched, active_sound_buffer
    audio_data = np.frombuffer(in_data, dtype=np.int16)

    # Sum of absolute sample values (widened to avoid int16 overflow).
    raw_power = int(np.sum(np.abs(audio_data.astype(np.int32))))

    if raw_power > POWER_THRESHOLD and not prev_touched:
        # Rising edge: a new burst starts.
        prev_touched = True
        active_sound_buffer = audio_data

    elif raw_power > POWER_THRESHOLD and prev_touched:
        # Burst continues: keep accumulating.
        active_sound_buffer = np.concatenate((active_sound_buffer, audio_data))

    elif prev_touched and raw_power <= POWER_THRESHOLD:
        # Falling edge: analyze the burst and queue the processed sound.
        effector.add_to_queue(active_sound_buffer)
        active_sound_buffer = np.zeros(0, dtype=np.int16)
        prev_touched = False

    else:
        # Idle: keep the buffer empty.
        active_sound_buffer = np.zeros(0, dtype=np.int16)

    # Keep at least one chunk of silence queued so Q.get() never blocks.
    if Q.qsize() == 1:
        Q.put(np.zeros(CHUNK, dtype=np.int16))
    return (Q.get().tobytes(), pyaudio.paContinue)

stream = pa.open(format=pa.get_format_from_width(2),
                channels=1,
                rate=RATE,
                input_device_index=INPUT_DEVICE_INDEX,
                output_device_index=OUTPUT_DEVICE_INDEX,
                input=True,
                output=True,
                frames_per_buffer=CHUNK,
                stream_callback=callback
                )
stream.start_stream()
# Run until the stream stops; sleep instead of spinning to avoid a busy-wait.
while stream.is_active():
    time.sleep(0.1)

stream.stop_stream()
stream.close()
pa.terminate()