from pathlib import Path import pty import threading import time import os import sys import random import re import queue import tempfile import psutil import subprocess import getpass os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" import pygame # Event to signal shutdown shutdown_event = threading.Event() # ========================= CONFIG ========================= DEBUG = False # Set True to enable debug overrides # Default paths (used if not in debug mode) DEFAULT_WIN_GAME_EXE = r"./qgame.dll" DEFAULT_LIN_GAME_EXE = r"./qgame.so" DEFAULT_PLAYLIST = r"./arena/music/playlist.txt" DEFAULT_RA3_MAPS = r"./arena/music/ra3_maps.txt" DEFAULT_ARENA0_BL = r"./arena/music/arena0_bl.txt" DEFAULT_GAME_EXE = "" # Debug override paths DEBUG_WIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.dll" DEBUG_LIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.so" DEBUG_PLAYLIST = r"D:/GOG Games/Quake III/arena/music/playlist.txt" DEBUG_RA3_MAPS = r"D:/GOG Games/Quake III/arena/music/ra3_maps.txt" DEBUG_ARENA0_BL = r"D:/GOG Games/Quake III/arena/music/arena0_bl.txt" DEBUG_GAME_EXE = "" if (os.name == "nt"): DEFAULT_GAME_EXE = DEFAULT_WIN_GAME_EXE DEBUG_GAME_EXE = DEBUG_WIN_GAME_EXE else: DEFAULT_GAME_EXE = DEFAULT_LIN_GAME_EXE DEBUG_GAME_EXE = DEBUG_LIN_GAME_EXE # Initial volume VOLUME_STEP = 0.1 # step for W/S volume control # ========================================================== # ====================== GLOBAL STATE ===================== volumecheck = False master_fd = None gametypecheck = False arenacheck = False last_arena0 = "undefined" is_ra3 = False is_ra3_map = False is_in_map = False ra3_maps_path = "." arena0_bl_path = "." playlist = [] arena0_bl = [] playlist_index = 0 current_mode = "shuffle" # sequential, shuffle, loop volume = 0.5 is_playing = False stop_flag = threading.Event() serverstatus_sent = False current_map = "nomap" current_song = "nosong" playlist_path = "undefined" ra3_maps = set() song_queue = queue.Queue() ANSI_ESCAPE_RE = re.compile( r''' \x1B(?: [78] # ESC 7, ESC 8 (DEC save/restore) | \[ [0-?]* [ -/]* [@-~] # CSI sequences | \] .*? (?:\x07|\x1B\\) # OSC sequences | [@-Z\\-_] # Other 7-bit C1 controls ) | \x08\ \x08 # literal "\x08 \x08" sequence ''', re.VERBOSE ) # ========================================================== # ===================== UTILITY FUNCTIONS ================= # def apply_backspaces(s): # # Ensure we are working with str # if isinstance(s, bytes): # s = s.decode(errors="ignore") # buf = [] # for c in s: # if c == "\x08": # if buf: # buf.pop() # else: # buf.append(c) # return "".join(buf) def acquire_single_instance(lockfile_base: str): """ Ensures only one instance of the program is running per user. Uses a lock file in the temp directory and psutil to check for stale locks. Returns (file descriptor, lockfile path) if acquired, None if another instance is running. """ username = getpass.getuser() lockfile_name = f"{lockfile_base}_{username}.lock" lockfile_path = os.path.join(tempfile.gettempdir(), lockfile_name) # If lock file exists, check if PID inside is running if os.path.exists(lockfile_path): try: with open(lockfile_path, "r") as f: pid = int(f.read()) if psutil.pid_exists(pid): # Another instance is running return None, lockfile_path else: # Stale lock file; remove it os.unlink(lockfile_path) except Exception: # Could not read PID; remove stale lock try: os.unlink(lockfile_path) except: pass # Create lock file exclusively try: fd = os.open(lockfile_path, os.O_CREAT | os.O_EXCL | os.O_RDWR) os.write(fd, str(os.getpid()).encode()) return fd, lockfile_path except FileExistsError: # Race condition: another process created it just now return None, lockfile_path def release_single_instance(fd, lockfile_path): """Closes the lock file and removes it.""" try: os.close(fd) except: pass try: os.unlink(lockfile_path) except: pass def Check_Arena_Blacklist(arenaline: str) -> bool: global arena0_bl_path arena_bl_local = load_arena0_blacklist(arena0_bl_path) for item in arena_bl_local: if arenaline.lower().startswith(f"\"arena0\" is:\"{item.lower()}"): return True return False def strip_ansi(text: str) -> str: return ANSI_ESCAPE_RE.sub("", text) def playlist_watcher(path, poll_interval=1.0): global playlist try: last_mtime = os.path.getmtime(path) except FileNotFoundError: last_mtime = 0 while not stop_flag.is_set(): try: current_mtime = os.path.getmtime(path) if current_mtime != last_mtime: last_mtime = current_mtime print("[DEBUG] Playlist file changed. Reloading playlist.") playlist = load_playlist(path) except FileNotFoundError: pass # Playlist temporarily missing; ignore time.sleep(poll_interval) def track_watcher(): global is_playing last_busy = False while not stop_flag.is_set(): busy = pygame.mixer.music.get_busy() if last_busy and not busy and is_playing: # Track just finished print("[DEBUG] Track finished. Advancing to next track.") next_track() last_busy = busy time.sleep(0.1) def parse_music_volume(line: str) -> float: # Remove all ^ color codes clean_line = re.sub(r'\^\d', '', line) # Extract the number inside quotes after "is:" match = re.search(r'is\s*:\s*"(.*?)"', clean_line, re.IGNORECASE) if not match: #raise ValueError(f"Could not parse volume from line: {line}") value_str = 0 else: value_str = match.group(1) try: return float(value_str) except ValueError: raise ValueError(f"Invalid float value for volume: {value_str}") def load_playlist(playlist_path): global playlist_index playlist_path = Path(playlist_path).resolve() base_dir = playlist_path.parent songs = [] try: with open(playlist_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue if line.startswith("#"): continue song_path = Path(line) # If the playlist entry is relative, resolve it relative to playlist.txt if not song_path.is_absolute(): song_path = base_dir / song_path songs.append(str(song_path)) if current_mode == "shuffle": random.shuffle(songs) # Re-align song that's playing to new index if current_song != "nosong": _song_index = 0 _found_song = False for s in songs: _song_eval = Path(s).stem if current_song == _song_eval: _found_song = True playlist_index = _song_index break _song_index += 1 if not _found_song: playlist_index = 0 play_current() except: print(f"[DEBUG] Failed to open playlist.txt") pass return songs def load_ra3_maps(path): maps = set() try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: maps.add(line.lower()) except: print(f"[DEBUG] Failed to open ra3_maps.txt") return set() return maps def load_arena0_blacklist(path): maps = set() try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: maps.add(line.lower()) except: print(f"[DEBUG] Failed to open ra3_maps.txt") return set() return maps def next_track(): global playlist if not playlist: return global volumecheck volumecheck = True send_command(pty_proc,"s_musicvolume") global playlist_index if current_mode == "loop": pass # keep same index else: playlist_index = (playlist_index + 1) % len(playlist) play_current() def previous_track(): global playlist if not playlist: return global volumecheck volumecheck = True send_command(pty_proc,"s_musicVolume") global playlist_index if current_mode == "loop": pass # keep same index else: playlist_index = (playlist_index - 1) % len(playlist) play_current() def play_current(): global is_playing, current_song, playlist if not playlist: return track = playlist[playlist_index] if not os.path.isfile(track): print(f"[DEBUG] Track not found: {track}") return try: pygame.mixer.music.load(track) pygame.mixer.music.set_volume(volume) pygame.mixer.music.play() is_playing = True current_song = Path(track).stem print(f"[DEBUG] Playing: {current_song} at volume {volume}") threading.Timer(.1, lambda: send_command(pty_proc,f"echo\r\necho\r\necho\r\necho Playing: {current_song}")).start() except Exception as e: print(f"[DEBUG] Couldn't start MP3 player': {e}") def stop_playback(): global is_playing pygame.mixer.music.stop() is_playing = False def toggle_pause(): global playlist if not playlist: return global volumecheck volumecheck = True send_command(pty_proc,"s_musicVolume") global is_playing if pygame.mixer.music.get_busy(): pygame.mixer.music.pause() is_playing = False else: pygame.mixer.music.unpause() is_playing = True def change_mode(): global current_mode, playlist_path, playlist modes = ["sequential", "shuffle", "loop"] idx = modes.index(current_mode) current_mode = modes[(idx + 1) % len(modes)] print(f"[DEBUG] Mode changed to: {current_mode}") send_command(pty_proc,f"echo _musicmode {current_mode}") playlist = load_playlist(playlist_path) def monitor_game_pty(master_fd): buffer = b"" while not stop_flag.is_set(): try: data = os.read(master_fd, 1024) except OSError: break if not data: break buffer += data while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) #if os.name != "nt": # line = apply_backspaces(line) line = strip_ansi(line.decode(errors="ignore")).strip() if line: if line.startswith("]"): line = line[1:] if "--debug" in sys.argv: print(f"[GAME] {line}", flush=True) if "--rawdebug" in sys.argv: print(f"[GAME RAW] {repr(line)}", flush=True) handle_game_line(line, pty_proc) def monitor_game(proc): #subprocess version global serverstatus_sent buffer = b"" while not stop_flag.is_set(): data = proc.stdout.read(1024) if not data: break buffer += data while b"\n" in buffer: line, buffer = buffer.split(b"\n", 1) try: line = line.decode(errors="ignore") except: continue line = strip_ansi(line).strip() if line: if any(arg.lower() == "--debug" for arg in sys.argv): print(f"[GAME] {line}") if any(arg.lower() == "--rawdebug" for arg in sys.argv): print(f"[GAME RAW] {repr(line)}") handle_game_line(line, proc) def handle_game_line(line, pty_proc): global volume, current_map, ra3_maps, ra3_maps_path, volumecheck, is_ra3, gametypecheck, is_playing, arenacheck, is_in_map, is_ra3_map, last_arena0 global serverstatus_sent if is_ra3 == False or is_in_map == False: is_ra3_map = False #Loading vm file vm/ui.qvm... #if "Sound initialization successful." in line: if "Loading vm file vm/ui.qvm..." in line: gametypecheck = True is_in_map = False threading.Timer(.3, lambda: send_command(pty_proc,"fs_game")).start() elif "\"fs_game\" is:\"" in line and gametypecheck: gametypecheck = False if "\"fs_game\" is:\"arena" in line: if is_ra3 == False: print(f"[DEBUG] Active mod is Rocket Arena 3 - enabling music playback") is_ra3 = True volumecheck = True threading.Timer(.5, lambda: send_command(pty_proc,"s_musicVolume")).start() if playlist: if current_mode == "shuffle": global playlist_index playlist_index = random.randint(0, len(playlist) - 1) threading.Timer(1.0, play_current).start() else: if not is_playing: next_track() threading.Timer(.2, lambda: send_command(pty_proc,"bind [ echo ]\\prevtrack\r\nbind ] echo ]\\nexttrack\r\nbind \\ echo ]\\musicmode\r\nbind ' echo ]\\pausetrack")).start() else: #print(f"[DEBUG] Active mod is NOT Rocket Arena 3 - disabling music playback") is_ra3 = False if is_playing: stop_playback() elif line.startswith("]\\showvars"): threading.Timer(.1, lambda: send_command(pty_proc,f"echo is_ra3 = {is_ra3}\r\necho is_in_map = {is_in_map}\r\necho is_ra3_map = {is_ra3_map}")).start() if is_ra3: if line.startswith("\"s_musicVolume\"") and volumecheck == True: volumecheck = False svolume = parse_music_volume(line) if svolume > 0: if volume != svolume: volume = svolume pygame.mixer.music.set_volume(volume) print(f"[DEBUG] Set music volume to {volume} by game client.") elif line.startswith("]\\nexttrack"): if (is_in_map and is_ra3_map) or (is_in_map == False): next_track() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\prevtrack"): if (is_in_map and is_ra3_map) or (is_in_map == False): previous_track() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\pausetrack"): if (is_in_map and is_ra3_map) or (is_in_map == False): toggle_pause() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\musicmode"): if (is_in_map and is_ra3_map) or (is_in_map == False): change_mode() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("Com_TouchMemory:"): is_in_map = True arenacheck = True threading.Timer(.5, lambda: send_command(pty_proc,f"arena0")).start() elif "\"arena0\" is:" in line and arenacheck: if line == last_arena0: same_map = True else: same_map = False last_arena0 = line arenacheck = False if line.startswith("\"arena0\" is:\"Arena Number 1"): print(f"[DEBUG] Not an RA3 map. Checking for override.") serverstatus_sent = True threading.Timer(.1, lambda: send_command(pty_proc,f"serverstatus")).start() else: if not same_map: if not Check_Arena_Blacklist(line): print(f"[DEBUG] RA3 map detected. Advancing track.") is_ra3_map = True next_track() else: print(f"[DEBUG] RA3 Arena found on blacklist - disabling music.") is_ra3_map = False stop_playback() elif "mapname" in line.lower() and serverstatus_sent: serverstatus_sent = False current_map = line.split()[-1].lower() ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: print(f"[DEBUG] Found override for: {current_map}. Advancing track.") is_ra3_map = True next_track() else: print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.") is_ra3_map = False threading.Timer(.3, lambda: stop_playback()).start() #stop_playback() def send_command(proc, cmd): global master_fd #subprocess version try: if os.name == "nt": proc.stdin.write((cmd + "\r\n").encode()) proc.stdin.flush() else: os.write(master_fd, (cmd + "\n").encode()) except Exception as e: print(f"[DEBUG] Failed to send command: {e}") # ========================================================== # ======================== MAIN =========================== def main(): global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path, arena0_blacklist, arena0_bl_path, master_fd print(f"Loading Quake 3 Arena...") # Use debug paths if enabled game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS arena0_bl_path = DEBUG_ARENA0_BL if DEBUG else DEFAULT_ARENA0_BL # Load playlist and map list playlist = load_playlist(playlist_path) ra3_maps = load_ra3_maps(ra3_maps_path) arena0_bl = load_arena0_blacklist(arena0_bl_path) # Initialize pygame mixer pygame.mixer.init() # Start playlist watcher thread playlist_thread = threading.Thread(target=playlist_watcher,args=(playlist_path,), daemon=True) playlist_thread.start() # Start track watcher thread watcher_thread = threading.Thread(target=track_watcher, daemon=True) watcher_thread.start() # # # Launch quake process via subprocess # pty_proc = subprocess.Popen( # [game_exe, "+set", "ttycon", "1"] + sys.argv[1:], # stdin=subprocess.PIPE, # stdout=subprocess.PIPE, # stderr=subprocess.STDOUT, # bufsize=0, # unbuffered # universal_newlines=False) if os.name == "nt": pty_proc = subprocess.Popen( [game_exe, "+set", "ttycon", "1"] + sys.argv[1:], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, universal_newlines=False ) master_fd = None else: master_fd, slave_fd = pty.openpty() pty_proc = subprocess.Popen( [game_exe, "+set", "ttycon", "1"] + sys.argv[1:], stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, close_fds=True ) os.close(slave_fd) # Monitor the game output try: if os.name == "nt": monitor_game(pty_proc) else: monitor_game_pty(master_fd) except KeyboardInterrupt: print("Exiting...") finally: stop_flag.set() stop_playback() if pty_proc: try: pty_proc.stdin.close() except: pass try: pty_proc.stdout.close() except: pass try: pty_proc.terminate() except: pass try: pty_proc.wait(timeout=5) except: pass pygame.mixer.quit() if __name__ == "__main__": lock_fd, lock_path = acquire_single_instance("q3a_launcher.lock") if lock_fd is None: print("Another instance is already running.") sys.exit(1) try: main() finally: release_single_instance(lock_fd, lock_path) # ==========================================================