stream_echo.py
import sys
import time

import numpy as np
import pyaudio
import librosa
import scipy.signal as signal

RECORD_SECONDS = 5  # Currently unused
CHUNK = 1024  # Frames per callback buffer
RATE = 44100  # Sample rate in Hz
DELAY = 0.1  # Delay time in seconds
GAIN = 1  # Echo gain (0 to 1)
MAX_FREQ = 3000  # Upper bound (Hz) used to normalise the dominant frequency

input_device_index = 1   # Machine-specific; list devices with p.get_device_info_by_index()
output_device_index = 4  # Machine-specific; adjust to match your hardware

# Create buffer for delayed audio data
buffer_size = int(RATE * DELAY)
buffer = np.zeros(buffer_size, dtype=np.int16)
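# add_echo() treats this as a simple ring buffer: each callback pushes the
# newest chunk in at the end and reads the DELAY-second-old samples from the front.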

def do_process(in_data, frame_count, time_info, status_flags):
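    # PyAudio invokes this callback once per CHUNK of captured audio. `in_data`
    # holds frame_count frames of raw int16 samples; the callback must return a
    # (bytes, flag) tuple containing exactly frame_count frames for playback.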
    global buffer
    data = np.frombuffer(in_data, dtype=np.int16)

    def get_max_average_db(data):
        data_float = data.astype(np.float32) 

        # Compute the power spectrogram of the data
        S = librosa.stft(data_float, n_fft=256, hop_length=512)
        S_power = np.abs(S)**2

        # Convert power spectrogram to dB scale
        S_dB = librosa.power_to_db(S_power, ref=np.max)

        # Calculate the average dB level
        avg_dB = np.mean(S_dB)
        max_dB = np.max(S_dB)

        return avg_dB, max_dB

    def get_dominant_freq(data):
        data = data.astype(np.float32) / 32768.0

        # Compute the power spectrogram on the mel scale
        S = librosa.feature.melspectrogram(y=data, sr=RATE, n_fft=256, hop_length=1024, n_mels=64)

        # Centre frequency (in Hz) of each mel band (the filterbank spans
        # n_mels + 2 mel-spaced points; the interior points are the centres)
        mel_freqs = librosa.mel_frequencies(n_mels=64 + 2, fmin=0.0, fmax=RATE / 2)[1:-1]

        # Find the mel band with the maximum energy in each frame
        max_bin = np.argmax(S, axis=0)

        # Map those bands to Hz and take the median across frames as the
        # overall dominant frequency of this chunk
        dominant_freqs = mel_freqs[max_bin]
        dominant_freq = np.median(dominant_freqs)

        return dominant_freq

    def add_echo(gain):
        global buffer
        # Read the oldest samples (written DELAY seconds ago), then push the
        # newest chunk into the end of the ring buffer.
        delayed = buffer[:len(data)].copy()
        buffer = np.roll(buffer, -len(data))
        buffer[-len(data):] = data
        # Mix and clip to the int16 range to avoid wrap-around on overflow
        return np.clip(data + gain * delayed, -32768, 32767)
    
    def shift_pitch(pitch_shift_factor):
        audio_array = data.astype(np.float32)
        # Crude pitch shift: resample the chunk to a new length; playing it back
        # at the original rate raises the pitch when the factor is > 1 and
        # lowers it when the factor is < 1.
        new_length = max(1, int(len(audio_array) / pitch_shift_factor))
        resampled = signal.resample(audio_array, new_length)
        # The callback must return exactly len(data) samples, so pad or trim.
        if len(resampled) < len(data):
            resampled = np.pad(resampled, (0, len(data) - len(resampled)))
        return resampled[:len(data)]

    def low_pass_filter(param):
        audio_data = data
        # Define the filter parameters
        cutoff_freq = param * MAX_FREQ  # Frequency cutoff for the low-pass filter (in Hz)
        nyquist_freq = 0.5 * RATE  # Nyquist frequency (half of the sampling rate)
        normalized_cutoff = cutoff_freq / nyquist_freq  # Normalized cutoff frequency

        # Design the low-pass filter
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False, output='ba')

        # Apply the low-pass filter to the audio data
        filtered_audio = signal.lfilter(b, a, audio_data)

        return filtered_audio

    try:
        freq = get_dominant_freq(data)
        # avg_db, max_db = get_max_average_db(data)
        
        # temp_gain = freq/MAX_FREQ
        # output = add_echo(temp_gain)
        output = shift_pitch(0.5 + freq/MAX_FREQ)
        # output = low_pass_filter(0.5)
        # print(int(freq), int(avg_db), int(max_db))
        # Clip to the int16 range before converting back to raw bytes
        output = np.clip(output, -32768, 32767)
        return (output.astype(np.int16).tobytes(), pyaudio.paContinue)
    except Exception as e:
        print("exception occurred:", e)
        # Fall back to passing the input through unchanged
        return (in_data, pyaudio.paContinue)


p = pyaudio.PyAudio()
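# Open a full-duplex stream: PyAudio pulls CHUNK frames from the input device,
# hands them to do_process, and writes whatever bytes the callback returns to
# the output device.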
stream = p.open(format=p.get_format_from_width(2),
                channels=1 if sys.platform == 'darwin' else 2,
                rate=RATE,
                input=True,
                input_device_index=input_device_index,
                output_device_index=output_device_index,
                output=True,
                frames_per_buffer=CHUNK,
                stream_callback=do_process
                )

print('* streaming')

stream.start_stream()

while stream.is_active():
    # Do other processing here if necessary
    time.sleep(0.1)

stream.stop_stream()
stream.close()
p.terminate()