from pathlib import Path
from textmenu import TextMenu
import threading
import time
import os
import sys
import random
import re
import queue
import tempfile
import psutil
import subprocess
import getpass
import shutil

# The pty module is only imported on non-Windows platforms.
if (os.name != "nt"):
    import pty

# Must be set before pygame is imported so the variable is already in the
# environment when the import runs.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame

# Event to signal shutdown
shutdown_event = threading.Event()

# ========================= CONFIG =========================
DEBUG = False  # Set True to enable debug overrides

# Default paths (used if not in debug mode)
DEFAULT_WIN_GAME_EXE = r"./qgame.dll"
DEFAULT_LIN_GAME_EXE = r"./qgame.so"
DEFAULT_PLAYLIST = r"./arena/music/playlist.txt"
DEFAULT_RA3_MAPS = r"./arena/music/ra3_maps.txt"
DEFAULT_ARENA0_BL = r"./arena/music/arena0_bl.txt"
DEFAULT_GAME_EXE = ""  # filled in by the platform switch below

# Debug override paths
DEBUG_WIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.dll"
DEBUG_LIN_GAME_EXE = r"D:/GOG Games/Quake III/qgame.so"
DEBUG_PLAYLIST = r"D:/GOG Games/Quake III/arena/music/playlist.txt"
DEBUG_RA3_MAPS = r"D:/GOG Games/Quake III/arena/music/ra3_maps.txt"
DEBUG_ARENA0_BL = r"D:/GOG Games/Quake III/arena/music/arena0_bl.txt"
DEBUG_GAME_EXE = ""  # filled in by the platform switch below

# Pick the game binary path that matches the current platform.
if (os.name == "nt"):
    DEFAULT_GAME_EXE = DEFAULT_WIN_GAME_EXE
    DEBUG_GAME_EXE = DEBUG_WIN_GAME_EXE
else:
    DEFAULT_GAME_EXE = DEFAULT_LIN_GAME_EXE
    DEBUG_GAME_EXE = DEBUG_LIN_GAME_EXE

# Volume control settings
VOLUME_STEP = 0.1  # step for W/S volume control
# ==========================================================

# ====================== GLOBAL STATE =====================
# Set right before "s_musicVolume" is sent to the game; the next
# '"s_musicVolume" ...' output line is then parsed as the reply.
volumecheck = False
# Master side of the pseudo-terminal on non-Windows platforms; main() sets
# it to None on Windows.
master_fd = None
# Set when an "fs_game" query is scheduled; cleared when its reply is handled.
gametypecheck = False
# Set when an "arena0" query is scheduled; cleared when its reply is handled.
arenacheck = False
# Last '"arena0" is:' reply line seen, used to detect a repeated map.
last_arena0 = "undefined"
# True while the reported fs_game value starts with "arena".
is_ra3 = False
# True while the current map is treated as an RA3 map.
is_ra3_map = False
# Set on a "Com_TouchMemory:" line, cleared when the UI VM is loaded.
is_in_map = False
# Data file locations; main() replaces these placeholders.
ra3_maps_path = "."
arena0_bl_path = "."
# Playlist / playback state shared by the watcher threads and the
# game-output handler.
playlist = []
arena0_bl = []
playlist_index = 0
current_mode = "shuffle"  # sequential, shuffle, loop
volume = 0.5
is_playing = False
stop_flag = threading.Event()
serverstatus_sent = False
current_map = "nomap"
current_song = "nosong"
playlist_path = "undefined"
ra3_maps = set()
song_queue = queue.Queue()

ANSI_ESCAPE_RE = re.compile(
    r'''
    \x1B(?:
        [78]                        # ESC 7, ESC 8 (DEC save/restore)
      | \[ [0-?]* [ -/]* [@-~]      # CSI sequences
      | \] .*? (?:\x07|\x1B\\)      # OSC sequences
      | [@-Z\\-_]                   # Other 7-bit C1 controls
    )
  | \x08\ \x08                      # literal "\x08 \x08" sequence
    ''',
    re.VERBOSE
)
# ==========================================================

# ===================== UTILITY FUNCTIONS =================

# Strip Quake-style color codes (^ followed by any character)
_QUAKE_COLOR_RE = re.compile(r"\^.")

# Menu text used when a mod folder has no usable description.
_NO_DESCRIPTION = "No description available"


def _strip_quake_colors(text):
    """Return *text* with every Quake color code (``^`` + one char) removed."""
    return _QUAKE_COLOR_RE.sub("", text)


def find_pk3_subfolders(base_path):
    r"""
    Returns a list of strings for each subfolder under base_path
    that contains at least one .pk3 file.

    The list always begins with:
        "baseq3\nQuake III Arena"

    Each subsequent entry is formatted as:
        folder_name + "\n" + description

    The description is "Quake III Team Arena" for a folder named
    "missionpack" (case-insensitive), otherwise the first line of the
    folder's description.txt with color codes stripped, otherwise
    "No description available". Subfolders that cannot be listed are
    skipped instead of aborting the scan.
    """
    base = Path(base_path)

    # Always start with baseq3
    results = ["baseq3\nQuake III Arena"]

    if not base.is_dir():
        return results

    for entry in base.iterdir():
        if not entry.is_dir():
            continue

        folder_name = entry.name
        folder_key = folder_name.lower()

        # Skip baseq3 during scan (already added)
        if folder_key == "baseq3":
            continue

        # Check for at least one .pk3 file
        try:
            has_pk3 = any(
                f.is_file() and f.suffix.lower() == ".pk3"
                for f in entry.iterdir()
            )
        except OSError:
            # Unreadable folder (e.g. permissions): treat as "no mod here".
            continue
        if not has_pk3:
            continue

        # Special-case missionpack
        if folder_key == "missionpack":
            results.append(f"{folder_name}\nQuake III Team Arena")
            continue

        # description.txt handling
        description = _NO_DESCRIPTION
        description_file = entry / "description.txt"
        if description_file.is_file():
            try:
                with description_file.open("r", encoding="utf-8", errors="ignore") as f:
                    first_line = _strip_quake_colors(f.readline().strip())
            except OSError:
                first_line = ""
            if first_line:
                description = first_line

        results.append(f"{folder_name}\n{description}")

    return results


def acquire_single_instance(lockfile_base: str):
    """
    Ensures only one instance of the program is running per user.
    Uses a lock file in the temp directory and psutil to check for stale locks.

    Returns a ``(fd, lockfile_path)`` tuple. ``fd`` is the open file
    descriptor of the newly created lock file, or ``None`` if another
    instance is running (or won the creation race).
    """
    username = getpass.getuser()
    lockfile_name = f"{lockfile_base}_{username}.lock"
    lockfile_path = os.path.join(tempfile.gettempdir(), lockfile_name)

    # If lock file exists, check if PID inside is running
    if os.path.exists(lockfile_path):
        try:
            with open(lockfile_path, "r") as f:
                pid = int(f.read())
            owner_alive = psutil.pid_exists(pid)
        except Exception:
            # Best effort: an unreadable or garbage lock file counts as stale.
            owner_alive = False

        if owner_alive:
            # Another instance is running
            return None, lockfile_path

        # Stale lock file; remove it
        try:
            os.unlink(lockfile_path)
        except OSError:
            pass

    # Create lock file exclusively
    try:
        fd = os.open(lockfile_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except FileExistsError:
        # Race condition: another process created it just now
        return None, lockfile_path

    os.write(fd, str(os.getpid()).encode())
    return fd, lockfile_path


def release_single_instance(fd, lockfile_path):
    """Closes the lock file and removes it; failures are ignored."""
    try:
        os.close(fd)
    except (OSError, TypeError):
        pass
    try:
        os.unlink(lockfile_path)
    except (OSError, TypeError):
        pass


def Check_Arena_Blacklist(arenaline: str) -> bool:
    """
    Return True if *arenaline* (an '"arena0" is:"..."' console line) names
    an arena listed in the blacklist file.

    The blacklist is re-read from ``arena0_bl_path`` on every call and the
    comparison is a case-insensitive prefix match.
    """
    lowered = arenaline.lower()
    return any(
        lowered.startswith(f"\"arena0\" is:\"{item.lower()}")
        for item in load_arena0_blacklist(arena0_bl_path)
    )


def strip_ansi(text: str) -> str:
    """Return *text* with everything matched by ANSI_ESCAPE_RE removed."""
    return ANSI_ESCAPE_RE.sub("", text)


def playlist_watcher(path, poll_interval=1.0):
    """
    Poll the modification time of *path* every *poll_interval* seconds and
    reload the global playlist when it changes. Runs until ``stop_flag``
    is set.
    """
    global playlist

    try:
        last_mtime = os.path.getmtime(path)
    except OSError:
        last_mtime = 0

    while not stop_flag.is_set():
        try:
            current_mtime = os.path.getmtime(path)
        except OSError:
            # Playlist temporarily missing or unreadable; keep polling.
            current_mtime = last_mtime

        if current_mtime != last_mtime:
            last_mtime = current_mtime
            print("[DEBUG] Playlist file changed. Reloading playlist.")
            playlist = load_playlist(path)

        time.sleep(poll_interval)


def track_watcher():
    """
    Poll the pygame mixer every 0.1 s and call ``next_track()`` when the
    mixer goes from busy to idle while ``is_playing`` is still set.
    Runs until ``stop_flag`` is set.
    """
    was_busy = False

    while not stop_flag.is_set():
        busy = pygame.mixer.music.get_busy()
        if was_busy and not busy and is_playing:
            # Track just finished
            print("[DEBUG] Track finished. Advancing to next track.")
            next_track()
        was_busy = busy
        time.sleep(0.1)


def parse_music_volume(line: str) -> float:
    """
    Extract the quoted value following ``is:`` from a cvar output line.

    Color codes of the form ``^<digit>`` are removed first. Returns 0.0
    when the line has no ``is:"..."`` part.

    Raises:
        ValueError: if the quoted value is not a valid float.
    """
    # Remove all ^<digit> color codes
    clean_line = re.sub(r'\^\d', '', line)

    # Extract the number inside quotes after "is:"
    match = re.search(r'is\s*:\s*"(.*?)"', clean_line, re.IGNORECASE)
    if not match:
        return 0.0

    value_str = match.group(1)
    try:
        return float(value_str)
    except ValueError:
        raise ValueError(f"Invalid float value for volume: {value_str}")


def load_playlist(playlist_path):
    """
    Read the playlist file and return its songs as a list of path strings.

    Blank lines and lines starting with ``#`` are skipped; relative entries
    are resolved against the playlist file's directory. In "shuffle" mode
    the list is shuffled. If a song is currently playing, ``playlist_index``
    is re-aligned to its position in the new list; if it is no longer
    listed, the new list is published as the global playlist and playback
    restarts from its first entry.

    On a read failure a debug message is printed and the songs read so far
    (usually none) are returned.
    """
    global playlist, playlist_index

    playlist_path = Path(playlist_path).resolve()
    base_dir = playlist_path.parent
    songs = []

    try:
        with open(playlist_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                entry = raw_line.strip()
                if not entry or entry.startswith("#"):
                    continue

                song_path = Path(entry)
                # If the playlist entry is relative, resolve it relative to playlist.txt
                if not song_path.is_absolute():
                    song_path = base_dir / song_path
                songs.append(str(song_path))
    except (OSError, UnicodeError):
        print(f"[DEBUG] Failed to open playlist.txt")
        return songs

    if current_mode == "shuffle":
        random.shuffle(songs)

    # Re-align song that's playing to new index
    if current_song != "nosong":
        for index, song in enumerate(songs):
            if Path(song).stem == current_song:
                playlist_index = index
                break
        else:
            # The playing song is gone: restart from the top of the NEW
            # list. play_current() reads the global playlist, so publish
            # the new list first (callers assign the same object again).
            playlist_index = 0
            playlist = songs
            play_current()

    return songs


def _load_lowercase_lines(path, label):
    """Return the stripped, lower-cased, non-empty lines of *path* as a set.

    On failure prints "[DEBUG] Failed to open <label>" and returns an empty set.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            stripped = (line.strip() for line in f)
            return {line.lower() for line in stripped if line}
    except (OSError, UnicodeError):
        print(f"[DEBUG] Failed to open {label}")
        return set()


def load_ra3_maps(path):
    """Load the RA3 map override list from *path* as a set of lower-case names."""
    return _load_lowercase_lines(path, "ra3_maps.txt")


def load_arena0_blacklist(path):
    """Load the arena0 blacklist from *path* as a set of lower-case entries."""
    return _load_lowercase_lines(path, "arena0_bl.txt")


def next_track():
    """Request a volume sync from the game, then play the next playlist entry.

    In "loop" mode the index is left unchanged, so the same track restarts.
    Does nothing when the playlist is empty.
    """
    global volumecheck, playlist_index

    if not playlist:
        return

    volumecheck = True
    send_command(pty_proc,"s_musicvolume")

    if current_mode != "loop":
        playlist_index = (playlist_index + 1) % len(playlist)
    play_current()


def previous_track():
    """Request a volume sync from the game, then play the previous playlist entry.

    In "loop" mode the index is left unchanged, so the same track restarts.
    Does nothing when the playlist is empty.
    """
    global volumecheck, playlist_index

    if not playlist:
        return

    volumecheck = True
    send_command(pty_proc,"s_musicVolume")

    if current_mode != "loop":
        playlist_index = (playlist_index - 1) % len(playlist)
    play_current()


def play_current():
    """Load and play ``playlist[playlist_index]`` at the current volume.

    Prints a debug message and returns if the file does not exist. On
    success updates ``is_playing`` / ``current_song`` and schedules an
    in-game "Playing: ..." echo.
    """
    global is_playing, current_song

    if not playlist:
        return

    track = playlist[playlist_index]
    if not os.path.isfile(track):
        print(f"[DEBUG] Track not found: {track}")
        return

    try:
        pygame.mixer.music.load(track)
        pygame.mixer.music.set_volume(volume)
        pygame.mixer.music.play()
        is_playing = True
        current_song = Path(track).stem
        print(f"[DEBUG] Playing: {current_song} at volume {volume}")
        threading.Timer(.1, lambda: send_command(pty_proc,f"echo\r\necho\r\necho\r\necho Playing: {current_song}")).start()
    except Exception as e:
        # Boundary with the audio backend: report and keep the launcher alive.
        print(f"[DEBUG] Couldn't start MP3 player': {e}")


def stop_playback():
    """Stop the mixer and clear ``is_playing``."""
    global is_playing
    pygame.mixer.music.stop()
    is_playing = False


def toggle_pause():
    """Request a volume sync, then pause if the mixer is busy, else unpause.

    Does nothing when the playlist is empty.
    """
    global volumecheck, is_playing

    if not playlist:
        return

    volumecheck = True
    send_command(pty_proc,"s_musicVolume")

    if pygame.mixer.music.get_busy():
        pygame.mixer.music.pause()
        is_playing = False
    else:
        pygame.mixer.music.unpause()
        is_playing = True


def change_mode():
    """Cycle sequential -> shuffle -> loop, echo the new mode in-game and
    reload the playlist."""
    global current_mode, playlist

    modes = ["sequential", "shuffle", "loop"]
    idx = modes.index(current_mode)
    current_mode = modes[(idx + 1) % len(modes)]
    print(f"[DEBUG] Mode changed to: {current_mode}")
    send_command(pty_proc,f"echo _musicmode {current_mode}")
    playlist = load_playlist(playlist_path)


def _debug_flag_enabled(flag):
    """Return True if *flag* appears (case-insensitively) in sys.argv."""
    return any(arg.lower() == flag for arg in sys.argv)


def monitor_game_pty(master_fd):
    """
    Read game output from the pty master *master_fd* until EOF, a read
    error, or ``stop_flag``; strip ANSI sequences and pass each non-empty
    line to ``handle_game_line``. One leading "]" is removed from a line.
    """
    buffer = b""

    while not stop_flag.is_set():
        try:
            data = os.read(master_fd, 1024)
        except OSError:
            break
        if not data:
            break

        buffer += data
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            line = strip_ansi(raw.decode(errors="ignore")).strip()
            if not line:
                continue

            if line.startswith("]"):
                line = line[1:]
            if _debug_flag_enabled("--debug"):
                print(f"[GAME] {line}", flush=True)
            if _debug_flag_enabled("--rawdebug"):
                print(f"[GAME RAW] {repr(line)}", flush=True)
            handle_game_line(line, pty_proc)


def monitor_game(proc):
    """
    Subprocess (pipe) variant: read ``proc.stdout`` until EOF or
    ``stop_flag``; strip ANSI sequences and pass each non-empty line to
    ``handle_game_line``.
    """
    buffer = b""

    while not stop_flag.is_set():
        data = proc.stdout.read(1024)
        if not data:
            break

        buffer += data
        while b"\n" in buffer:
            raw, buffer = buffer.split(b"\n", 1)
            line = strip_ansi(raw.decode(errors="ignore")).strip()
            if not line:
                continue

            if _debug_flag_enabled("--debug"):
                print(f"[GAME] {line}")
            if _debug_flag_enabled("--rawdebug"):
                print(f"[GAME RAW] {repr(line)}")
            handle_game_line(line, proc)


# Console markers (produced by the key binds installed in handle_game_line)
# mapped to the music action they trigger. Order matches the original checks.
_MUSIC_CONTROLS = (
    ("]\\nexttrack", next_track),
    ("]\\prevtrack", previous_track),
    ("]\\pausetrack", toggle_pause),
    ("]\\musicmode", change_mode),
)


def handle_game_line(line, pty_proc):
    """
    React to one cleaned line of game console output.

    Stage 1 (always): detect UI VM loads, the reply to the "fs_game" query
    (which switches RA3 mode on or off) and the "]\\showvars" debug marker.
    Stage 2 (only while ``is_ra3``): volume replies, music control markers,
    map-load detection, the "arena0" reply and the "mapname" reply.

    Args:
        line: console line with ANSI sequences already stripped.
        pty_proc: process handle forwarded to ``send_command``.
    """
    global volume, current_map, ra3_maps, volumecheck, is_ra3
    global gametypecheck, arenacheck, is_in_map, is_ra3_map
    global last_arena0, serverstatus_sent, playlist_index

    if not is_ra3 or not is_in_map:
        is_ra3_map = False

    # ---- Stage 1: mod detection ----
    if "Loading vm file vm/ui.qvm..." in line:
        gametypecheck = True
        is_in_map = False
        threading.Timer(.3, lambda: send_command(pty_proc,"fs_game")).start()

    elif "\"fs_game\" is:\"" in line and gametypecheck:
        gametypecheck = False
        if "\"fs_game\" is:\"arena" in line:
            # NOTE(review): nesting below was reconstructed from a
            # whitespace-collapsed source; only the print is assumed to be
            # guarded by the "was not RA3 before" check -- confirm.
            if not is_ra3:
                print(f"[DEBUG] Active mod is Rocket Arena 3 - enabling music playback")
            is_ra3 = True
            volumecheck = True
            threading.Timer(.5, lambda: send_command(pty_proc,"s_musicVolume")).start()
            if playlist:
                if current_mode == "shuffle":
                    playlist_index = random.randint(0, len(playlist) - 1)
                    threading.Timer(1.0, play_current).start()
                elif not is_playing:
                    next_track()
            threading.Timer(.2, lambda: send_command(pty_proc,"bind [ echo ]\\prevtrack\r\nbind ] echo ]\\nexttrack\r\nbind \\ echo ]\\musicmode\r\nbind ' echo ]\\pausetrack")).start()
        else:
            is_ra3 = False
            if is_playing:
                stop_playback()

    elif line.startswith("]\\showvars"):
        threading.Timer(.1, lambda: send_command(pty_proc,f"echo is_ra3 = {is_ra3}\r\necho is_in_map = {is_in_map}\r\necho is_ra3_map = {is_ra3_map}")).start()

    # ---- Stage 2: RA3-only handling ----
    if not is_ra3:
        return

    if line.startswith("\"s_musicVolume\"") and volumecheck:
        volumecheck = False
        try:
            svolume = parse_music_volume(line)
        except ValueError:
            # A malformed value must not kill the monitor loop (which would
            # terminate the game); keep the current volume instead.
            print(f"[DEBUG] Ignoring unparsable music volume line: {line!r}")
            return
        if svolume > 0 and volume != svolume:
            volume = svolume
            pygame.mixer.music.set_volume(volume)
            print(f"[DEBUG] Set music volume to {volume} by game client.")
        return

    for marker, action in _MUSIC_CONTROLS:
        if line.startswith(marker):
            # Controls work outside a map, or inside a map flagged as RA3.
            if not is_in_map or is_ra3_map:
                action()
            else:
                threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
            return

    if line.startswith("Com_TouchMemory:"):
        is_in_map = True
        arenacheck = True
        threading.Timer(.5, lambda: send_command(pty_proc,f"arena0")).start()

    elif "\"arena0\" is:" in line and arenacheck:
        same_map = line == last_arena0
        last_arena0 = line
        arenacheck = False
        if line.startswith("\"arena0\" is:\"Arena Number 1"):
            print(f"[DEBUG] Not an RA3 map. Checking for override.")
            serverstatus_sent = True
            threading.Timer(.1, lambda: send_command(pty_proc,f"serverstatus")).start()
        elif not same_map:
            if not Check_Arena_Blacklist(line):
                print(f"[DEBUG] RA3 map detected. Advancing track.")
                is_ra3_map = True
                next_track()
            else:
                print(f"[DEBUG] RA3 Arena found on blacklist - disabling music.")
                is_ra3_map = False
                stop_playback()

    elif "mapname" in line.lower() and serverstatus_sent:
        serverstatus_sent = False
        current_map = line.split()[-1].lower()
        ra3_maps = load_ra3_maps(ra3_maps_path)
        if current_map in ra3_maps:
            print(f"[DEBUG] Found override for: {current_map}. Advancing track.")
            is_ra3_map = True
            next_track()
        else:
            print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.")
            is_ra3_map = False
            threading.Timer(.3, lambda: stop_playback()).start()


def send_command(proc, cmd):
    """
    Send console command *cmd* to the game.

    On Windows the command is written to ``proc.stdin`` terminated by CRLF;
    elsewhere it is written to the global pty ``master_fd`` terminated by
    LF. Failures are reported as a debug message, never raised.
    """
    try:
        if os.name == "nt":
            proc.stdin.write((cmd + "\r\n").encode())
            proc.stdin.flush()
        else:
            os.write(master_fd, (cmd + "\n").encode())
    except Exception as e:
        # Called from timers/watcher threads: never let a send failure escape.
        print(f"[DEBUG] Failed to send command: {e}")
# ==========================================================

# ======================== MAIN ===========================


def main():
    """
    Load the data files, start the watcher threads, optionally show the mod
    selection menu, launch the game and monitor its output until it exits.
    On exit the watchers are stopped, playback halted and the game process
    terminated.
    """
    global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path
    global arena0_bl, arena0_bl_path, master_fd

    print(f"Loading Quake 3 Arena...")

    # Use debug paths if enabled
    game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
    playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST
    ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS
    arena0_bl_path = DEBUG_ARENA0_BL if DEBUG else DEFAULT_ARENA0_BL

    # Load playlist and map list
    playlist = load_playlist(playlist_path)
    ra3_maps = load_ra3_maps(ra3_maps_path)
    arena0_bl = load_arena0_blacklist(arena0_bl_path)

    # Initialize pygame mixer
    pygame.mixer.init()

    # Start playlist watcher thread
    playlist_thread = threading.Thread(target=playlist_watcher,args=(playlist_path,), daemon=True)
    playlist_thread.start()

    # Start track watcher thread
    watcher_thread = threading.Thread(target=track_watcher, daemon=True)
    watcher_thread.start()

    # Offer the mod menu unless the caller already passed fs_game.
    run_mod = []
    if "fs_game" not in sys.argv[1:]:
        game_path = Path(game_exe)
        items = find_pk3_subfolders(game_path.parent)
        menu = TextMenu(
            items,
            width=60,
            height=20,
            title="Quake III Arena mod loader menu",
            border_fg="light red",
            border_bg="black",
            inside_fg="dark red",
            inside_bg="black",
            selected_fg="black",
            selected_bg="light red",
            timeout=10
        )
        choice = menu.show()
        if choice is not None:
            # Menu entries are "folder\ndescription"; keep the folder part.
            chosen_mod = choice.split("\n", 1)[0]
            run_mod = ["+set", "fs_game", chosen_mod]

    command = [game_exe, "+set", "ttycon", "1"] + run_mod + sys.argv[1:]

    if os.name == "nt":
        pty_proc = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
            universal_newlines=False
        )
        master_fd = None
    else:
        master_fd, slave_fd = pty.openpty()
        pty_proc = subprocess.Popen(
            command,
            stdin=slave_fd,
            stdout=slave_fd,
            stderr=slave_fd,
            close_fds=True
        )
        os.close(slave_fd)

    # Monitor the game output
    try:
        if os.name == "nt":
            monitor_game(pty_proc)
        else:
            monitor_game_pty(master_fd)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        stop_flag.set()
        stop_playback()
        if pty_proc:
            # stdin/stdout are None in the pty case; skip those.
            for stream in (pty_proc.stdin, pty_proc.stdout):
                if stream is None:
                    continue
                try:
                    stream.close()
                except OSError:
                    pass
            try:
                pty_proc.terminate()
            except OSError:
                pass
            try:
                pty_proc.wait(timeout=5)
            except (subprocess.TimeoutExpired, OSError):
                pass
        pygame.mixer.quit()


_RELAUNCHED_ENV = "Q3A_LAUNCHED_IN_TERMINAL"


def has_terminal():
    """Return True if both stdin and stdout are attached to a terminal."""
    try:
        return sys.stdin.isatty() and sys.stdout.isatty()
    except Exception:
        return False


def _build_terminal_command(term, args, argv):
    """Return the argument list that runs *argv* inside terminal *term*.

    gnome-terminal gets a single ``bash -c`` script, so every argument is
    shell-quoted individually; other terminals take argv as separate words.
    """
    if term == "gnome-terminal":
        import shlex  # local import: only needed for this fallback

        script = " ".join(shlex.quote(arg) for arg in argv)
        return [term] + args + [f"{script}; exec bash"]
    return [term] + args + argv


def _spawn_and_exit(cmd, env):
    """Start *cmd*; exit this process with status 0 if it could be started."""
    try:
        subprocess.Popen(cmd, env=env)
    except (OSError, ValueError, subprocess.SubprocessError):
        return
    sys.exit(0)


def relaunch_in_terminal():
    """
    Re-run this program inside a terminal emulator and exit.

    Returns without doing anything if the relaunch marker is already set in
    the environment (prevents infinite relaunch loops). Exits with status 0
    once a terminal was started, or with status 1 if none could be.
    """
    # Prevent infinite relaunch loops
    if os.environ.get(_RELAUNCHED_ENV) == "1":
        return

    env = os.environ.copy()
    env[_RELAUNCHED_ENV] = "1"

    argv = [os.path.abspath(sys.argv[0])] + sys.argv[1:]

    # --- 1. Preferred: xdg-terminal-exec (Bazzite / Fedora Atomic / Wayland) ---
    if shutil.which("xdg-terminal-exec"):
        _spawn_and_exit(["xdg-terminal-exec"] + argv, env)
        # fall through to legacy terminals

    # --- 2. Legacy fallback terminals ---
    terminals = [
        ("konsole", ["--hold", "-e"]),
        ("gnome-terminal", ["--", "bash", "-c"]),
        ("xterm", ["-hold", "-e"]),
        ("kitty", ["-e"]),
        ("alacritty", ["-e"]),
    ]

    for term, args in terminals:
        if shutil.which(term):
            _spawn_and_exit(_build_terminal_command(term, args, argv), env)

    # --- 3. Last-resort failure ---
    sys.stderr.write(
        "Unable to open a terminal window.\n"
        "Please run this application from a terminal.\n"
    )
    sys.exit(1)


if __name__ == "__main__":
    # Without a terminal (non-Windows only) try to reopen inside one first.
    if os.name != "nt" and not has_terminal():
        relaunch_in_terminal()

    lock_fd, lock_path = acquire_single_instance("q3a_launcher.lock")
    if lock_fd is None:
        print("Another instance is already running.")
        sys.exit(1)

    try:
        main()
    finally:
        release_single_instance(lock_fd, lock_path)
# ==========================================================