pythonlibrosaaudio-fingerprinting

How to find out how many times a specific audio repeats in another longer audio?


I have an example of an audio ad and a recording of a radio stream. I want to find out how many times this specific ad repeats in this stream. Tried to use audio fingerprinting and librosa library. But my command seems to run forever, I've already waited for 30 minutes and it is still running. Can you please say what is wrong with my code? How can I optimize it?

import numpy as np
import librosa

def count_repeats(ref_file, longer_file):
    # Load the reference and longer audio signals
    ref_audio, ref_sr = librosa.load(ref_file, sr=None, mono=True)
    longer_audio, longer_sr = librosa.load(longer_file, sr=None, mono=True)

    # Compute the cross-correlation between the reference and longer audio signals
    corr = np.correlate(ref_audio, longer_audio, mode='full')

    # Find the time lag that maximizes the cross-correlation
    lag = np.argmax(corr) - len(ref_audio) + 1

    # Compute the duration of the reference and longer audio signals
    ref_duration = len(ref_audio) / ref_sr
    longer_duration = len(longer_audio) / longer_sr

    # Compute the number of repeats based on the time lag and the duration of the audio signals
    repeats = int(np.floor((longer_duration - lag) / ref_duration))

    return repeats

ref_file = 'house_ad.mp3'
longer_file = 'long_audio.mp3'
repeats = count_repeats(ref_file, longer_file)
print(f'Number of repeats: {repeats}')


Solution

  • The reason your function is very slow probably that cross correlation is a O(n**2) operation, and with audio sample-rates n is very large. With a 60 second and a 60 minute, it is an estimated 105*10e12 operations.

    And while cross-correlation is conceptually a valid approach for comparing a signal against a template it is quite fragile - as even the tiniest change in the audio will throw it off.

    A much more robust and also more computationally efficient approach is to do matching in another feature space. For audio a good starting point may be MFCC.

    A basic but very effective method of signal matching is described in DiagonalMatching from the book Fundamentals of Music Processing Using Python and Jupyter Notebooks (FMP) by Meinard Müller.

    Below follows complete code following this method. On your example, it manages to successfully identify the 3 locations of the provided clip at the correct locations.

    NOTE: This approach assumes that the audio clip being queried for is very similar each time it appears. But if there is considerable variation in the clip, or it or occurs together with other sounds et.c, more advanced approaches will be needed.

    The complete notebook can also be found here.

    import math
    
    import numpy
    import pandas
    import librosa
    
    import seaborn
    from matplotlib import pyplot as plt
    
    np = numpy # code from FMP uses this
    
    def normalize_feature_sequence(X, norm='2', threshold=0.0001, v=None):
        """Normalizes the columns of a feature sequence
    
        Notebook: C3/C3S1_FeatureNormalization.ipynb
    
        Args:
            X (np.ndarray): Feature sequence
            norm (str): The norm to be applied. '1', '2', 'max' or 'z' (Default value = '2')
            threshold (float): An threshold below which the vector ``v`` used instead of normalization
                (Default value = 0.0001)
            v (float): Used instead of normalization below ``threshold``. If None, uses unit vector for given norm
                (Default value = None)
    
        Returns:
            X_norm (np.ndarray): Normalized feature sequence
        """
        assert norm in ['1', '2', 'max', 'z']
    
        K, N = X.shape
        X_norm = np.zeros((K, N))
    
        if norm == '2':
            if v is None:
                v = np.ones(K, dtype=np.float64) / np.sqrt(K)
            for n in range(N):
                s = np.sqrt(np.sum(X[:, n] ** 2))
                if s > threshold:
                    X_norm[:, n] = X[:, n] / s
                else:
                    X_norm[:, n] = v
        else:
            raise ValueError("Norm type not supported")
    
        return X_norm
    
    def compute_features(audio, sr, hop_length=512, n_mfcc=13, n_fft=None):
        
        if n_fft is None:
            n_fft = next_power_of_2(hop_length)
        
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)
        # Normalize using Euclidean norm - as the diagonal matching code expects it
        mfcc = normalize_feature_sequence(mfcc)
        
        return mfcc
    
    
    def cost_matrix_dot(X, Y):
        """Computes cost matrix via dot product
    
        Notebook: C7/C7S2_DiagonalMatching.ipynb
    
        Args:
            X (np.ndarray): First sequence (K x N matrix)
            Y (np.ndarray): Second sequence (K x M matrix)
    
        Returns:
            C (np.ndarray): Cost matrix
        """
        return 1 - np.dot(X.T, Y)
    
    def matching_function_diag(C, cyclic=False):
        """Computes diagonal matching function
    
        Notebook: C7/C7S2_DiagonalMatching.ipynb
    
        Args:
            C (np.ndarray): Cost matrix
            cyclic (bool): If "True" then matching is done cyclically (Default value = False)
    
        Returns:
            Delta (np.ndarray): Matching function
        """
        N, M = C.shape
        assert N <= M, "N <= M is required"
        Delta = C[0, :]
        for n in range(1, N):
            Delta = Delta + np.roll(C[n, :], -n)
        Delta = Delta / N
        if cyclic is False:
            Delta[M-N+1:M] = np.inf
        return Delta
    
    def mininma_from_matching_function(Delta, rho=2, tau=0.2, num=None):
        """Derives local minima positions of matching function in an iterative fashion
    
        Notebook: C7/C7S2_DiagonalMatching.ipynb
    
        Args:
            Delta (np.ndarray): Matching function
            rho (int): Parameter to exclude neighborhood of a matching position for subsequent matches (Default value = 2)
            tau (float): Threshold for maximum Delta value allowed for matches (Default value = 0.2)
            num (int): Maximum number of matches (Default value = None)
    
        Returns:
            pos (np.ndarray): Array of local minima
        """
        Delta_tmp = numpy.array(Delta).copy()
        M = len(Delta)
        pos = []
        num_pos = 0
        rho = int(rho)
        if num is None:
            num = M
        while num_pos < num and np.sum(Delta_tmp < tau) > 0:
            m = np.argmin(Delta_tmp)
            #print(Delta_tmp.shape)
            #print('argmin', m, Delta_tmp[int(m)])
            pos.append(m)
            num_pos += 1
            # exclude this region from candidate minimums
            s = max(0, m - rho)
            e = min(m + rho, M)
            #print(s, e)
            Delta_tmp[s:e] = np.inf
        pos = np.array(pos).astype(int)
        return pos
    
    def next_power_of_2(x):
        return 2**(math.ceil(math.log(x, 2)))
    
    def plot_results(scores, threshold=None, events=None):
            
        fig, ax = plt.subplots(1, figsize=(30, 5))
        ax.plot(scores.reset_index()['time'], scores['distance'])
    
        if threshold is not None:
            ax.axhline(threshold, ls='--', alpha=0.5, color='black')
            
        if events is not None:
            for idx, e in events.iterrows():
                ax.axvspan(e['start'], e['end'], color='green', alpha=0.5)
        
        import matplotlib.ticker
        x_formatter = matplotlib.ticker.FuncFormatter(ticker_format_minutes_seconds)
        ax.xaxis.set_major_formatter(x_formatter)
        ax.xaxis.set_major_locator(matplotlib.ticker.MultipleLocator(base=10*60))
        ax.xaxis.set_minor_locator(matplotlib.ticker.MultipleLocator(base=60))
        ax.grid(axis='x')
        ax.grid(axis='x', which='minor')
    
        return fig
        
    def ticker_format_minutes_seconds(x, pos):
        hours = int(x//3600)
        minutes = int((x%3600)//60)
        seconds = int(x%60)
    
        return "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds)
    
    
    def find_audio(long, short, sr=16000, time_resolution=0.500, max_matches=10, score_threshold=0.1):
    
        # distance between frames in feature representation [seconds]
    
        hop_length = int(time_resolution*samplerate)
        
        # compute features for the audio
        query = compute_features(short, sr=sr, hop_length=hop_length)
        clip = compute_features(long, sr=sr, hop_length=hop_length)
        
    
        # Compute cost matrix and matching function
        C = cost_matrix_dot(query, clip)
        Delta = matching_function_diag(C)
        
        scores = pandas.DataFrame({
            'time': librosa.times_like(Delta, hop_length=hop_length, sr=samplerate),
            'distance': Delta,
        }).set_index('time')
        
        # convert to discrete
        match_idx = mininma_from_matching_function(scores['distance'].values,
                                                   num=max_matches, rho=query.shape[1], tau=score_threshold)
    
        matches = scores.reset_index().loc[match_idx]
        matches = matches.rename(columns={'time': 'start'})
        matches['end'] = matches['start'] + (query.shape[1] * time_resolution)
        matches = matches.reset_index()
        
        return scores, matches
    

    Here is how to use it

    # configuration
    samplerate = 16000
    threshold = 0.02
    max_matches = 3 
    
    short, sr = librosa.load('house_ad.m4a', sr=samplerate)
    long, sr = librosa.load('long_audio.m4a', sr=samplerate)
    
    scores, matches = find_audio(long, short, max_matches=max_matches, score_threshold=threshold)
    
    # print results
    for idx, m in matches.iterrows():
        td = pandas.Timedelta(m['start'], unit='s').round('1s').to_pytimedelta()
        print(f'match {idx}: {td}')
    
    # visualize results
    fig = plot_results(scores, events=matches, threshold=threshold)
    fig.savefig('results.png')
    
    

    It should output

    match 0: 0:38:26
    match 1: 0:03:47
    match 2: 0:51:35
    

    and give a plot like this. enter image description here