RA3_MP3_Player/RA3MP3Playback.py

494 lines
16 KiB
Python

from multiprocessing.heap import Arena
from pathlib import Path
import threading
import time
import os
import sys
import random
import re
import queue
import readchar
import winpty
import ctypes
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame
#user32 = ctypes.WinDLL("user32", use_last_error=True)
# kernel32 handle used by acquire_single_instance(); use_last_error=True makes
# ctypes keep a private copy of the Win32 last-error code for calls made
# through this object.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
# Event to signal shutdown; checked by monitor_game() (nothing visible in this
# file sets it).
shutdown_event = threading.Event()
# Win32 error code compared against the last error after CreateMutexW in
# acquire_single_instance() to detect an already-existing mutex.
ERROR_ALREADY_EXISTS = 183
# ========================= CONFIG =========================
# When True, main() uses the DEBUG_* paths below instead of the DEFAULT_* ones.
DEBUG = True # Set True to enable debug overrides
# Default paths (used if not in debug mode)
DEFAULT_GAME_EXE = r".\RA3game.dll"
DEFAULT_PLAYLIST = r".\arena\music\playlist.txt"
DEFAULT_RA3_MAPS = r".\arena\music\ra3_maps.txt"
# Debug override paths
DEBUG_GAME_EXE = r"D:\GOG Games\Quake III\quake3e_con.exe"
DEBUG_PLAYLIST = r"D:\GOG Games\Rocket Arena 3\arena\music\playlist.txt"
DEBUG_RA3_MAPS = r"D:\GOG Games\Rocket Arena 3\arena\music\ra3_maps.txt"
# Volume increment; only referenced by the commented-out adjust_volume() helper.
VOLUME_STEP = 0.1 # step for W/S volume control
# ==========================================================
# ====================== GLOBAL STATE =====================
# Set True right before "s_musicVolume" is sent to the game; handle_game_line()
# only parses the reply while this is set, then clears it.
volumecheck = False
# Set True when the UI VM load line is seen; gates parsing of the "fs_game" reply.
gametypecheck = False
# Set True on "Com_TouchMemory:"; gates parsing of the "arena0" reply.
arenacheck = False
# Last "arena0" reply line seen, compared against the next one to detect a
# repeat of the same map.
last_arena0 = "undefined"
# True while the "fs_game" reply reports a value starting with "arena".
is_ra3 = False
# True once the current map was recognised as an RA3 map (by arena0 or by the
# ra3_maps override list); reset whenever is_ra3 or is_in_map is False.
is_ra3_map = False
# True after "Com_TouchMemory:" is seen, False again when the UI VM reloads.
is_in_map = False
# Path of the RA3 map override list; assigned in main().
ra3_maps_path = "."
# Absolute song paths produced by load_playlist().
playlist = []
# Index into `playlist` of the current track.
playlist_index = 0
current_mode = "shuffle" # sequential, shuffle, loop
# Volume passed to pygame.mixer.music.set_volume(); updated from the game's
# s_musicVolume reply.
volume = 0.5
# Playback flag maintained by play_current(), stop_playback() and toggle_pause().
is_playing = False
# Set in main()'s cleanup; the watcher, keyboard and monitor loops poll it.
stop_flag = threading.Event()
# True while a "serverstatus" reply (the "mapname" line) is awaited.
serverstatus_sent = False
# Lower-cased map name taken from the last "mapname" line.
current_map = "nomap"
# File stem of the track most recently started; "nosong" until one is played.
current_song = "nosong"
# Path of the playlist file; assigned in main().
playlist_path = "undefined"
# Lower-cased map names loaded by load_ra3_maps().
ra3_maps = set()
# Not referenced anywhere else in the visible code.
song_queue = queue.Queue()
# Verbose-mode pattern matching an ESC byte followed by one of the escape
# sequence forms listed below; strip_ansi() removes every match from a line of
# game console output.
ANSI_ESCAPE_RE = re.compile(
r'''
\x1B(?:
[78] # ESC 7, ESC 8 (DEC save/restore)
| \[ [0-?]* [ -/]* [@-~] # CSI sequences
| \] .*? (?:\x07|\x1B\\) # OSC sequences
| [@-Z\\-_] # Other 7-bit C1 controls
)
''',
re.VERBOSE
)
# ==========================================================
# ===================== UTILITY FUNCTIONS =================
def acquire_single_instance(name: str):
    """Create a named Win32 mutex to enforce a single running instance.

    Args:
        name: Name of the mutex (e.g. ``"Global\\..."``).

    Returns:
        The mutex handle when this process is the first to create the mutex,
        or ``None`` when a mutex with that name already exists.

    Raises:
        OSError: If ``CreateMutexW`` fails (built via ``ctypes.WinError``).
    """
    create_mutex = kernel32.CreateMutexW
    # Declare the prototypes: without them ctypes assumes a C int return value,
    # which is the wrong type for a HANDLE.
    create_mutex.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_wchar_p]
    create_mutex.restype = ctypes.c_void_p
    kernel32.CloseHandle.argtypes = [ctypes.c_void_p]

    handle = create_mutex(
        None,   # lpMutexAttributes
        False,  # bInitialOwner
        name,   # lpName
    )
    # kernel32 was loaded with use_last_error=True, so the error code of the
    # call above is read once, through ctypes, and used for both checks.
    last_error = ctypes.get_last_error()
    if not handle:
        raise ctypes.WinError(last_error)
    if last_error == ERROR_ALREADY_EXISTS:
        # Another process owns the mutex; drop our reference to it.
        kernel32.CloseHandle(handle)
        return None
    return handle
def strip_ansi(text: str) -> str:
    """Return *text* with every match of ``ANSI_ESCAPE_RE`` removed."""
    cleaned, _replacements = ANSI_ESCAPE_RE.subn("", text)
    return cleaned
def playlist_watcher(path, poll_interval=1.0):
    """Poll the playlist file and reload the global playlist when it changes.

    Runs until ``stop_flag`` is set. A change is detected by comparing the
    file's modification time with the last one that was loaded successfully.

    Args:
        path: Path of the playlist file to watch.
        poll_interval: Seconds to wait between polls.
    """
    global playlist
    try:
        last_mtime = os.path.getmtime(path)
    except OSError:
        # Missing or inaccessible at start-up: treat the first readable
        # version as a change.
        last_mtime = 0
    while not stop_flag.is_set():
        try:
            current_mtime = os.path.getmtime(path)
            if current_mtime != last_mtime:
                print("[DEBUG] Playlist file changed. Reloading playlist.")
                playlist = load_playlist(path)
                # Record the mtime only after a successful reload so that a
                # failed attempt is retried on the next poll.
                last_mtime = current_mtime
        except FileNotFoundError:
            pass  # Playlist temporarily missing; ignore
        except OSError as exc:
            # e.g. the file is locked by an editor while it is being saved;
            # keep the watcher thread alive and try again later.
            print(f"[DEBUG] Could not reload playlist: {exc}")
        # Waiting on the event (instead of sleeping) ends the loop promptly
        # once stop_flag is set.
        stop_flag.wait(poll_interval)
def track_watcher():
    """Advance to the next track when the mixer goes from busy to idle.

    Polls ``pygame.mixer.music.get_busy()`` every 0.1 seconds until
    ``stop_flag`` is set, and calls ``next_track()`` on a busy-to-idle
    transition while ``is_playing`` is still True.
    """
    was_busy = False
    while not stop_flag.is_set():
        now_busy = pygame.mixer.music.get_busy()
        just_went_idle = was_busy and not now_busy
        if just_went_idle and is_playing:
            # Track just finished
            print("[DEBUG] Track finished. Advancing to next track.")
            next_track()
        was_busy = now_busy
        time.sleep(0.1)
def parse_music_volume(line: str) -> float:
    """Extract the quoted value following ``is:`` from a console line.

    ``^<digit>`` colour codes are removed before matching.

    Args:
        line: One line of game console output.

    Returns:
        The quoted value as a float, or ``0.0`` when the line contains no
        ``is:"..."`` part.

    Raises:
        ValueError: If the quoted text is not a valid float.
    """
    # Remove all ^<digit> color codes
    without_colors = re.sub(r'\^\d', '', line)
    # Extract the text inside quotes after "is:"
    found = re.search(r'is\s*:\s*"(.*?)"', without_colors, re.IGNORECASE)
    raw_value = found.group(1) if found else 0
    try:
        return float(raw_value)
    except ValueError:
        raise ValueError(f"Invalid float value for volume: {raw_value}")
def load_playlist(playlist_path):
    """Read the playlist file and return the list of song paths.

    Blank lines and lines starting with ``#`` are skipped. Relative entries
    are resolved against the directory containing the playlist file. In
    "shuffle" mode the resulting list is shuffled.

    When a song has already been played, ``playlist_index`` is re-aligned to
    that song's position in the new list. If the song is no longer listed,
    the index is reset to 0 and playback restarts from the new list.

    Args:
        playlist_path: Path of the playlist text file.

    Returns:
        List of song paths as strings.

    Raises:
        OSError: If the playlist file cannot be opened or read.
    """
    global playlist, playlist_index
    playlist_file = Path(playlist_path).resolve()
    base_dir = playlist_file.parent
    songs = []
    # "utf-8-sig" also reads plain UTF-8, and drops a leading BOM that would
    # otherwise become part of the first entry.
    with open(playlist_file, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            entry = raw_line.strip()
            if not entry or entry.startswith("#"):
                continue
            song_path = Path(entry)
            # If the playlist entry is relative, resolve it relative to playlist.txt
            if not song_path.is_absolute():
                song_path = base_dir / song_path
            songs.append(str(song_path))
    if current_mode == "shuffle":
        random.shuffle(songs)
    # Re-align song that's playing to new index
    if current_song != "nosong":
        for index, song in enumerate(songs):
            if Path(song).stem == current_song:
                playlist_index = index
                break
        else:
            playlist_index = 0
            # play_current() reads the global playlist, so publish the new
            # list first; otherwise it would start a track from the stale
            # list (callers only assign the return value afterwards).
            playlist = songs
            play_current()
    return songs
def load_ra3_maps(path):
    """Load the set of lower-cased map names listed in a text file.

    Each non-blank line is one map name; surrounding whitespace is stripped.

    Args:
        path: Path of the map list file.

    Returns:
        Set of lower-cased map names. An empty set is returned when the file
        cannot be opened or read (missing, or locked by another process such
        as an editor).
    """
    try:
        # "utf-8-sig" also reads plain UTF-8, and drops a leading BOM that
        # would otherwise become part of the first map name.
        with open(path, "r", encoding="utf-8-sig") as f:
            return {entry.lower() for entry in map(str.strip, f) if entry}
    except OSError as exc:
        print(f"[DEBUG] Could not read RA3 map list: {exc}")
        return set()
def next_track():
    """Move to the following playlist entry and start playing it.

    Also asks the game for its current music volume; the reply is consumed by
    handle_game_line() while ``volumecheck`` is set. In "loop" mode the index
    is left unchanged, so the same track is played again.
    """
    global volumecheck, playlist_index
    volumecheck = True
    send_command(pty_proc, "s_musicvolume")
    # Guard the modulo: the playlist may be empty, in which case
    # play_current() simply returns.
    if current_mode != "loop" and playlist:
        playlist_index = (playlist_index + 1) % len(playlist)
    play_current()
def previous_track():
    """Move to the preceding playlist entry and start playing it.

    Also asks the game for its current music volume; the reply is consumed by
    handle_game_line() while ``volumecheck`` is set. In "loop" mode the index
    is left unchanged, so the same track is played again.
    """
    global volumecheck, playlist_index
    volumecheck = True
    send_command(pty_proc, "s_musicVolume")
    # Guard the modulo: the playlist may be empty, in which case
    # play_current() simply returns.
    if current_mode != "loop" and playlist:
        playlist_index = (playlist_index - 1) % len(playlist)
    play_current()
def play_current():
    """Load and play the playlist entry at ``playlist_index``.

    Does nothing when the playlist is empty. When the file does not exist a
    debug message is printed and nothing is played. On success ``is_playing``
    and ``current_song`` are updated and the song name is echoed to the game
    console shortly afterwards. Any exception raised while starting playback
    is printed, not propagated.
    """
    global is_playing, current_song
    if not playlist:
        return
    track_path = playlist[playlist_index]
    if os.path.isfile(track_path):
        try:
            music = pygame.mixer.music
            music.load(track_path)
            music.set_volume(volume)
            music.play()
            is_playing = True
            current_song = Path(track_path).stem
            print(f"[DEBUG] Playing: {current_song} at volume {volume}")
            # The message is formatted when the timer fires, not here.
            announce = lambda: send_command(pty_proc, f"echo\r\necho\r\necho\r\necho Playing: {current_song}")
            threading.Timer(.1, announce).start()
        except Exception as err:
            print(f"[DEBUG] Couldn't start MP3 player': {err}")
    else:
        print(f"[DEBUG] Track not found: {track_path}")
def stop_playback():
    """Stop the mixer and mark playback as inactive."""
    global is_playing
    # Clear the flag before stopping: track_watcher() advances to the next
    # track when it sees the mixer idle while is_playing is still True, so the
    # old order could restart music that was just stopped.
    is_playing = False
    pygame.mixer.music.stop()
def toggle_pause():
    """Pause the music if it is playing, otherwise resume it.

    Also asks the game for its current music volume; the reply is consumed by
    handle_game_line() while ``volumecheck`` is set.
    """
    global volumecheck, is_playing
    volumecheck = True
    send_command(pty_proc, "s_musicVolume")
    if pygame.mixer.music.get_busy():
        # Clear the flag before pausing: a paused mixer reports not busy
        # (pygame >= 2.0.1), and track_watcher() would treat "idle while
        # is_playing" as a finished track and skip to the next one.
        is_playing = False
        pygame.mixer.music.pause()
    else:
        pygame.mixer.music.unpause()
        is_playing = True
def change_mode():
    """Cycle the playback mode and reload the playlist.

    The order is sequential -> shuffle -> loop -> sequential. The new mode is
    printed, echoed to the game console, and the playlist is re-read from
    ``playlist_path``.
    """
    global current_mode, playlist
    cycle = ("sequential", "shuffle", "loop")
    position = cycle.index(current_mode)
    current_mode = cycle[(position + 1) % len(cycle)]
    print(f"[DEBUG] Mode changed to: {current_mode}")
    send_command(pty_proc, f"echo _musicmode {current_mode}")
    playlist = load_playlist(playlist_path)
#def adjust_volume(up=True):
# global volume
# volume = min(1.0, max(0.0, volume + (VOLUME_STEP if up else -VOLUME_STEP)))
# pygame.mixer.music.set_volume(volume)
# print(f"[DEBUG] Volume: {volume:.2f}")
# ==========================================================
# ==================== QUAKE MONITOR ======================
def monitor_game(pty_proc):
    """Read the game's console output and dispatch it line by line.

    Output is read in chunks of up to 1024 units, split on newlines, stripped
    of ANSI escape sequences and surrounding whitespace, and every non-empty
    line is passed to handle_game_line(). The loop ends when ``stop_flag`` or
    ``shutdown_event`` is set, when a read returns no data, or on
    EOFError / KeyboardInterrupt.

    Args:
        pty_proc: Object whose ``read(n)`` returns console output as ``str``
            or ``bytes``; it is also handed on to handle_game_line().
    """
    buffer = ""
    # Stop as soon as EITHER event is set. The original `not a or b` kept
    # looping while shutdown_event was set.
    while not (stop_flag.is_set() or shutdown_event.is_set()):
        try:
            data = pty_proc.read(1024)
            if not data:
                break
            # Normalize to string
            if isinstance(data, bytes):
                data = data.decode(errors="ignore")
            buffer += data
            # Process complete lines; a trailing partial line stays buffered.
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = strip_ansi(line).strip()
                if line:
                    handle_game_line(line, pty_proc)
        # A tuple is required here: `except EOFError or KeyboardInterrupt`
        # evaluates to `except EOFError` only.
        except (EOFError, KeyboardInterrupt):
            break
def _dispatch_music_control(line, pty_proc):
    """Run the music control named by a console command line, if any.

    Controls are allowed outside of a map, or inside a map recognised as an
    RA3 map; otherwise a notice is echoed to the game console instead.

    Returns:
        True when *line* was one of the music control commands, else False.
    """
    controls = (
        ("]\\nexttrack", next_track),
        ("]\\prevtrack", previous_track),
        ("]\\pausetrack", toggle_pause),
        ("]\\musicmode", change_mode),
    )
    for prefix, action in controls:
        if not line.startswith(prefix):
            continue
        if is_ra3_map or not is_in_map:
            action()
        else:
            threading.Timer(.1, lambda: send_command(pty_proc, "echo Music controls only available for RA3 maps.")).start()
        return True
    return False


def handle_game_line(line, pty_proc):
    """React to one cleaned line of game console output.

    Tracks which mod is active (``fs_game``), whether a map is loaded and
    whether it counts as an RA3 map, mirrors the game's music volume, and
    executes the music control console commands.

    NOTE(review): the source lost its indentation; the nesting below (in
    particular inside the "fs_game" and "arena0" branches) is a
    reconstruction -- confirm against the original file.

    Args:
        line: Console line, already stripped of ANSI sequences and whitespace.
        pty_proc: Game PTY process used to send console commands.
    """
    global volume, current_map, ra3_maps, volumecheck, is_ra3, gametypecheck
    global arenacheck, is_in_map, is_ra3_map, last_arena0
    global serverstatus_sent, playlist_index

    # A map can only count as an RA3 map while RA3 is active and a map is loaded.
    if not is_ra3 or not is_in_map:
        is_ra3_map = False

    if "Loading vm file vm/ui.qvm..." in line:
        # UI VM (re)loaded: ask the game which mod is active.
        gametypecheck = True
        is_in_map = False
        threading.Timer(.3, lambda: send_command(pty_proc, "fs_game")).start()
    elif "\"fs_game\" is:\"" in line and gametypecheck:
        gametypecheck = False
        if "\"fs_game\" is:\"arena" in line:
            if not is_ra3:
                print("[DEBUG] Active mod is Rocket Arena 3 - enabling music playback")
                is_ra3 = True
                volumecheck = True
                threading.Timer(1.0, lambda: send_command(pty_proc, "s_musicVolume")).start()
                # Guard against an empty playlist: randint(0, -1) would raise.
                if current_mode == "shuffle" and playlist:
                    playlist_index = random.randint(0, len(playlist) - 1)
                # Start playback after the volume query has had time to answer.
                threading.Timer(2.0, play_current).start()
            elif not is_playing:
                next_track()
        else:
            print("[DEBUG] Active mod is NOT Rocket Arena 3 - disabling music playback")
            is_ra3 = False
            if is_playing:
                stop_playback()
    elif line.startswith("]\\showvars"):
        # The f-string is evaluated when the timer fires, so it reports the
        # state at that moment.
        threading.Timer(.1, lambda: send_command(pty_proc, f"echo is_ra3 = {is_ra3}\r\necho is_in_map = {is_in_map}\r\necho is_ra3_map = {is_ra3_map}")).start()

    if not is_ra3:
        return

    if line.startswith("\"s_musicVolume\"") and volumecheck:
        volumecheck = False
        try:
            svolume = parse_music_volume(line)
        except ValueError as exc:
            # An unparsable reply must not propagate: it would end the
            # monitor loop and shut the launcher down.
            print(f"[DEBUG] Ignoring unparsable volume line: {exc}")
            return
        if svolume > 0 and volume != svolume:
            volume = svolume
            pygame.mixer.music.set_volume(volume)
            print(f"[DEBUG] Set music volume to {volume} by game client.")
        return

    if _dispatch_music_control(line, pty_proc):
        return

    if line.startswith("Com_TouchMemory:"):
        # Map finished loading: ask for arena0 to classify the map.
        is_in_map = True
        arenacheck = True
        threading.Timer(.5, lambda: send_command(pty_proc, "arena0")).start()
    elif "\"arena0\" is:" in line and arenacheck:
        same_map = line == last_arena0
        last_arena0 = line
        arenacheck = False
        if line.startswith("\"arena0\" is:\"Arena Number 1"):
            print("[DEBUG] Not an RA3 map. Checking for override.")
            serverstatus_sent = True
            threading.Timer(.1, lambda: send_command(pty_proc, "serverstatus")).start()
        elif not same_map:
            # NOTE(review): as reconstructed, is_ra3_map is only set when the
            # arena0 reply differs from the previous one -- confirm whether a
            # repeat of the same map should set it as well.
            print("[DEBUG] RA3 map detected. Advancing track.")
            is_ra3_map = True
            next_track()
    elif "mapname" in line.lower() and serverstatus_sent:
        serverstatus_sent = False
        current_map = line.split()[-1].lower()
        # Re-read the override list on every check.
        ra3_maps = load_ra3_maps(ra3_maps_path)
        if current_map in ra3_maps:
            print(f"[DEBUG] Found override for: {current_map}. Advancing track.")
            is_ra3_map = True
            next_track()
        else:
            print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.")
            is_ra3_map = False
            stop_playback()
def send_command(pty_proc, cmd):
    """Write *cmd* plus CRLF to the game console and flush.

    Failures are reported with a debug message and never propagated.

    Args:
        pty_proc: Object providing ``write(str)`` and ``flush()``.
        cmd: Console command text, without a line terminator.
    """
    try:
        terminated = cmd + "\r\n"
        pty_proc.write(terminated)
        pty_proc.flush()
        print(f"[DEBUG] Sent command: {cmd}")
    except Exception as err:
        print(f"[DEBUG] Failed to send command: {err}")
# ==========================================================
# ==================== KEYBOARD HANDLER ===================
def keyboard_listener():
    """Map launcher-console key presses to music controls.

    ``[`` previous track, ``]`` next track, ``'`` pause/resume, ``\\`` change
    mode. Runs until ``stop_flag`` is set; other keys are ignored.
    """
    bindings = {
        '[': previous_track,
        ']': next_track,
        '\'': toggle_pause,
        '\\': change_mode,
    }
    while not stop_flag.is_set():
        pressed = readchar.readkey()
        action = bindings.get(pressed)
        if action is not None:
            action()
# ==========================================================
# ======================== MAIN ===========================
def main():
    """Launch the game in a PTY and run the music player alongside it.

    Ensures a single instance, loads the playlist and the RA3 map list,
    initialises the mixer, spawns the game, starts the watcher and keyboard
    threads and then processes game output until the game exits or the
    launcher is interrupted.
    """
    global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path
    mutex = acquire_single_instance("Global\\q3aLauncherAndMp3Playback")
    if mutex is None:
        print("Another instance is already running.")
        sys.exit(1)
    print("Loading Quake 3 Arena...")
    # Use debug paths if enabled
    game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
    playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST
    ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS
    # Load playlist and map list
    try:
        playlist = load_playlist(playlist_path)
    except OSError as exc:
        # Missing or unreadable playlist: report it instead of a traceback.
        print(f"Could not read playlist: {exc}")
        return
    ra3_maps = load_ra3_maps(ra3_maps_path)
    if not playlist:
        print("Playlist is empty!")
        return
    if not ra3_maps:
        print("RA3 maps list is empty!")
        return
    # Initialize pygame mixer
    pygame.mixer.init()
    # Launch quake process via PTY. This happens before the worker threads
    # start because their handlers send commands through the global pty_proc,
    # which does not exist until the spawn has succeeded.
    try:
        pty_proc = winpty.PtyProcess.spawn([game_exe, "--showterminalconsole", "+exec", "music_keys.cfg"] + sys.argv[1:])
    except Exception as e:
        print(f"Failed to start game: {e}")
        pygame.mixer.quit()
        return
    # Start playlist watcher, keyboard listener and track watcher threads
    workers = (
        (playlist_watcher, (playlist_path,)),
        (keyboard_listener, ()),
        (track_watcher, ()),
    )
    for target, args in workers:
        threading.Thread(target=target, args=args, daemon=True).start()
    # Monitor the game output
    try:
        monitor_game(pty_proc)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        stop_flag.set()
        stop_playback()
        pty_proc.close()
        pygame.mixer.quit()


if __name__ == "__main__":
    main()
# ==========================================================