python json numpy audio audio-fingerprinting

Record and Recognize music from URL


I am using Dejavu, an open-source audio fingerprinting platform written in Python, which can recognize music from disk and from a microphone. I have tested the recognition from disk and it is amazing: 100% accuracy.

I seek assistance on how to add a class "BroadcastRecognizer". It should recognize music from an online stream URL, for example an online radio station [http://bbcmedia.ic.llnwd.net/stream/bbcmedia_radio1_mf_p]. Because the music in the radio stream is constantly changing, I would like it to run a recognition every 10 seconds.

Here is the recognize.py

import dejavu.fingerprint as fingerprint
import dejavu.decoder as decoder
import numpy as np
import pyaudio
import time


class BaseRecognizer(object):
    """Common base for recognizers that match audio against a Dejavu instance.

    Subclasses obtain sample data from some source (a file, a microphone, ...)
    and pass it to ``_recognize``.
    """

    def __init__(self, dejavu):
        """Keep a reference to ``dejavu`` and start from the default sample rate."""
        self.dejavu = dejavu
        self.Fs = fingerprint.DEFAULT_FS

    def _recognize(self, *data):
        """Match every sequence in ``data`` and return the aligned result.

        Each positional argument is passed to ``dejavu.find_matches`` together
        with the current ``self.Fs``. The matches from all arguments are
        collected into one list and handed to ``dejavu.align_matches``, whose
        return value is returned unchanged.
        """
        matches = []
        for d in data:
            matches.extend(self.dejavu.find_matches(d, Fs=self.Fs))
        return self.dejavu.align_matches(matches)

    def recognize(self):
        """Do nothing and return None; subclasses provide the real behaviour."""
        pass  # base class does nothing


class FileRecognizer(BaseRecognizer):
    """Recognizer that matches audio decoded from a file."""

    def __init__(self, dejavu):
        super(FileRecognizer, self).__init__(dejavu)

    def recognize_file(self, filename):
        """Decode ``filename``, match it, and return the match.

        ``decoder.read`` is called with ``self.dejavu.limit`` and its second
        return value replaces ``self.Fs``. When the match is truthy, the
        wall-clock time spent in ``_recognize`` is stored in it under the
        ``'match_time'`` key.
        """
        # The third value returned by decoder.read is not needed here.
        frames, self.Fs, _file_hash = decoder.read(filename, self.dejavu.limit)

        t = time.time()
        match = self._recognize(*frames)
        t = time.time() - t

        if match:
            match['match_time'] = t

        return match

    def recognize(self, filename):
        """Alias for ``recognize_file``."""
        return self.recognize_file(filename)


class MicrophoneRecognizer(BaseRecognizer):
    """Recognizer that records from a PyAudio input stream and matches it."""

    default_chunksize = 8192
    default_format = pyaudio.paInt16
    default_channels = 2
    default_samplerate = 44100

    def __init__(self, dejavu):
        super(MicrophoneRecognizer, self).__init__(dejavu)
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.data = []
        self.channels = MicrophoneRecognizer.default_channels
        self.chunksize = MicrophoneRecognizer.default_chunksize
        self.samplerate = MicrophoneRecognizer.default_samplerate
        self.recorded = False

    def start_recording(self, channels=default_channels,
                        samplerate=default_samplerate,
                        chunksize=default_chunksize):
        """Open a fresh input stream and reset the per-channel sample buffers.

        Any stream that is still open from a previous recording is stopped
        and closed first.
        """
        self.chunksize = chunksize
        self.channels = channels
        self.recorded = False
        self.samplerate = samplerate

        if self.stream:
            self.stream.stop_stream()
            self.stream.close()

        self.stream = self.audio.open(
            format=self.default_format,
            channels=channels,
            rate=samplerate,
            input=True,
            frames_per_buffer=chunksize,
        )

        # One list of samples per channel.
        self.data = [[] for _ in range(channels)]

    def process_recording(self):
        """Read one chunk from the stream and append it to the channel buffers."""
        data = self.stream.read(self.chunksize)
        # np.fromstring is deprecated for binary input; frombuffer is the
        # supported way to view raw bytes as int16 samples.
        nums = np.frombuffer(data, np.int16)
        # Samples are interleaved, so channel c is every channels-th value
        # starting at offset c.
        for c in range(self.channels):
            self.data[c].extend(nums[c::self.channels])

    def stop_recording(self):
        """Stop and close the stream and mark the recording as complete."""
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        self.recorded = True

    def recognize_recording(self):
        """Match the recorded samples.

        Raises:
            NoRecordingError: if ``stop_recording`` has not been called since
                the last ``start_recording``.
        """
        if not self.recorded:
            raise NoRecordingError("Recording was not complete/begun")
        return self._recognize(*self.data)

    def get_recorded_time(self):
        """Return the length of the first channel's buffer in seconds."""
        if not self.data:
            return 0.0
        # The attribute set by __init__/start_recording is `samplerate`;
        # `self.rate` was never assigned and raised AttributeError.
        return len(self.data[0]) / float(self.samplerate)

    def recognize(self, seconds=10):
        """Record for about ``seconds`` seconds and return the match.

        ``seconds`` may be a number or a numeric string (the command line
        passes it through as text); anything else raises ValueError before
        the stream is opened.
        """
        seconds = float(seconds)
        self.start_recording()
        try:
            chunk_count = int(self.samplerate * seconds / self.chunksize)
            for _ in range(chunk_count):
                self.process_recording()
        finally:
            # Close the stream even when a read fails part-way through.
            self.stop_recording()
        return self.recognize_recording()


class NoRecordingError(Exception):
    """Raised when recognition is requested before a recording is complete."""

Here is the dejavu.py

import os
import sys
import json
import warnings
import argparse

from dejavu import Dejavu
from dejavu.recognize import FileRecognizer
from dejavu.recognize import MicrophoneRecognizer
from argparse import RawTextHelpFormatter

# Suppress every warning for the lifetime of the process.
warnings.filterwarnings("ignore")

# Configuration path used when --config is not given on the command line.
DEFAULT_CONFIG_FILE = "dejavu.cnf.SAMPLE"


def init(configpath):
    """
    Load config from a JSON file and return a Dejavu instance built from it.

    If the file cannot be opened, a message is printed and the process exits
    with status 1.
    """
    try:
        with open(configpath) as f:
            config = json.load(f)
    except IOError as err:
        print("Cannot open configuration: %s. Exiting" % (str(err)))
        sys.exit(1)

    # create a Dejavu instance
    return Dejavu(config)


if __name__ == '__main__':
    # Command-line entry point: fingerprint files, or recognize audio from a
    # file or the microphone, using the configuration given by --config.
    parser = argparse.ArgumentParser(
        description="Dejavu: Audio Fingerprinting library",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument('-c', '--config', nargs='?',
                        help='Path to configuration file\n'
                             'Usages: \n'
                             '--config /path/to/config-file\n')
    parser.add_argument('-f', '--fingerprint', nargs='*',
                        help='Fingerprint files in a directory\n'
                             'Usages: \n'
                             '--fingerprint /path/to/directory extension\n'
                             '--fingerprint /path/to/directory')
    parser.add_argument('-r', '--recognize', nargs=2,
                        help='Recognize what is '
                             'playing through the microphone\n'
                             'Usage: \n'
                             '--recognize mic number_of_seconds \n'
                             '--recognize file path/to/file \n')
    args = parser.parse_args()

    if not args.fingerprint and not args.recognize:
        parser.print_help()
        sys.exit(0)

    config_file = args.config
    if config_file is None:
        config_file = DEFAULT_CONFIG_FILE

    djv = init(config_file)
    if args.fingerprint:
        # Fingerprint all files in a directory
        if len(args.fingerprint) == 2:
            directory = args.fingerprint[0]
            extension = args.fingerprint[1]
            print("Fingerprinting all .%s files in the %s directory"
                  % (extension, directory))
            # NOTE(review): the trailing 4 is presumably a worker/process
            # count -- confirm against Dejavu.fingerprint_directory.
            djv.fingerprint_directory(directory, ["." + extension], 4)

        elif len(args.fingerprint) == 1:
            filepath = args.fingerprint[0]
            if os.path.isdir(filepath):
                print("Please specify an extension if you'd like to fingerprint a directory!")
                sys.exit(1)
            djv.fingerprint_file(filepath)

    elif args.recognize:
        # Recognize audio source
        song = None
        source = args.recognize[0]
        opt_arg = args.recognize[1]

        if source in ('mic', 'microphone'):
            # argparse delivers the value as text; the recognizer does
            # arithmetic on it, so convert it before passing it on.
            try:
                seconds = int(opt_arg)
            except ValueError:
                parser.error("number_of_seconds must be an integer, got %r"
                             % opt_arg)
            song = djv.recognize(MicrophoneRecognizer, seconds=seconds)
        elif source == 'file':
            song = djv.recognize(FileRecognizer, opt_arg)
        else:
            # Previously an unknown source silently printed None.
            parser.error("unknown recognize source %r "
                         "(expected 'mic' or 'file')" % source)
        print(song)

    sys.exit(0)

Solution

  • I still think that you need a discrete "piece" of audio, so you need a beginning and an end.
    For what it is worth, start with something like this, which records a 10 second burst of audio, which you can then test against your finger-printed records.
    Note that this was bashed out for Python 2, so you would have to edit it to run on Python 3.

    # Record roughly 10 seconds of an HTTP audio stream into a local file.
    # Runs on both Python 2 and Python 3.
    import sys
    import time
    from contextlib import closing

    # urllib2 became urllib.request in Python 3; use whichever exists.
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen

    url = "http://bbcmedia.ic.llnwd.net/stream/bbcmedia_radio1_mf_p"
    limit = 10         # wall-clock seconds to keep reading for
    block_size = 1024  # bytes requested from the stream per read
    # time.clock() was removed in Python 3.8; a wall-clock timestamp still
    # gives a different file name on each run.
    # NOTE(review): the bytes are written exactly as received, so the .wav
    # extension does not convert anything -- confirm the decoder accepts the
    # stream's actual format.
    fname = "Sample" + str(int(time.time())) + ".wav"

    print("Connecting to " + url)
    # closing() releases the connection and the file's `with` closes the file
    # even if a read fails part-way through.
    with closing(urlopen(url, timeout=10.0)) as response, open(fname, 'wb') as f:
        print("Recording roughly 10 seconds of audio Now - Please wait")
        start = time.time()
        while time.time() - start < limit:
            try:
                audio = response.read(block_size)
            except (IOError, OSError) as e:
                # Stop on a failed read instead of retrying in a tight loop
                # until the time limit expires.
                print("Error " + str(e))
                break
            if not audio:
                break  # no more data from the server
            f.write(audio)
            sys.stdout.write('.')
            sys.stdout.flush()
    print("")
    print("10 seconds from " + url + " have been recorded in " + fname)

    #
    # here run the finger print test to identify the audio recorded
    # using the sample you have downloaded in the file "fname"
    #