I am using Dejavu, an open-source audio fingerprinting library written in Python, which can recognize music from files on disk and from the microphone. I have tested recognition from disk and it is amazing: 100% accuracy.
I am seeking assistance on how to add a class, "BroadcastRecognizer", that recognizes music from an online stream URL, for example an online radio station [http://bbcmedia.ic.llnwd.net/stream/bbcmedia_radio1_mf_p]. Because the music in the radio stream is constantly changing, I would like it to run a recognition every 10 seconds.
import dejavu.fingerprint as fingerprint
import dejavu.decoder as decoder
import numpy as np
import pyaudio
import time
class BaseRecognizer(object):
    """Shared behaviour for recognizers that query a Dejavu instance."""

    def __init__(self, dejavu):
        # Start from the fingerprint module's default sampling rate;
        # subclasses overwrite self.Fs when they know the real one.
        self.Fs = fingerprint.DEFAULT_FS
        self.dejavu = dejavu

    def _recognize(self, *data):
        """Match every positional argument and return the aligned result.

        ``find_matches`` is called once per argument with ``Fs=self.Fs``;
        the results are concatenated in argument order and handed to
        ``align_matches``, whose return value is returned unchanged.
        """
        found = [
            hit
            for samples in data
            for hit in self.dejavu.find_matches(samples, Fs=self.Fs)
        ]
        return self.dejavu.align_matches(found)

    def recognize(self):
        """Do nothing and return None; subclasses supply the real logic."""
        return None
class FileRecognizer(BaseRecognizer):
    """Recognizer whose audio comes from a file read by ``decoder.read``."""

    def __init__(self, dejavu):
        super(FileRecognizer, self).__init__(dejavu)

    def recognize_file(self, filename):
        """Decode ``filename``, match it, and return the match.

        ``self.Fs`` is overwritten with the second value returned by
        ``decoder.read``. When the match is truthy, the wall-clock time spent
        matching is stored under its ``'match_time'`` key; a falsy match is
        returned untouched.
        """
        decoded = decoder.read(filename, self.dejavu.limit)
        # The third element (the file hash) is not needed here.
        frames, self.Fs, _file_hash = decoded

        started = time.time()
        result = self._recognize(*frames)
        elapsed = time.time() - started

        if not result:
            return result
        result['match_time'] = elapsed
        return result

    def recognize(self, filename):
        """Alias for :meth:`recognize_file`."""
        return self.recognize_file(filename)
class MicrophoneRecognizer(BaseRecognizer):
    """Recognizer that records from a PyAudio input stream and matches it.

    Typical use is a single call to :meth:`recognize`; the individual
    ``start_recording`` / ``process_recording`` / ``stop_recording`` steps are
    exposed for callers that want to drive the capture loop themselves.
    """

    default_chunksize = 8192
    default_format = pyaudio.paInt16
    default_channels = 2
    default_samplerate = 44100

    def __init__(self, dejavu):
        super(MicrophoneRecognizer, self).__init__(dejavu)
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.data = []
        self.channels = MicrophoneRecognizer.default_channels
        self.chunksize = MicrophoneRecognizer.default_chunksize
        self.samplerate = MicrophoneRecognizer.default_samplerate
        self.recorded = False

    def start_recording(self, channels=default_channels,
                        samplerate=default_samplerate,
                        chunksize=default_chunksize):
        """Open a fresh input stream and reset the sample buffers.

        Any stream left open by a previous recording is stopped and closed
        first. ``self.data`` becomes one empty list per channel.
        """
        self.chunksize = chunksize
        self.channels = channels
        self.recorded = False
        self.samplerate = samplerate

        if self.stream:
            self.stream.stop_stream()
            self.stream.close()

        self.stream = self.audio.open(
            format=self.default_format,
            channels=channels,
            rate=samplerate,
            input=True,
            frames_per_buffer=chunksize,
        )

        self.data = [[] for _ in range(channels)]

    def process_recording(self):
        """Read one chunk from the stream and append it to the buffers."""
        data = self.stream.read(self.chunksize)
        # np.fromstring is deprecated for binary input; frombuffer is the
        # documented replacement and avoids an extra copy.
        nums = np.frombuffer(data, np.int16)
        # Every self.channels-th sample goes to the same buffer
        # (assumes the stream delivers channel-interleaved samples).
        for c in range(self.channels):
            self.data[c].extend(nums[c::self.channels])

    def stop_recording(self):
        """Stop and close the stream and mark the recording as complete."""
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        self.recorded = True

    def recognize_recording(self):
        """Match the recorded buffers.

        Raises:
            NoRecordingError: if no recording has been completed yet.
        """
        if not self.recorded:
            raise NoRecordingError("Recording was not complete/begun")
        return self._recognize(*self.data)

    def get_recorded_time(self):
        """Return the recorded duration in seconds (0.0 if nothing recorded)."""
        if not self.data:
            return 0.0
        # The original read self.rate, which is never assigned; the sampling
        # rate lives in self.samplerate. float() keeps Python 2 from truncating.
        return len(self.data[0]) / float(self.samplerate)

    def recognize(self, seconds=10):
        """Record for roughly ``seconds`` seconds, then match the recording.

        ``seconds`` may be a number or a numeric string: the command line
        passes it through as text, and multiplying by a string would either
        repeat the string (Python 2) or raise TypeError (Python 3).

        Raises:
            ValueError: if ``seconds`` cannot be converted to a number.
        """
        seconds = float(seconds)
        self.start_recording()
        # True division so the chunk count is not rounded down twice.
        chunk_count = int(float(self.samplerate) / self.chunksize * seconds)
        for _ in range(chunk_count):
            self.process_recording()
        self.stop_recording()
        return self.recognize_recording()
class NoRecordingError(Exception):
    """Raised when recognition is requested before a recording is complete."""
import os
import sys
import json
import warnings
import argparse
from dejavu import Dejavu
from dejavu.recognize import FileRecognizer
from dejavu.recognize import MicrophoneRecognizer
from argparse import RawTextHelpFormatter
warnings.filterwarnings("ignore")
DEFAULT_CONFIG_FILE = "dejavu.cnf.SAMPLE"
def init(configpath):
    """
    Build a Dejavu instance from the JSON configuration at ``configpath``.

    If an IOError is raised while opening or reading the file, a message is
    printed and the process exits with status 1.
    """
    try:
        with open(configpath) as handle:
            settings = json.loads(handle.read())
    except IOError as exc:
        message = "Cannot open configuration: %s. Exiting" % (str(exc))
        print(message)
        sys.exit(1)

    # Hand the parsed settings to Dejavu.
    instance = Dejavu(settings)
    return instance
if __name__ == '__main__':
    # Command-line entry point: fingerprint files/directories, or recognize
    # audio from the microphone or from a file.
    parser = argparse.ArgumentParser(
        description="Dejavu: Audio Fingerprinting library",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument('-c', '--config', nargs='?',
                        help='Path to configuration file\n'
                             'Usages: \n'
                             '--config /path/to/config-file\n')
    parser.add_argument('-f', '--fingerprint', nargs='*',
                        help='Fingerprint files in a directory\n'
                             'Usages: \n'
                             '--fingerprint /path/to/directory extension\n'
                             '--fingerprint /path/to/directory')
    parser.add_argument('-r', '--recognize', nargs=2,
                        help='Recognize what is '
                             'playing through the microphone\n'
                             'Usage: \n'
                             '--recognize mic number_of_seconds \n'
                             '--recognize file path/to/file \n')
    args = parser.parse_args()

    if not args.fingerprint and not args.recognize:
        parser.print_help()
        sys.exit(0)

    config_file = args.config
    if config_file is None:
        config_file = DEFAULT_CONFIG_FILE
        # print "Using default config file: %s" % (config_file)

    djv = init(config_file)

    if args.fingerprint:
        if len(args.fingerprint) == 2:
            # Fingerprint all files with the given extension in a directory.
            directory = args.fingerprint[0]
            extension = args.fingerprint[1]
            print("Fingerprinting all .%s files in the %s directory"
                  % (extension, directory))
            djv.fingerprint_directory(directory, ["." + extension], 4)
        elif len(args.fingerprint) == 1:
            # Fingerprint a single file.
            filepath = args.fingerprint[0]
            if os.path.isdir(filepath):
                print("Please specify an extension if you'd like to fingerprint a directory!")
                sys.exit(1)
            djv.fingerprint_file(filepath)
        else:
            # Previously three or more values were accepted and ignored.
            parser.error("--fingerprint takes a file path, or a directory "
                         "and an extension")
    elif args.recognize:
        # Recognize audio source
        song = None
        source, opt_arg = args.recognize

        if source in ('mic', 'microphone'):
            # argparse delivers the duration as text; the recognizer does
            # arithmetic on it, so convert (and validate) it here.
            try:
                seconds = int(opt_arg)
            except ValueError:
                parser.error("--recognize mic expects a whole number of "
                             "seconds, got %r" % (opt_arg,))
            song = djv.recognize(MicrophoneRecognizer, seconds=seconds)
        elif source == 'file':
            song = djv.recognize(FileRecognizer, opt_arg)
        else:
            # Previously an unknown source silently printed "None".
            parser.error("unknown --recognize source %r "
                         "(expected 'mic' or 'file')" % (source,))
        print(song)

    sys.exit(0)
I still think that you need a discrete "piece" of audio, so you need a beginning and an end.
For what it is worth, start with something like this, which records a 10-second burst of audio that you can then test against your fingerprinted records.
Note that this was bashed out for Python 2, so it may need editing before it will run on Python 3.
# Capture roughly ten seconds of an online radio stream into a local file so
# that it can be run through the fingerprint recognizer afterwards.
import sys
import time

try:  # Python 2
    from urllib2 import urlopen
except ImportError:  # Python 3
    from urllib.request import urlopen

url = "http://bbcmedia.ic.llnwd.net/stream/bbcmedia_radio1_mf_p"
block_size = 1024
limit = 10  # wall-clock seconds to keep reading from the stream

print("Connecting to " + url)
response = urlopen(url, timeout=10.0)

# time.clock() was removed in Python 3.8 and, being CPU time, produced almost
# the same name on every run; the epoch second gives a distinct name per capture.
# NOTE(review): the bytes are written exactly as received, so the ".wav" suffix
# may not match the stream's real encoding - confirm the decoder copes with it.
fname = "Sample" + str(int(time.time())) + ".wav"

print("Recording roughly 10 seconds of audio Now - Please wait")
start = time.time()
try:
    # "with" closes the file even if a write fails part-way through.
    with open(fname, 'wb') as f:
        while time.time() - start < limit:
            try:
                audio = response.read(block_size)
            except Exception as e:
                # Report and stop: retrying a failed read in a tight loop
                # would only repeat the error until the time limit.
                print("Error " + str(e))
                break
            if not audio:
                # Server closed the stream.
                break
            f.write(audio)
            sys.stdout.write('.')
            sys.stdout.flush()
finally:
    response.close()

sys.stdout.flush()
print("")
print("10 seconds from " + url + " have been recorded in " + fname)
#
# here run the finger print test to identify the audio recorded
# using the sample you have downloaded in the file "fname"
#