# --- Standard library ---
import ctypes
import os
import queue
import random
import re
import sys
import threading
import time
import tkinter as tk
from ctypes import wintypes
from math import fabs
from pathlib import Path
from tkinter import CURRENT
#import psutil

# Must be assigned before pygame is imported, otherwise it has no effect.
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"

# --- Third party ---
import pygame
import readchar
import winpty
import winpty.ptyprocess

# Win32 constants (uiAction / fWinIni values passed to SystemParametersInfoW)
SPI_GETMOUSE = 0x0003
SPI_SETMOUSE = 0x0004
SPIF_SENDCHANGE = 0x02

# Console event constants (values delivered to the console control handler)
CTRL_C_EVENT = 0
CTRL_BREAK_EVENT = 1
CTRL_CLOSE_EVENT = 2
CTRL_LOGOFF_EVENT = 5
CTRL_SHUTDOWN_EVENT = 6

# use_last_error=True makes ctypes keep a private copy of the Win32 last-error
# value, readable through ctypes.get_last_error().
user32 = ctypes.WinDLL("user32", use_last_error=True)
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

# Event to signal shutdown
shutdown_event = threading.Event()

# Win32 error code compared against the last-error value after CreateMutexW.
ERROR_ALREADY_EXISTS = 183

SystemParametersInfo = user32.SystemParametersInfoW
SystemParametersInfo.argtypes = (
    wintypes.UINT,                 # uiAction
    wintypes.UINT,                 # uiParam
    ctypes.POINTER(ctypes.c_int),  # pvParam (int[3])
    wintypes.UINT,                 # fWinIni
)
SystemParametersInfo.restype = wintypes.BOOL

# ========================= CONFIG =========================
DEBUG = True  # Set True to enable debug overrides

# Default paths (used if not in debug mode)
DEFAULT_GAME_EXE = r".\RA3game.dll"
DEFAULT_PLAYLIST = r".\arena\music\playlist.txt"
DEFAULT_RA3_MAPS = r".\arena\music\ra3_maps.txt"

# Debug override paths
DEBUG_GAME_EXE = r"D:\GOG Games\Quake III\quake3e.exe"
DEBUG_PLAYLIST = r"D:\GOG Games\Rocket Arena 3\arena\music\playlist.txt"
DEBUG_RA3_MAPS = r"D:\GOG Games\Rocket Arena 3\arena\music\ra3_maps.txt"

# Volume control
VOLUME_STEP = 0.1  # step for W/S volume control
# ==========================================================

# ====================== GLOBAL STATE =====================
# True while a reply to an "s_musicVolume" query is expected from the game.
volumecheck = False
# Path of the RA3 map list; replaced by main() with the configured path.
ra3_maps_path = "."
playlist = []
playlist_index = 0
current_mode = "shuffle"  # sequential, shuffle, loop
volume = 0.5
is_playing = False
stop_flag = threading.Event()
serverstatus_sent = False
map_checked = False
current_map = "nomap"
current_song = "nosong"
playlist_path = "undefined"
ra3_maps = set()
song_queue = queue.Queue()
WasPointerPrecisionEnabled = False
# Handle of the game's PTY process. Stays None until main() has spawned the
# game, so helper threads that start earlier never hit an undefined name.
pty_proc = None

ANSI_ESCAPE_RE = re.compile(
    r'''
    \x1B(?:
        [78]                      # ESC 7, ESC 8 (DEC save/restore)
      | \[ [0-?]* [ -/]* [@-~]    # CSI sequences
      | \] .*? (?:\x07|\x1B\\)    # OSC sequences
      | [@-Z\\-_]                 # Other 7-bit C1 controls
    )
    ''',
    re.VERBOSE
)
# ==========================================================

# ===================== UTILITY FUNCTIONS =================


def _release_resources():
    """Stop playback, close the game PTY and shut the mixer down.

    Every step is attempted even if an earlier one fails, so the function is
    safe to call more than once (it is used by both the console close handler
    and the ``finally`` clause of the main session).
    """
    def _close_pty():
        if pty_proc is not None:
            pty_proc.close()

    for step in (stop_playback, _close_pty, pygame.mixer.quit):
        try:
            step()
        except Exception as exc:  # best-effort teardown; keep going
            print(f"[DEBUG] Cleanup step failed: {exc}")

    if WasPointerPrecisionEnabled:
        try:
            set_pointer_precision(True)
        except OSError as exc:
            print(f"[DEBUG] Could not restore pointer precision: {exc}")


def console_handler(ctrl_type):
    """Console control handler registered with SetConsoleCtrlHandler.

    Args:
        ctrl_type: The console event code passed in by Windows.

    Returns:
        True when the event was CTRL_CLOSE_EVENT (cleanup was performed),
        False for every other event so that other handlers can process it.
    """
    print(f"Control event: {ctrl_type}")
    if ctrl_type != CTRL_CLOSE_EVENT:
        return False  # Let other events pass

    print("\n[X] Console close requested! Signaling cleanup...")
    shutdown_event.set()  # Signal main thread to clean up
    _release_resources()
    return True  # We've handled it


# Convert Python function to C callback. The Win32 prototype is
# BOOL WINAPI HandlerRoutine(DWORD dwCtrlType).
HandlerRoutine = ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.DWORD)
# Keep a module-level reference so the callback object is not garbage
# collected while Windows still holds the function pointer.
handler = HandlerRoutine(console_handler)
ctypes.windll.kernel32.SetConsoleCtrlHandler(handler, True)


def track_game_focus(
    game_pid,
    poll_interval=0.1,
    debounce_time=0.3,
):
    """Poll the foreground window and react when the game gains/loses focus.

    Focus is decided by comparing the process id that owns the current
    foreground window with ``game_pid``, so it does not depend on a specific
    window handle. A change of state is only accepted when at least
    ``debounce_time`` seconds have passed since the previously accepted
    change. The loop ends once ``stop_flag`` is set.

    Args:
        game_pid: Process id of the game.
        poll_interval: Seconds to wait between polls.
        debounce_time: Minimum seconds between two accepted state changes.
    """
    user32 = ctypes.WinDLL("user32", use_last_error=True)
    user32.GetForegroundWindow.restype = wintypes.HWND
    user32.GetWindowThreadProcessId.argtypes = (
        wintypes.HWND,
        ctypes.POINTER(wintypes.DWORD),
    )
    user32.GetWindowThreadProcessId.restype = wintypes.DWORD

    focused = False
    last_state_change = 0.0

    while not stop_flag.is_set():
        hwnd = user32.GetForegroundWindow()
        pid = wintypes.DWORD()
        is_focused = False
        if hwnd:
            user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
            is_focused = (pid.value == game_pid)

        now = time.monotonic()

        # ---------- STATE CHANGE DETECTION ----------
        # Debounce to avoid flicker during fullscreen transitions
        if is_focused != focused and now - last_state_change >= debounce_time:
            focused = is_focused
            last_state_change = now

            if focused:
                print("[DEBUG] Game gained focus")
                if WasPointerPrecisionEnabled:
                    print("[DEBUG] Disabling mouse pointer precision")
                    set_pointer_precision(False)
            else:
                print("[DEBUG] Game lost focus")
                if WasPointerPrecisionEnabled:
                    print("[DEBUG] Restoring mouse pointer precision")
                    set_pointer_precision(True)
        # --------------------------------------------

        time.sleep(poll_interval)


def acquire_single_instance(name: str):
    """Create a named Win32 mutex used as a single-instance guard.

    Args:
        name: Name of the mutex object.

    Returns:
        The mutex handle, or None when a mutex with this name already
        existed (the duplicate handle is closed before returning).

    Raises:
        OSError: If CreateMutexW fails.
    """
    create_mutex = kernel32.CreateMutexW
    create_mutex.argtypes = (wintypes.LPVOID, wintypes.BOOL, wintypes.LPCWSTR)
    # Without an explicit restype ctypes would truncate the handle to a C int.
    create_mutex.restype = wintypes.HANDLE
    kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)
    kernel32.CloseHandle.restype = wintypes.BOOL

    ctypes.set_last_error(0)
    handle = create_mutex(
        None,   # lpMutexAttributes
        False,  # bInitialOwner
        name,   # lpName
    )
    # kernel32 was loaded with use_last_error=True, so the error code of the
    # call above must be read through ctypes, not via kernel32.GetLastError().
    last_error = ctypes.get_last_error()

    if not handle:
        raise ctypes.WinError(last_error)

    if last_error == ERROR_ALREADY_EXISTS:
        kernel32.CloseHandle(handle)
        return None

    return handle


def _read_mouse_params():
    """Return the three-int array filled in by SPI_GETMOUSE."""
    mouse_params = (ctypes.c_int * 3)()
    if not SystemParametersInfo(SPI_GETMOUSE, 0, mouse_params, 0):
        raise ctypes.WinError(ctypes.get_last_error())
    return mouse_params


def is_pointer_precision_enabled():
    """
    Returns True if Windows 'Enhance pointer precision' is enabled.

    Raises:
        OSError: If SystemParametersInfoW fails.
    """
    # mouse_params[2]: 0 = off, 1 = on
    return _read_mouse_params()[2] != 0


def set_pointer_precision(enabled: bool):
    """
    Enables or disables Windows 'Enhance pointer precision'.

    The current mouse parameters are read first so that only the third
    element is changed when the array is written back.

    Raises:
        OSError: If SystemParametersInfoW fails.
    """
    mouse_params = _read_mouse_params()
    mouse_params[2] = 1 if enabled else 0

    if not SystemParametersInfo(SPI_SETMOUSE, 0, mouse_params, SPIF_SENDCHANGE):
        raise ctypes.WinError(ctypes.get_last_error())


def strip_ansi(text: str) -> str:
    """Return ``text`` with every match of ANSI_ESCAPE_RE removed."""
    return ANSI_ESCAPE_RE.sub("", text)


def playlist_watcher(path, poll_interval=1.0):
    """Reload the global playlist whenever the playlist file's mtime changes.

    Runs until ``stop_flag`` is set. A missing or unreadable file is ignored
    and retried on the next poll.

    Args:
        path: Path of the playlist file.
        poll_interval: Seconds between checks.
    """
    global playlist

    try:
        last_mtime = os.path.getmtime(path)
    except OSError:
        last_mtime = 0

    while not stop_flag.is_set():
        try:
            current_mtime = os.path.getmtime(path)
            if current_mtime != last_mtime:
                print("[DEBUG] Playlist file changed. Reloading playlist.")
                playlist = load_playlist(path)
                # Only remember the mtime after a successful reload so that
                # a failed read is retried on the next poll.
                last_mtime = current_mtime
        except OSError:
            pass  # Playlist temporarily missing or locked; ignore

        # Event.wait doubles as an interruptible sleep.
        stop_flag.wait(poll_interval)


def track_watcher():
    """Advance to the next track when the mixer goes from busy to idle.

    The transition only counts while ``is_playing`` is True, so a deliberate
    stop or pause does not trigger a skip. Runs until ``stop_flag`` is set.
    """
    last_busy = False
    while not stop_flag.is_set():
        busy = pygame.mixer.music.get_busy()
        if last_busy and not busy and is_playing:
            # Track just finished
            print("[DEBUG] Track finished. Advancing to next track.")
            next_track()
        last_busy = busy
        stop_flag.wait(0.1)


def parse_music_volume(line: str) -> float:
    """Extract the quoted value that follows ``is:`` in a console line.

    ``^<digit>`` colour codes are stripped before matching.

    Args:
        line: One line of game console output.

    Returns:
        The parsed value, or 0.0 when the line has no ``is:"..."`` part.

    Raises:
        ValueError: If the quoted text is not a valid float.
    """
    # Remove all ^ color codes
    clean_line = re.sub(r'\^\d', '', line)

    # Extract the number inside quotes after "is:"
    match = re.search(r'is\s*:\s*"(.*?)"', clean_line, re.IGNORECASE)
    if not match:
        return 0.0

    value_str = match.group(1)
    try:
        return float(value_str)
    except ValueError as err:
        raise ValueError(f"Invalid float value for volume: {value_str}") from err


def load_playlist(playlist_path):
    """Read the playlist file and return the list of track paths.

    Blank lines and lines starting with ``#`` are skipped. Relative entries
    are resolved against the directory containing the playlist file. In
    "shuffle" mode the result is shuffled. When a song is currently selected,
    ``playlist_index`` is re-aligned to it; if that song is no longer listed,
    the new list is published and playback restarts from its first entry.

    Args:
        playlist_path: Path of the playlist file.

    Returns:
        List of track paths as strings.

    Raises:
        OSError: If the playlist file cannot be opened.
    """
    global playlist, playlist_index

    playlist_file = Path(playlist_path).resolve()
    base_dir = playlist_file.parent
    songs = []

    # utf-8-sig also accepts plain UTF-8 and drops a leading BOM, which would
    # otherwise become part of the first entry.
    with open(playlist_file, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            entry = raw_line.strip()
            if not entry or entry.startswith("#"):
                continue

            song_path = Path(entry)
            # If the playlist entry is relative, resolve it relative to playlist.txt
            if not song_path.is_absolute():
                song_path = base_dir / song_path
            songs.append(str(song_path))

    if current_mode == "shuffle":
        random.shuffle(songs)

    # Re-align song that's playing to new index
    if current_song != "nosong":
        for index, song in enumerate(songs):
            if Path(song).stem == current_song:
                playlist_index = index
                break
        else:
            playlist_index = 0
            # play_current() reads the global playlist, so publish the new
            # list first; otherwise it would restart from the stale one.
            playlist = songs
            play_current()

    return songs


def load_ra3_maps(path):
    """Read the RA3 map list and return the lower-cased names as a set.

    Args:
        path: Path of the map list file (one map name per line).

    Returns:
        Set of map names; empty when the file cannot be read (for example
        because it is missing or locked by another process).
    """
    try:
        with open(path, "r", encoding="utf-8-sig") as f:
            return {name.lower() for name in (line.strip() for line in f) if name}
    except OSError as exc:
        print(f"[DEBUG] Could not read RA3 map list: {exc}")
        return set()


def _step_playlist(step):
    """Move ``playlist_index`` by ``step`` (unless looping) and play it."""
    global playlist_index
    if not playlist:
        return  # nothing to step through; avoids a modulo by zero
    if current_mode != "loop":  # "loop" keeps the same index
        playlist_index = (playlist_index + step) % len(playlist)
    play_current()


def next_track():
    """Request the game's music volume, then play the next track."""
    global volumecheck
    volumecheck = True
    send_command(pty_proc, "s_musicvolume")
    _step_playlist(1)


def previous_track():
    """Request the game's music volume, then play the previous track."""
    global volumecheck
    volumecheck = True
    send_command(pty_proc, "s_musicVolume")
    _step_playlist(-1)


def play_current():
    """Load and play the track at ``playlist_index`` at the current volume.

    Does nothing when the playlist is empty or the track file is missing.
    On success the track name is also echoed to the game console shortly
    afterwards.
    """
    global is_playing, current_song

    tracks = playlist  # snapshot: another thread may rebind the global
    if not tracks:
        return

    # The index can be stale if the playlist shrank since it was last set.
    track = tracks[playlist_index % len(tracks)]
    if not os.path.isfile(track):
        print(f"[DEBUG] Track not found: {track}")
        return

    try:
        pygame.mixer.music.load(track)
        pygame.mixer.music.set_volume(volume)
        pygame.mixer.music.play()
        is_playing = True
        current_song = Path(track).stem
        print(f"[DEBUG] Playing: {current_song} at volume {volume}")
        threading.Timer(.1, lambda: send_command(pty_proc, f"echo\r\necho\r\necho\r\necho Playing: {current_song}")).start()
    except Exception as e:
        print(f"[DEBUG] Couldn't start MP3 player': {e}")


def stop_playback():
    """Stop the music and mark playback as inactive."""
    global is_playing
    # Clear the flag first so track_watcher does not mistake the stop for a
    # finished track and skip ahead.
    is_playing = False
    pygame.mixer.music.stop()


def toggle_pause():
    """Request the game's music volume, then pause or resume the music."""
    global volumecheck, is_playing
    volumecheck = True
    send_command(pty_proc, "s_musicVolume")

    if pygame.mixer.music.get_busy():
        # Flag before pausing, for the same reason as in stop_playback().
        is_playing = False
        pygame.mixer.music.pause()
    else:
        pygame.mixer.music.unpause()
        is_playing = True


def change_mode():
    """Cycle sequential -> shuffle -> loop and reload the playlist."""
    global current_mode, playlist
    modes = ["sequential", "shuffle", "loop"]
    current_mode = modes[(modes.index(current_mode) + 1) % len(modes)]
    print(f"[DEBUG] Mode changed to: {current_mode}")
    send_command(pty_proc, f"echo _musicmode {current_mode}")
    playlist = load_playlist(playlist_path)

# ==========================================================

# ==================== QUAKE MONITOR ======================


def monitor_game(pty_proc):
    """Read the game's PTY output and dispatch it line by line.

    Output is buffered until a newline is seen; each complete line has ANSI
    escapes stripped and, if non-empty, is passed to handle_game_line().
    The loop ends on EOF, on KeyboardInterrupt, when a read returns no data,
    or once ``stop_flag`` or ``shutdown_event`` is set.

    Args:
        pty_proc: The PTY process object of the running game.
    """
    buffer = ""
    while not (stop_flag.is_set() or shutdown_event.is_set()):
        try:
            data = pty_proc.read(1024)
            if not data:
                break

            # Normalize to string
            if isinstance(data, bytes):
                data = data.decode(errors="ignore")

            buffer += data
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = strip_ansi(line).strip()
                if line:
                    handle_game_line(line, pty_proc)
        except (EOFError, KeyboardInterrupt):
            break


def _run_music_control(pty_proc, action):
    """Run ``action`` if the current map is an RA3 map, else tell the player.

    The map list is re-read from disk on every call.
    """
    global ra3_maps
    ra3_maps = load_ra3_maps(ra3_maps_path)
    if current_map in ra3_maps:
        action()
    else:
        threading.Timer(.1, lambda: send_command(pty_proc, "echo Music controls only available for RA3 maps.")).start()


def handle_game_line(line, pty_proc):
    """React to a single (ANSI-stripped, non-empty) line of game output.

    Recognised lines: the engine's initialisation banner, the reply to an
    ``s_musicVolume`` query, the four music console commands, the
    ``Com_TouchMemory:`` line (which schedules a ``serverstatus`` request) and
    the ``mapname`` entry of the ``serverstatus`` reply.

    Args:
        line: The console line.
        pty_proc: The PTY process object used to send commands back.
    """
    global volume, current_map, ra3_maps, volumecheck, map_checked, playlist_index

    if "--- Common Initialization Complete ---" in line:
        volumecheck = True
        threading.Timer(1.0, lambda: send_command(pty_proc, "s_musicVolume")).start()
        if current_mode == "shuffle" and playlist:
            playlist_index = random.randint(0, len(playlist) - 1)
        threading.Timer(2.0, play_current).start()
        return

    if line.startswith("\"s_musicVolume\"") and volumecheck:
        volumecheck = False
        try:
            svolume = parse_music_volume(line)
        except ValueError as exc:
            # A malformed reply must not take the monitor loop down.
            print(f"[DEBUG] {exc}")
            return
        if svolume > 0 and volume != svolume:
            volume = svolume
            pygame.mixer.music.set_volume(volume)
            print(f"[DEBUG] Set music volume to {volume} by game client.")
        return

    console_controls = (
        ("]\\nexttrack", next_track),
        ("]\\prevtrack", previous_track),
        ("]\\pausetrack", toggle_pause),
        ("]\\musicmode", change_mode),
    )
    for prefix, action in console_controls:
        if line.startswith(prefix):
            _run_music_control(pty_proc, action)
            return

    if line.startswith("Com_TouchMemory:"):
        threading.Timer(1.0, lambda: send_command_and_mark(pty_proc)).start()
    elif "mapname" in line.lower() and serverstatus_sent and not map_checked:
        current_map = line.split()[-1].lower()
        map_checked = True
        ra3_maps = load_ra3_maps(ra3_maps_path)
        if current_map in ra3_maps:
            print(f"[DEBUG] Known map: {current_map}. Advancing track.")
            next_track()
        else:
            print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.")
            stop_playback()


def send_command_and_mark(pty_proc):
    """Send ``serverstatus`` and arm the one-shot ``mapname`` check."""
    global serverstatus_sent, map_checked
    send_command(pty_proc, "serverstatus")
    serverstatus_sent = True
    map_checked = False


def send_command(pty_proc, cmd):
    """Write ``cmd`` followed by CRLF to the game's PTY.

    Failures are logged rather than raised.

    Args:
        pty_proc: The PTY process object, or None if the game is not running.
        cmd: Console command text.
    """
    if pty_proc is None:
        print("[DEBUG] Failed to send command: game process is not running yet")
        return
    try:
        pty_proc.write(cmd + "\r\n")
        pty_proc.flush()
        print(f"[DEBUG] Sent command: {cmd}")
    except Exception as e:
        print(f"[DEBUG] Failed to send command: {e}")

# ==========================================================

# ==================== KEYBOARD HANDLER ===================


def keyboard_listener():
    """Map launcher-console key presses to music controls.

    ``[`` previous track, ``]`` next track, ``'`` pause/resume,
    ``\\`` change mode. Runs until ``stop_flag`` is set.
    """
    bindings = {
        '[': previous_track,
        ']': next_track,
        '\'': toggle_pause,
        '\\': change_mode,
    }
    while not stop_flag.is_set():
        action = bindings.get(readchar.readkey())
        if action is not None:
            action()

# ==========================================================

# ======================== MAIN ===========================


def _run_session():
    """Load configuration, start the helper threads and run the game."""
    global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path

    print(f"Loading Rocket Arena 3 with MP3 playback.")

    # Use debug paths if enabled
    game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
    playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST
    ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS

    # Load playlist and map list
    try:
        playlist = load_playlist(playlist_path)
    except OSError as exc:
        print(f"Could not read playlist: {exc}")
        return
    ra3_maps = load_ra3_maps(ra3_maps_path)

    if not playlist:
        print("Playlist is empty!")
        return
    if not ra3_maps:
        print("RA3 maps list is empty!")
        return

    # Initialize pygame mixer
    pygame.mixer.init()

    # Start playlist watcher, keyboard listener and track watcher threads
    threading.Thread(target=playlist_watcher, args=(playlist_path,), daemon=True).start()
    threading.Thread(target=keyboard_listener, daemon=True).start()
    threading.Thread(target=track_watcher, daemon=True).start()

    # Launch quake process via PTY
    try:
        pty_proc = winpty.PtyProcess.spawn([game_exe] + sys.argv[1:] + ["+set", "fs_game", "arena", "+exec", "music_keys.cfg"])
    except Exception as e:
        print(f"Failed to start game via PTY: {e}")
        stop_flag.set()
        pygame.mixer.quit()
        return

    # NOTE: track_game_focus() is defined in this file but is not started here.

    # Monitor the game output
    try:
        monitor_game(pty_proc)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        stop_flag.set()
        _release_resources()


def main():
    """Entry point: enforce a single instance, then run the session.

    Exits with status 1 when another instance already holds the mutex.
    """
    mutex = acquire_single_instance("Global\\Ra3WithMp3")
    if mutex is None:
        print("Another instance is already running.")
        sys.exit(1)

    try:
        _run_session()
    finally:
        kernel32.CloseHandle(mutex)


if __name__ == "__main__":
    main()
# ==========================================================