cli_version.py 6.88 KB
import pyaudio
import numpy as np
import scipy.signal as signal
import pydub
import time
import librosa
import tkinter as tk

ORIGIN_SOUND = np.frombuffer(pydub.AudioSegment.from_mp3("./sounds/s1.mp3").raw_data, dtype=np.int16)

# 파라미터 설정
RATE = 44100  # 샘플링 주파수
CHUNK = 1024  # 읽을 샘플의 수
THRESHOLD = 128  # 피크를 검출하기 위한 threshold 값
WIN_SIZE = 1024  # STFT를 적용할 윈도우 사이즈
HOP_SIZE = 512  # STFT에서 윈도우 사이의 거리 (오버랩 사이즈)
DELAY = 0.1  # Delay time in seconds
MAX_FREQ = 10000 # max freq for pitch shifting
MAX_HEIGHT = 10000 # max height for pitch shifting
MAX_DECIBEL = 50 # max decibel for decibel shifting
SOURCE_MODE = "decibel" # height, decibel or frequency
MODE = "low_filter" # low_filter, echo or pitch_shift
SOUND_SIZE = len(ORIGIN_SOUND)  # 음원 길이

sound_idx = 0


# 사용자의 목소리를 duration 초간 녹음.
def get_user_audio(duration):

    frames = []
    p = pyaudio.PyAudio()

    # 카운터 시작
    print("ready for recording...")
    for _ in range(3, 0, -1):
        print(_)
        time.sleep(1)
    print("start...")

    # 실제 녹음 콜백 함수
    def add_to_frame(in_data, frame_count, time_info, status):
        frames.append(np.frombuffer(in_data, dtype=np.int16))
        if(len(frames) < RATE/CHUNK * duration):
            return (in_data, pyaudio.paContinue)
        return (in_data, pyaudio.paComplete)

    # 녹음 진행
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=RATE, input=True, frames_per_buffer=CHUNK, input_device_index=1, stream_callback=add_to_frame)
    
    time.sleep(1)    
    stream.start_stream()

    sound = np.frombuffer(b''.join(frames), dtype=np.int16)

    # stream및 객체들 정리
    stream.stop_stream()
    stream.close()
    p.terminate()

    return sound

if "y" == input("직접 녹음을 하시겠습니까? (y/n) : "):
    ORIGIN_SOUND = get_user_audio(0.2)

sound = ORIGIN_SOUND.copy()

print(type(sound), len(sound))

p = pyaudio.PyAudio()

last_time = 0

# 콜백 함수 정의
def process_audio(in_data, frame_count, time_info, status):
    global buffer
    global sound
    global sound_idx
    global last_time
    
    
    def get_distortion(height, frequency, decibel):
        height = min(height, MAX_HEIGHT) / RATE
        frequency = min(frequency, MAX_FREQ) / MAX_FREQ
        decibel = min(decibel, MAX_DECIBEL) / MAX_DECIBEL

        if SOURCE_MODE == "height":
            param = height
        elif SOURCE_MODE == "frequency":
            param = frequency
        elif SOURCE_MODE == "decibel":
            param = decibel
        else:
            return ORIGIN_SOUND

        if MODE == "pitch_shift":
            return shift_pitch(param)
        elif MODE == "echo":
            return add_echo(param)
        elif MODE == "low_filter":
            return low_filter(param)
        return ORIGIN_SOUND
    
    def add_echo(decay):
        # Create an empty array to store the echoed audio samples
        echoed_samples = np.zeros_like(ORIGIN_SOUND, dtype=np.int16)

        # Calculate the delay in samples
        delay_samples = int(DELAY * 44100)  # Assuming a sample rate of 44100 Hz

        # Apply the echo effect
        for i in range(delay_samples, len(ORIGIN_SOUND)):
            echoed_samples[i] = ORIGIN_SOUND[i] + int(decay * echoed_samples[i - delay_samples])

        return echoed_samples
    
    def shift_pitch(frequency):
        pitch_shift_factor = frequency * 3
        audio_array = ORIGIN_SOUND.copy()
        # Resample the audio array to change the pitch
        resampled_array = librosa.effects.pitch_shift(np.array(audio_array, dtype=np.float32), sr=RATE, n_steps=pitch_shift_factor, bins_per_octave=1)
        return np.array(resampled_array, dtype=np.int16)

    def low_filter(param):
        audio_data = np.array(ORIGIN_SOUND.copy(), dtype=np.float32)
        # Define the filter parameters
        cutoff_freq = param * RATE  # Frequency cutoff for the low-pass filter (in Hz)
        print("cut of below : ", cutoff_freq)
        nyquist_freq = 0.5 * RATE  # Nyquist frequency (half of the sampling rate)
        normalized_cutoff = cutoff_freq / nyquist_freq  # Normalized cutoff frequency

        # Design the low-pass filter
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False, output='ba')

        # Apply the low-pass filter to the audio data
        filtered_audio = signal.lfilter(b, a, audio_data)

        return np.array(filtered_audio, dtype=np.int16)

    # 오디오 데이터 변환
    data = np.frombuffer(in_data, dtype=np.int16)

    # STFT 수행
    f, t, Zxx = signal.stft(data, RATE, nperseg=WIN_SIZE, noverlap=HOP_SIZE)

    # 피크 검출
    peaks, _ = signal.find_peaks(np.abs(np.mean(Zxx, axis=1)), height=THRESHOLD, distance=WIN_SIZE)
    # 파라미터 추정
    if len(peaks) > 0 and last_time+0.1 < time_info['current_time']:
        peak_idx = peaks[0]  # 첫 번째 피크 선택
        height = np.abs(Zxx[peak_idx, 0])  # 피크의 높이 추정
        freq = f[peak_idx]  # 피크의 주파수 추정
        amp = np.max(np.abs(data))  # 신호의 진폭 추정
        decibel = np.mean(librosa.amplitude_to_db(np.abs(Zxx)))  # 진폭을 데시벨로 변환
    
        if(decibel > 10) and height > 100:
            last_time = time_info['current_time']
            print("Height: {:.2f}, 주파수: {:.2f}, Amplitude: {:.2f}, Decibel: {:.2f}, time_info {:.2f}".format(height, freq, amp, decibel, time_info['current_time']))
            new_sound = get_distortion(height, freq, decibel)
            if(sound_idx > len(sound)):
                sound = new_sound
                sound_idx = 0
            else:
                mixed_end = min(len(sound), sound_idx + len(new_sound))
                print(mixed_end, sound_idx)
                sound[sound_idx:mixed_end] = new_sound[:mixed_end-sound_idx] + sound[sound_idx:mixed_end]
                if(mixed_end-sound_idx < len(new_sound)):
                    result = np.concatenate((sound, new_sound[mixed_end-sound_idx:]),axis=0)
                    sound = result
    elif len(peaks) > 0:
        last_time = time_info['current_time']

    sound_idx += 1024
    if sound_idx > len(sound):
        sound = ORIGIN_SOUND.copy()
        return (np.zeros(data.shape), pyaudio.paContinue)
    return (sound[sound_idx-1024:sound_idx], pyaudio.paContinue)


# 입력 스트림 열기
stream = p.open(format=p.get_format_from_width(2),
                channels=1,
                rate=RATE,
                input_device_index=1,
                output_device_index=2,
                input=True,
                output=True,
                frames_per_buffer=CHUNK,
                stream_callback=process_audio
                )

# 스트림 시작
stream.start_stream()



# 프로그램 실행 중지 전까지 무한 대기
while stream.is_active():
    pass

# 스트림과 PyAudio 객체 종료
stream.stop_stream()
stream.close()
p.terminate()