# RA3_MP3_Player/RA3MP3Playback.py
#
# 652 lines
# 21 KiB
# Python

from pathlib import Path
import threading
import time
import os
import sys
import random
import re
import queue
import tempfile
import psutil
import subprocess
import getpass
if (os.name != "nt"):
import pty
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame
# Event to signal shutdown
shutdown_event = threading.Event()
# ========================= CONFIG =========================
DEBUG = False # Set True to enable debug overrides
# Default paths (used if not in debug mode)
DEFAULT_WIN_GAME_EXE = r"./qgame.dll"
DEFAULT_LIN_GAME_EXE = r"./qgame.so"
DEFAULT_PLAYLIST = r"./arena/music/playlist.txt"
DEFAULT_RA3_MAPS = r"./arena/music/ra3_maps.txt"
DEFAULT_ARENA0_BL = r"./arena/music/arena0_bl.txt"
DEFAULT_GAME_EXE = ""
# Debug override paths
DEBUG_WIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.dll"
DEBUG_LIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.so"
DEBUG_PLAYLIST = r"D:/GOG Games/Quake III/arena/music/playlist.txt"
DEBUG_RA3_MAPS = r"D:/GOG Games/Quake III/arena/music/ra3_maps.txt"
DEBUG_ARENA0_BL = r"D:/GOG Games/Quake III/arena/music/arena0_bl.txt"
DEBUG_GAME_EXE = ""
if (os.name == "nt"):
DEFAULT_GAME_EXE = DEFAULT_WIN_GAME_EXE
DEBUG_GAME_EXE = DEBUG_WIN_GAME_EXE
else:
DEFAULT_GAME_EXE = DEFAULT_LIN_GAME_EXE
DEBUG_GAME_EXE = DEBUG_LIN_GAME_EXE
# Initial volume
VOLUME_STEP = 0.1 # step for W/S volume control
# ==========================================================
# ====================== GLOBAL STATE =====================
volumecheck = False
master_fd = None
gametypecheck = False
arenacheck = False
last_arena0 = "undefined"
is_ra3 = False
is_ra3_map = False
is_in_map = False
ra3_maps_path = "."
arena0_bl_path = "."
playlist = []
arena0_bl = []
playlist_index = 0
current_mode = "shuffle" # sequential, shuffle, loop
volume = 0.5
is_playing = False
stop_flag = threading.Event()
serverstatus_sent = False
current_map = "nomap"
current_song = "nosong"
playlist_path = "undefined"
ra3_maps = set()
song_queue = queue.Queue()
ANSI_ESCAPE_RE = re.compile(
r'''
\x1B(?:
[78] # ESC 7, ESC 8 (DEC save/restore)
| \[ [0-?]* [ -/]* [@-~] # CSI sequences
| \] .*? (?:\x07|\x1B\\) # OSC sequences
| [@-Z\\-_] # Other 7-bit C1 controls
)
| \x08\ \x08 # literal "\x08 \x08" sequence
''',
re.VERBOSE
)
# ==========================================================
# ===================== UTILITY FUNCTIONS =================
# def apply_backspaces(s):
# # Ensure we are working with str
# if isinstance(s, bytes):
# s = s.decode(errors="ignore")
# buf = []
# for c in s:
# if c == "\x08":
# if buf:
# buf.pop()
# else:
# buf.append(c)
# return "".join(buf)
def acquire_single_instance(lockfile_base: str):
"""
Ensures only one instance of the program is running per user.
Uses a lock file in the temp directory and psutil to check for stale locks.
Returns (file descriptor, lockfile path) if acquired, None if another instance is running.
"""
username = getpass.getuser()
lockfile_name = f"{lockfile_base}_{username}.lock"
lockfile_path = os.path.join(tempfile.gettempdir(), lockfile_name)
# If lock file exists, check if PID inside is running
if os.path.exists(lockfile_path):
try:
with open(lockfile_path, "r") as f:
pid = int(f.read())
if psutil.pid_exists(pid):
# Another instance is running
return None, lockfile_path
else:
# Stale lock file; remove it
os.unlink(lockfile_path)
except Exception:
# Could not read PID; remove stale lock
try:
os.unlink(lockfile_path)
except:
pass
# Create lock file exclusively
try:
fd = os.open(lockfile_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
os.write(fd, str(os.getpid()).encode())
return fd, lockfile_path
except FileExistsError:
# Race condition: another process created it just now
return None, lockfile_path
def release_single_instance(fd, lockfile_path):
"""Closes the lock file and removes it."""
try:
os.close(fd)
except:
pass
try:
os.unlink(lockfile_path)
except:
pass
def Check_Arena_Blacklist(arenaline: str) -> bool:
global arena0_bl_path
arena_bl_local = load_arena0_blacklist(arena0_bl_path)
for item in arena_bl_local:
if arenaline.lower().startswith(f"\"arena0\" is:\"{item.lower()}"):
return True
return False
def strip_ansi(text: str) -> str:
return ANSI_ESCAPE_RE.sub("", text)
def playlist_watcher(path, poll_interval=1.0):
global playlist
try:
last_mtime = os.path.getmtime(path)
except FileNotFoundError:
last_mtime = 0
while not stop_flag.is_set():
try:
current_mtime = os.path.getmtime(path)
if current_mtime != last_mtime:
last_mtime = current_mtime
print("[DEBUG] Playlist file changed. Reloading playlist.")
playlist = load_playlist(path)
except FileNotFoundError:
pass # Playlist temporarily missing; ignore
time.sleep(poll_interval)
def track_watcher():
global is_playing
last_busy = False
while not stop_flag.is_set():
busy = pygame.mixer.music.get_busy()
if last_busy and not busy and is_playing:
# Track just finished
print("[DEBUG] Track finished. Advancing to next track.")
next_track()
last_busy = busy
time.sleep(0.1)
def parse_music_volume(line: str) -> float:
# Remove all ^<digit> color codes
clean_line = re.sub(r'\^\d', '', line)
# Extract the number inside quotes after "is:"
match = re.search(r'is\s*:\s*"(.*?)"', clean_line, re.IGNORECASE)
if not match:
#raise ValueError(f"Could not parse volume from line: {line}")
value_str = 0
else:
value_str = match.group(1)
try:
return float(value_str)
except ValueError:
raise ValueError(f"Invalid float value for volume: {value_str}")
def load_playlist(playlist_path):
global playlist_index
playlist_path = Path(playlist_path).resolve()
base_dir = playlist_path.parent
songs = []
try:
with open(playlist_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
if line.startswith("#"):
continue
song_path = Path(line)
# If the playlist entry is relative, resolve it relative to playlist.txt
if not song_path.is_absolute():
song_path = base_dir / song_path
songs.append(str(song_path))
if current_mode == "shuffle":
random.shuffle(songs)
# Re-align song that's playing to new index
if current_song != "nosong":
_song_index = 0
_found_song = False
for s in songs:
_song_eval = Path(s).stem
if current_song == _song_eval:
_found_song = True
playlist_index = _song_index
break
_song_index += 1
if not _found_song:
playlist_index = 0
play_current()
except:
print(f"[DEBUG] Failed to open playlist.txt")
pass
return songs
def load_ra3_maps(path):
maps = set()
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
maps.add(line.lower())
except:
print(f"[DEBUG] Failed to open ra3_maps.txt")
return set()
return maps
def load_arena0_blacklist(path):
maps = set()
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
maps.add(line.lower())
except:
print(f"[DEBUG] Failed to open ra3_maps.txt")
return set()
return maps
def next_track():
global playlist
if not playlist:
return
global volumecheck
volumecheck = True
send_command(pty_proc,"s_musicvolume")
global playlist_index
if current_mode == "loop":
pass # keep same index
else:
playlist_index = (playlist_index + 1) % len(playlist)
play_current()
def previous_track():
global playlist
if not playlist:
return
global volumecheck
volumecheck = True
send_command(pty_proc,"s_musicVolume")
global playlist_index
if current_mode == "loop":
pass # keep same index
else:
playlist_index = (playlist_index - 1) % len(playlist)
play_current()
def play_current():
global is_playing, current_song, playlist
if not playlist:
return
track = playlist[playlist_index]
if not os.path.isfile(track):
print(f"[DEBUG] Track not found: {track}")
return
try:
pygame.mixer.music.load(track)
pygame.mixer.music.set_volume(volume)
pygame.mixer.music.play()
is_playing = True
current_song = Path(track).stem
print(f"[DEBUG] Playing: {current_song} at volume {volume}")
threading.Timer(.1, lambda: send_command(pty_proc,f"echo\r\necho\r\necho\r\necho Playing: {current_song}")).start()
except Exception as e:
print(f"[DEBUG] Couldn't start MP3 player': {e}")
def stop_playback():
global is_playing
pygame.mixer.music.stop()
is_playing = False
def toggle_pause():
global playlist
if not playlist:
return
global volumecheck
volumecheck = True
send_command(pty_proc,"s_musicVolume")
global is_playing
if pygame.mixer.music.get_busy():
pygame.mixer.music.pause()
is_playing = False
else:
pygame.mixer.music.unpause()
is_playing = True
def change_mode():
global current_mode, playlist_path, playlist
modes = ["sequential", "shuffle", "loop"]
idx = modes.index(current_mode)
current_mode = modes[(idx + 1) % len(modes)]
print(f"[DEBUG] Mode changed to: {current_mode}")
send_command(pty_proc,f"echo _musicmode {current_mode}")
playlist = load_playlist(playlist_path)
def monitor_game_pty(master_fd):
buffer = b""
while not stop_flag.is_set():
try:
data = os.read(master_fd, 1024)
except OSError:
break
if not data:
break
buffer += data
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
#if os.name != "nt":
# line = apply_backspaces(line)
line = strip_ansi(line.decode(errors="ignore")).strip()
if line:
if line.startswith("]"):
line = line[1:]
if "--debug" in sys.argv:
print(f"[GAME] {line}", flush=True)
if "--rawdebug" in sys.argv:
print(f"[GAME RAW] {repr(line)}", flush=True)
handle_game_line(line, pty_proc)
def monitor_game(proc):
#subprocess version
global serverstatus_sent
buffer = b""
while not stop_flag.is_set():
data = proc.stdout.read(1024)
if not data:
break
buffer += data
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
try:
line = line.decode(errors="ignore")
except:
continue
line = strip_ansi(line).strip()
if line:
if any(arg.lower() == "--debug" for arg in sys.argv):
print(f"[GAME] {line}")
if any(arg.lower() == "--rawdebug" for arg in sys.argv):
print(f"[GAME RAW] {repr(line)}")
handle_game_line(line, proc)
def handle_game_line(line, pty_proc):
global volume, current_map, ra3_maps, ra3_maps_path, volumecheck, is_ra3, gametypecheck, is_playing, arenacheck, is_in_map, is_ra3_map, last_arena0
global serverstatus_sent
if is_ra3 == False or is_in_map == False: is_ra3_map = False
#Loading vm file vm/ui.qvm...
#if "Sound initialization successful." in line:
if "Loading vm file vm/ui.qvm..." in line:
gametypecheck = True
is_in_map = False
threading.Timer(.3, lambda: send_command(pty_proc,"fs_game")).start()
elif "\"fs_game\" is:\"" in line and gametypecheck:
gametypecheck = False
if "\"fs_game\" is:\"arena" in line:
if is_ra3 == False:
print(f"[DEBUG] Active mod is Rocket Arena 3 - enabling music playback")
is_ra3 = True
volumecheck = True
threading.Timer(.5, lambda: send_command(pty_proc,"s_musicVolume")).start()
if playlist:
if current_mode == "shuffle":
global playlist_index
playlist_index = random.randint(0, len(playlist) - 1)
threading.Timer(1.0, play_current).start()
else:
if not is_playing:
next_track()
threading.Timer(.2, lambda: send_command(pty_proc,"bind [ echo ]\\prevtrack\r\nbind ] echo ]\\nexttrack\r\nbind \\ echo ]\\musicmode\r\nbind ' echo ]\\pausetrack")).start()
else:
#print(f"[DEBUG] Active mod is NOT Rocket Arena 3 - disabling music playback")
is_ra3 = False
if is_playing:
stop_playback()
elif line.startswith("]\\showvars"):
threading.Timer(.1, lambda: send_command(pty_proc,f"echo is_ra3 = {is_ra3}\r\necho is_in_map = {is_in_map}\r\necho is_ra3_map = {is_ra3_map}")).start()
if is_ra3:
if line.startswith("\"s_musicVolume\"") and volumecheck == True:
volumecheck = False
svolume = parse_music_volume(line)
if svolume > 0:
if volume != svolume:
volume = svolume
pygame.mixer.music.set_volume(volume)
print(f"[DEBUG] Set music volume to {volume} by game client.")
elif line.startswith("]\\nexttrack"):
if (is_in_map and is_ra3_map) or (is_in_map == False):
next_track()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\prevtrack"):
if (is_in_map and is_ra3_map) or (is_in_map == False):
previous_track()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\pausetrack"):
if (is_in_map and is_ra3_map) or (is_in_map == False):
toggle_pause()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\musicmode"):
if (is_in_map and is_ra3_map) or (is_in_map == False):
change_mode()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("Com_TouchMemory:"):
is_in_map = True
arenacheck = True
threading.Timer(.5, lambda: send_command(pty_proc,f"arena0")).start()
elif "\"arena0\" is:" in line and arenacheck:
if line == last_arena0:
same_map = True
else:
same_map = False
last_arena0 = line
arenacheck = False
if line.startswith("\"arena0\" is:\"Arena Number 1"):
print(f"[DEBUG] Not an RA3 map. Checking for override.")
serverstatus_sent = True
threading.Timer(.1, lambda: send_command(pty_proc,f"serverstatus")).start()
else:
if not same_map:
if not Check_Arena_Blacklist(line):
print(f"[DEBUG] RA3 map detected. Advancing track.")
is_ra3_map = True
next_track()
else:
print(f"[DEBUG] RA3 Arena found on blacklist - disabling music.")
is_ra3_map = False
stop_playback()
elif "mapname" in line.lower() and serverstatus_sent:
serverstatus_sent = False
current_map = line.split()[-1].lower()
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps:
print(f"[DEBUG] Found override for: {current_map}. Advancing track.")
is_ra3_map = True
next_track()
else:
print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.")
is_ra3_map = False
threading.Timer(.3, lambda: stop_playback()).start()
#stop_playback()
def send_command(proc, cmd):
global master_fd
#subprocess version
try:
if os.name == "nt":
proc.stdin.write((cmd + "\r\n").encode())
proc.stdin.flush()
else:
os.write(master_fd, (cmd + "\n").encode())
except Exception as e:
print(f"[DEBUG] Failed to send command: {e}")
# ==========================================================
# ======================== MAIN ===========================
def main():
global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path, arena0_blacklist, arena0_bl_path, master_fd
print(f"Loading Quake 3 Arena...")
# Use debug paths if enabled
game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST
ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS
arena0_bl_path = DEBUG_ARENA0_BL if DEBUG else DEFAULT_ARENA0_BL
# Load playlist and map list
playlist = load_playlist(playlist_path)
ra3_maps = load_ra3_maps(ra3_maps_path)
arena0_bl = load_arena0_blacklist(arena0_bl_path)
# Initialize pygame mixer
pygame.mixer.init()
# Start playlist watcher thread
playlist_thread = threading.Thread(target=playlist_watcher,args=(playlist_path,), daemon=True)
playlist_thread.start()
# Start track watcher thread
watcher_thread = threading.Thread(target=track_watcher, daemon=True)
watcher_thread.start()
# # # Launch quake process via subprocess
# pty_proc = subprocess.Popen(
# [game_exe, "+set", "ttycon", "1"] + sys.argv[1:],
# stdin=subprocess.PIPE,
# stdout=subprocess.PIPE,
# stderr=subprocess.STDOUT,
# bufsize=0, # unbuffered
# universal_newlines=False)
if os.name == "nt":
pty_proc = subprocess.Popen(
[game_exe, "+set", "ttycon", "1"] + sys.argv[1:],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
universal_newlines=False
)
master_fd = None
else:
master_fd, slave_fd = pty.openpty()
pty_proc = subprocess.Popen(
[game_exe, "+set", "ttycon", "1"] + sys.argv[1:],
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True
)
os.close(slave_fd)
# Monitor the game output
try:
if os.name == "nt":
monitor_game(pty_proc)
else:
monitor_game_pty(master_fd)
except KeyboardInterrupt:
print("Exiting...")
finally:
stop_flag.set()
stop_playback()
if pty_proc:
try:
pty_proc.stdin.close()
except:
pass
try:
pty_proc.stdout.close()
except:
pass
try:
pty_proc.terminate()
except:
pass
try:
pty_proc.wait(timeout=5)
except:
pass
pygame.mixer.quit()
if __name__ == "__main__":
lock_fd, lock_path = acquire_single_instance("q3a_launcher.lock")
if lock_fd is None:
print("Another instance is already running.")
sys.exit(1)
try:
main()
finally:
release_single_instance(lock_fd, lock_path)
# ==========================================================