# find_peak copy.py 7.82 KB
import pyaudio
import numpy as np
import scipy.signal as signal
import pydub
import time
import librosa
import tkinter as tk

# Default source sound: the raw bytes of the decoded MP3 viewed as int16 samples.
# It is replaced by the user's own recording when record() runs.
# NOTE(review): the file's channel count and sample rate are not checked here;
# playback assumes they match the mono RATE-Hz stream -- confirm for ping.mp3.
ORIGIN_SOUND = np.frombuffer(pydub.AudioSegment.from_mp3("./sounds/ping.mp3").raw_data, dtype=np.int16)

# Parameter settings
RATE = 44100  # sampling rate used for every audio stream (Hz)
CHUNK = 1024  # number of samples read per buffer
THRESHOLD = 256  # minimum spectral magnitude for a bin to count as a peak
WIN_SIZE = 1024  # window size used for the STFT
HOP_SIZE = 512  # passed to the STFT as its overlap between windows (noverlap)
DELAY = 0.1  # Delay time in seconds
MAX_FREQ = 3000 # max freq for pitch shifting
MAX_HEIGHT = 2000 # max height for pitch shifting

# Which measurement drives the effect strength: "decibel" or "frequency".
# Defined up front so the audio callback can read it even when the user presses
# Start before Record or a mode button; "decibel" is the value record() selects.
SOURCE_MODE = "decibel"

# Playback position (in samples) inside the sound currently being played.
sound_idx = 0

# Main application window: fixed 640x400 size, placed at offset (100, 100).
window = tk.Tk()
window.title("Sound Effect")
window.geometry("640x400+100+100")
window.resizable(width=False, height=False)

# Status text shown at the top of the window; other functions update it
# through this variable.
info_text = tk.StringVar(value="welcome! please press record button.")

info_label = tk.Button(
    window,
    textvariable=info_text,
    foreground="black",
    background="white",
)
info_label.pack()

def set_source_mode(mode: str) -> None:
    """Store *mode* in the module-level ``SOURCE_MODE`` variable.

    The GUI buttons call this with "decibel" or "frequency"; the value is not
    validated here.
    """
    global SOURCE_MODE
    SOURCE_MODE = mode

# Record the user's voice for `duration` seconds.
def get_user_audio(duration):
    """Show a 3-second countdown, then record from the microphone.

    Args:
        duration: Requested recording length in seconds. Capture stops once
            at least ``RATE / CHUNK * duration`` buffers have been received,
            so the result is rounded up to whole buffers.

    Returns:
        A 1-D ``np.int16`` array with the recorded samples (empty if no buffer
        was delivered).

    Note:
        The input device index is hard-coded to 1.
    """
    global info_text, info_label
    frames = []
    p = pyaudio.PyAudio()

    try:
        # Countdown. This function runs inside a Tk button callback, so the
        # event loop is blocked while we sleep; flush pending redraws by hand
        # or the countdown text would never become visible.
        info_text.set("ready for recording...")
        info_label.update_idletasks()
        for remaining in range(3, 0, -1):
            info_text.set(str(remaining))
            info_label.update_idletasks()
            time.sleep(1)
        info_text.set("start...")
        info_label.update_idletasks()

        # Recording callback: collect each buffer until enough have arrived.
        def add_to_frame(in_data, frame_count, time_info, status):
            frames.append(np.frombuffer(in_data, dtype=np.int16))
            if len(frames) < RATE / CHUNK * duration:
                return (in_data, pyaudio.paContinue)
            return (in_data, pyaudio.paComplete)

        # Opening the stream also starts it (PyAudio's default), so capture
        # begins immediately.
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=RATE, input=True, frames_per_buffer=CHUNK, input_device_index=1, stream_callback=add_to_frame)
        try:
            # Wait until the callback signals completion instead of sleeping a
            # fixed time, so recordings longer than a second are not cut short.
            while stream.is_active():
                time.sleep(0.01)
        finally:
            # Clean up the stream
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    if not frames:
        return np.empty(0, dtype=np.int16)
    return np.concatenate(frames)

def record():
    """Replace the source sound with a fresh 0.5-second microphone recording.

    Also switches the control source back to "decibel".
    """
    global ORIGIN_SOUND, SOURCE_MODE
    recorded = get_user_audio(0.5)
    ORIGIN_SOUND = recorded
    SOURCE_MODE = "decibel"  # decibel or frequency

def start():
    """Run the live effect loop on a full-duplex audio stream.

    Every input buffer is analysed with an STFT. When a spectral peak above
    THRESHOLD first appears (and the mean level exceeds 20 dB), a processed
    copy of ORIGIN_SOUND is mixed into the playback buffer ``sound`` at the
    current playback position. Each callback returns the next buffer of
    ``sound``, or silence once playback has run past its end.

    Side effects: sets the module globals MODE, SOUND_SIZE and ``sound``, and
    advances ``sound_idx``. Input/output device indices are hard-coded (1, 2).

    NOTE(review): the callback always returns paContinue, so this function only
    returns if the stream stops on its own; while it runs, the Tk main loop
    that invoked it is blocked.
    """
    global MODE, SOUND_SIZE, sound_idx, sound
    # NOTE(review): "high_filter" matches none of the effect branches below, so
    # the undistorted ORIGIN_SOUND is mixed in. Recognised values are
    # "pitch_shift", "echo" and "low_filter".
    MODE = "high_filter" # echo or pitch_shift
    SOUND_SIZE = len(ORIGIN_SOUND)  # length of the source sound in samples

    sound = ORIGIN_SOUND.copy()

    print(type(sound), len(sound))

    int16_info = np.iinfo(np.int16)

    def to_int16(samples):
        """Saturate *samples* to the int16 range instead of wrapping around."""
        return np.clip(samples, int16_info.min, int16_info.max).astype(np.int16)

    def add_echo(decay):
        """Return ORIGIN_SOUND with a feedback echo of strength *decay*."""
        delay_samples = int(DELAY * RATE)
        if delay_samples <= 0 or len(ORIGIN_SOUND) == 0:
            return ORIGIN_SOUND.copy()
        # y[n] = x[n] + decay * y[n - delay_samples], expressed as an IIR
        # filter so it runs vectorised rather than as a per-sample Python loop.
        feedback = np.zeros(delay_samples + 1)
        feedback[0] = 1.0
        feedback[delay_samples] = -decay
        echoed = signal.lfilter([1.0], feedback, ORIGIN_SOUND.astype(np.float64))
        return to_int16(echoed)

    def shift_pitch(frequency):
        """Return ORIGIN_SOUND pitch-shifted by *frequency* steps."""
        # bins_per_octave=1 makes one step a whole octave.
        shifted = librosa.effects.pitch_shift(ORIGIN_SOUND.astype(np.float32), sr=RATE, n_steps=frequency, bins_per_octave=1)
        return to_int16(shifted)

    def low_filter(param, audio_data):
        """Return *audio_data* low-pass filtered at ``param * MAX_FREQ`` Hz."""
        # NOTE(review): unlike the other effects this filters the live input
        # buffer rather than ORIGIN_SOUND -- confirm that this is intended.
        cutoff_freq = param * MAX_FREQ  # cutoff in Hz
        nyquist_freq = 0.5 * RATE
        # butter() requires 0 < Wn < 1, so keep the cutoff inside that range.
        normalized_cutoff = min(max(cutoff_freq / nyquist_freq, 1e-3), 0.99)
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False, output='ba')
        return to_int16(signal.lfilter(b, a, audio_data))

    def get_distortion(height, frequency, audio_data):
        """Pick the effect parameter per SOURCE_MODE and apply the MODE effect."""
        height = min(height, MAX_HEIGHT) / MAX_HEIGHT
        frequency = min(frequency, MAX_FREQ) / MAX_FREQ

        # SOURCE_MODE may not have been set yet; treat that as "no effect".
        source_mode = globals().get("SOURCE_MODE")
        if source_mode == "decibel":
            # Despite the name, this mode is driven by the normalised peak height.
            param = height
        elif source_mode == "frequency":
            param = frequency
        else:
            return ORIGIN_SOUND

        if MODE == "pitch_shift":
            return shift_pitch(param)
        elif MODE == "echo":
            return add_echo(param)
        elif MODE == "low_filter":
            return low_filter(param, audio_data)
        else:
            return ORIGIN_SOUND

    # PyAudio's frame_count argument is the buffer size, not a sequence
    # number, so the callback keeps its own counter to tell whether the
    # previous buffer also contained a peak.
    frame_no = 0
    last_frame = 0  # value of frame_no at the most recent buffer with a peak

    # Stream callback
    def process_audio(in_data, frame_count, time_info, status):
        global sound_idx, sound
        nonlocal frame_no, last_frame
        frame_no += 1

        # Convert the raw input bytes to samples
        data = np.frombuffer(in_data, dtype=np.int16)

        # Run the STFT
        f, t, Zxx = signal.stft(data, RATE, nperseg=WIN_SIZE, noverlap=HOP_SIZE)

        # Peak detection on the time-averaged spectrum
        peaks, _ = signal.find_peaks(np.abs(np.mean(Zxx, axis=1)), height=THRESHOLD, distance=WIN_SIZE)

        if len(peaks) > 0:
            # Only react when the previous buffer had no peak, so a sustained
            # sound triggers the effect once.
            is_onset = last_frame + 1 != frame_no
            last_frame = frame_no

            if is_onset:
                peak_idx = peaks[0]  # use the first peak
                height = np.abs(Zxx[peak_idx, 0])  # peak magnitude in the first STFT segment
                freq = f[peak_idx]  # frequency of the peak bin
                amp = np.max(np.abs(data))  # largest absolute sample in the buffer
                decibel = np.mean(librosa.amplitude_to_db(np.abs(Zxx)))  # mean level in dB

                if decibel > 20:
                    print("Height: {:.2f}, 주파수: {:.2f}, Amplitude: {:.2f}, Decibel: {:.2f}".format(height, freq, amp, decibel))
                    new_sound = get_distortion(height, freq, data)
                    if sound_idx > len(sound):
                        # Playback already finished: restart from the beginning.
                        sound_idx = 0
                    else:
                        mixed_end = min(len(sound), sound_idx + len(new_sound))
                        print(mixed_end, sound_idx)
                        overlap = mixed_end - sound_idx
                        # Sum in a wider type, then saturate, so loud passages
                        # do not wrap around in int16.
                        mixed = new_sound[:overlap].astype(np.int32) + sound[sound_idx:mixed_end]
                        sound[sound_idx:mixed_end] = to_int16(mixed)
                        if overlap < len(new_sound):
                            # Append whatever part of the effect extends past the end.
                            sound = np.concatenate((sound, new_sound[overlap:]), axis=0)

        sound_idx += frame_count
        if sound_idx > len(sound):
            # Past the end: reset the buffer and output silence.
            sound = ORIGIN_SOUND.copy()
            return (np.zeros(frame_count, dtype=np.int16).tobytes(), pyaudio.paContinue)
        out_chunk = sound[sound_idx - frame_count:sound_idx]
        return (out_chunk.astype(np.int16).tobytes(), pyaudio.paContinue)

    p = pyaudio.PyAudio()
    try:
        # Open the full-duplex stream
        stream = p.open(format=p.get_format_from_width(2),
                        channels=1,
                        rate=RATE,
                        input_device_index=1,
                        output_device_index=2,
                        input=True,
                        output=True,
                        frames_per_buffer=CHUNK,
                        stream_callback=process_audio
                        )
        try:
            # Start the stream
            stream.start_stream()

            # Wait for as long as the stream is active; sleep rather than spin
            # so the audio callback thread is not starved.
            while stream.is_active():
                time.sleep(0.1)
        finally:
            # Shut down the stream
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()


# Control buttons, packed top to bottom in creation order.
record_button = tk.Button(
    window, text="Record", width=10, height=2,
    command=record,
)
record_button.pack()

decibel_button = tk.Button(
    window, text="Decibel", width=10, height=2,
    command=lambda: set_source_mode("decibel"),
)
decibel_button.pack()

frequency_button = tk.Button(
    window, text="Frequency", width=10, height=2,
    command=lambda: set_source_mode("frequency"),
)
frequency_button.pack()

start_button = tk.Button(
    window, text="Start", width=10, height=2,
    command=start,
)
start_button.pack()

# Hand control to Tk; this call blocks until the window is closed.
window.mainloop()