I have written a simple Python script that acts as a CLI music player and organizer. I use Python's input() as the controller for the player: for example, when the user enters "pause" the music pauses, when they enter "resume" the music resumes, and so on. The main problem is that when a track reaches its end, the player doesn't move on to the next track, because it is still waiting for user input. I need a solution that handles input separately and can detect the end of a track.
I have tried everything from threading to concurrent.futures and so on. I expected my player to recognize the end of a track and play the next song.
def play_track(track):
    """Announce *track*, start playing it, and apply the user's control command.

    ``track`` is a mapping that provides the ``Title``, ``Artist``, ``Album``,
    ``Date``, ``Length`` and ``Path`` keys.

    Returns ``"next"`` or ``"prev"`` when the user asks to change track, and
    ``None`` when the user stops playback or the player is no longer active.

    NOTE: ``control`` is submitted to the executor only once, and
    ``Future.result()`` blocks until that single call returns, so the loop
    cannot notice the end of the track while it waits for the user to type.
    """
    divider = "------------------------------------"
    print(divider)
    print(f"Playing Track: | Title: {track['Title']} | Artist: {track['Artist']} | Album: {track['Album']}"
          f" | Date: {track['Date']} | Length: {track['Length']}")
    print(divider)

    player = Playback()
    player.load_file(track["Path"])
    player.play()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = pool.submit(control)
        while player.active:
            # Blocks here until control() has returned a command.
            command = pending.result()
            if command == "pause":
                player.pause()
                print("Track is paused")
            elif command == "resume":
                player.resume()
                print("Track is resumed")
            elif command == "stop":
                player.stop()
                print("Playback is stopped")
                return None
            elif command == "next":
                player.stop()
                print("Next track is playing")
                return "next"
            elif command == "prev":
                player.stop()
                print("Previous track is playing")
                return "prev"
def control():
    """Prompt for one player command and return it.

    Returns the typed text when it is exactly one of ``pause``, ``resume``,
    ``stop``, ``next`` or ``prev``; any other input yields ``"invalid"``.
    """
    known_commands = ("pause", "resume", "stop", "next", "prev")
    typed = input("Control (pause | resume | stop | next | prev): ")
    print("------------------------------------")
    print()
    return typed if typed in known_commands else "invalid"
`future.result()` blocks code execution until `control()` returns, meaning your program is always waiting for user input before it can continue; hence your player cannot automatically proceed to the next track when the current one ends.
To fix this, you need to:
- run the input handling in a separate thread
- detect when the track finishes playing
import threading
import queue
import time
class Playback:
    """Simulated audio player that "plays" a track for a fixed number of seconds.

    ``active`` is True from ``load_file`` until the track ends or ``stop`` is
    called. ``paused`` freezes progress without ending the track. Both are
    plain attributes so that a controlling thread can poll and flip them while
    ``play`` runs in another thread.
    """

    def __init__(self):
        self.active = False
        self.paused = False
        # Defined up front so play() is harmless when called before load_file().
        self.path = None
        self.length = 0

    def load_file(self, path, length=5):
        """Select *path* for playback and mark the player as active.

        ``length`` is the simulated track duration in seconds.
        """
        self.path = path
        self.length = length
        self.active = True

    def play(self, tick=0.1):
        """Block until ``length`` seconds have been played or ``stop`` is called.

        Only time spent un-paused counts towards the track length, so a paused
        track never finishes by itself. ``tick`` is the polling interval in
        seconds; it bounds both the reaction time to ``pause``/``stop`` and the
        CPU used while paused (the loop always sleeps, it never spins).
        """
        played = 0.0
        while self.active and played < self.length:
            tick_start = time.monotonic()
            time.sleep(tick)
            if not self.paused:
                # monotonic() is immune to system clock adjustments.
                played += time.monotonic() - tick_start
        self.active = False

    def pause(self):
        """Freeze playback progress until ``resume`` is called."""
        self.paused = True

    def resume(self):
        """Continue a paused track."""
        self.paused = False

    def stop(self):
        """End playback; a running ``play`` call returns within one tick."""
        self.active = False
def control(input_queue):
    """Read commands from stdin and forward them to *input_queue*.

    Each line is stripped and lower-cased before it is queued, so ``" Pause "``
    is delivered as ``"pause"``. The loop ends after a ``"stop"`` command has
    been queued, or quietly when stdin is closed (nothing is queued in that
    case, so a track that is playing is left to finish).

    Intended to run in its own thread because ``input()`` blocks.
    """
    while True:
        try:
            cmd = input().strip().lower()
        except EOFError:
            # stdin was closed: no more commands can arrive, so stop reading
            # instead of letting the exception kill the thread with a traceback.
            break
        input_queue.put(cmd)
        if cmd == "stop":
            break
def play_track(track):
print(f"Playing {track['Title']} by {track['Artist']}...")
playback = Playback()
playback.load_file(track["Path"])
input_queue = queue.Queue()
input_thread = threading.Thread(target=control, args=(input_queue,), daemon=True)
input_thread.start()
playback_thread = threading.Thread(target=playback.play)
playback_thread.start()
while playback.active:
try:
mediactl = input_queue.get(timeout=1)
except queue.Empty:
continue
match mediactl:
case "pause":
playback.pause()
print("Track is paused")
case "resume":
playback.resume()
print("Track is resumed")
case "stop":
playback.stop()
print("Playback is stopped")
return
case "next":
playback.stop()
print("Next track is playing")
return "next"
print("Track finished playing, moving to next track.")
return "next"
In this code, `control()` runs on a separate thread, listening for user input, and a `queue.Queue()` is used for communication between the input thread and the main media player. The media player polls the queue with a one-second timeout, so it never blocks indefinitely on user input and can notice when the track has finished.