From e06dc938cfa5a610004ad4f70a22e09827b42149 Mon Sep 17 00:00:00 2001 From: edschuy95 Date: Thu, 8 Jan 2026 15:41:23 -0500 Subject: [PATCH] Making this into a mod agnostic launcher that activates mp3 playback specifically when RA3 is detected. --- RA3MP3Playback.py | 381 +++++++++++++----------------------------- RA3MP3Playback.pyproj | 4 + 2 files changed, 119 insertions(+), 266 deletions(-) diff --git a/RA3MP3Playback.py b/RA3MP3Playback.py index da934a8..823c0fe 100644 --- a/RA3MP3Playback.py +++ b/RA3MP3Playback.py @@ -1,39 +1,20 @@ -from math import fabs +from multiprocessing.heap import Arena +from pathlib import Path import threading import time import os import sys import random import re -import tkinter as tk -#import psutil -from pathlib import Path import queue -from tkinter import CURRENT -from pathlib import Path - -os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" - -import pygame import readchar -import winpty.ptyprocess import winpty import ctypes -from ctypes import wintypes -# Win32 constants -SPI_GETMOUSE = 0x0003 -SPI_SETMOUSE = 0x0004 -SPIF_SENDCHANGE = 0x02 +os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" +import pygame -# Console event constants -CTRL_C_EVENT = 0 -CTRL_BREAK_EVENT = 1 -CTRL_CLOSE_EVENT = 2 -CTRL_LOGOFF_EVENT = 5 -CTRL_SHUTDOWN_EVENT = 6 - -user32 = ctypes.WinDLL("user32", use_last_error=True) +#user32 = ctypes.WinDLL("user32", use_last_error=True) kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) # Event to signal shutdown @@ -41,16 +22,6 @@ shutdown_event = threading.Event() ERROR_ALREADY_EXISTS = 183 -SystemParametersInfo = user32.SystemParametersInfoW -SystemParametersInfo.argtypes = ( - wintypes.UINT, # uiAction - wintypes.UINT, # uiParam - ctypes.POINTER(ctypes.c_int), # pvParam (int[3]) - wintypes.UINT # fWinIni -) -SystemParametersInfo.restype = wintypes.BOOL - - # ========================= CONFIG ========================= DEBUG = True # Set True to enable debug overrides @@ -60,7 +31,7 @@ DEFAULT_PLAYLIST = r".\arena\music\playlist.txt" DEFAULT_RA3_MAPS = r".\arena\music\ra3_maps.txt" # Debug override paths -DEBUG_GAME_EXE = r"D:\GOG Games\Quake III\quake3e.exe" +DEBUG_GAME_EXE = r"D:\GOG Games\Quake III\quake3e_con.exe" DEBUG_PLAYLIST = r"D:\GOG Games\Rocket Arena 3\arena\music\playlist.txt" DEBUG_RA3_MAPS = r"D:\GOG Games\Rocket Arena 3\arena\music\ra3_maps.txt" @@ -70,6 +41,12 @@ VOLUME_STEP = 0.1 # step for W/S volume control # ====================== GLOBAL STATE ===================== volumecheck = False +gametypecheck = False +arenacheck = False +last_arena0 = "undefined" +is_ra3 = False +is_ra3_map = False +is_in_map = False ra3_maps_path = "." playlist = [] playlist_index = 0 @@ -78,13 +55,11 @@ volume = 0.5 is_playing = False stop_flag = threading.Event() serverstatus_sent = False -map_checked = False current_map = "nomap" current_song = "nosong" playlist_path = "undefined" ra3_maps = set() song_queue = queue.Queue() -WasPointerPrecisionEnabled = False ANSI_ESCAPE_RE = re.compile( r''' \x1B(?: @@ -101,98 +76,6 @@ ANSI_ESCAPE_RE = re.compile( # ========================================================== # ===================== UTILITY FUNCTIONS ================= -def console_handler(ctrl_type): - print(f"Control event: {ctrl_type}") - if ctrl_type == CTRL_CLOSE_EVENT: - print("\n[X] Console close requested! Signaling cleanup...") - shutdown_event.set() # Signal main thread to clean up - stop_playback() - pty_proc.close() - pygame.mixer.quit() - if WasPointerPrecisionEnabled: - set_pointer_precision(True) - return True # We've handled it - return False # Let other events pass - -# Convert Python function to C callback -HandlerRoutine = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_uint) -handler = HandlerRoutine(console_handler) -ctypes.windll.kernel32.SetConsoleCtrlHandler(handler, True) - -def track_game_focus( game_pid, poll_interval=0.1, debounce_time=0.3,): - """ - Tracks focus for a fullscreen game by PID. - - - Runs code ONLY when focus state changes - - Handles fullscreen window recreation - - Debounces rapid transitions - """ - global WasPointerPrecisionEnabled, volume - - user32 = ctypes.WinDLL("user32", use_last_error=True) - - user32.GetForegroundWindow.restype = wintypes.HWND - user32.GetWindowThreadProcessId.argtypes = ( - wintypes.HWND, - ctypes.POINTER(wintypes.DWORD), - ) - user32.GetWindowThreadProcessId.restype = wintypes.DWORD - - focused = False - last_state_change = 0.0 - - while True: - hwnd = user32.GetForegroundWindow() - pid = wintypes.DWORD() - is_focused = False - - if hwnd: - user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) - is_focused = (pid.value == game_pid) - - now = time.monotonic() - - # ---------- STATE CHANGE DETECTION ---------- - if is_focused != focused: - # Debounce to avoid flicker during fullscreen transitions - if now - last_state_change >= debounce_time: - focused = is_focused - last_state_change = now - - if focused: - print("[DEBUG] Game gained focus") - #print("[DEBUG] Restoring music volume") - #pygame.mixer.music.set_volume(volume) - if WasPointerPrecisionEnabled: - print("[DEBUG] Disabling mouse pointer precision") - set_pointer_precision(False) - - # Example: - # set_pointer_precision(False) - # pause_background_tasks() - # lower_system_volume() - - else: - # ========================================== - # FOCUS LOST (focused -> unfocused) - # ADD YOUR "FOCUS LOST" CODE HERE - # ========================================== - print("[DEBUG] Game lost focus") - #print("[DEBUG] Muting music") - #pygame.mixer.music.set_volume(0) - if WasPointerPrecisionEnabled: - print("[DEBUG] Restoring mouse pointer precision") - set_pointer_precision(True) - - - # Example: - # set_pointer_precision(True) - # resume_background_tasks() - # restore_system_volume() - # -------------------------------------------- - - time.sleep(poll_interval) - def acquire_single_instance(name: str): handle = kernel32.CreateMutexW( None, # lpMutexAttributes @@ -209,47 +92,6 @@ def acquire_single_instance(name: str): return handle -def is_pointer_precision_enabled(): - """ - Returns True if Windows 'Enhance pointer precision' is enabled. - """ - mouse_params = (ctypes.c_int * 3)() - - if not SystemParametersInfo( - SPI_GETMOUSE, - 0, - mouse_params, - 0 - ): - raise ctypes.WinError(ctypes.get_last_error()) - - # mouse_params[2]: 0 = off, 1 = on - return mouse_params[2] != 0 - -def set_pointer_precision(enabled: bool): - """ - Enables or disables Windows 'Enhance pointer precision'. - """ - mouse_params = (ctypes.c_int * 3)() - - if not SystemParametersInfo( - SPI_GETMOUSE, - 0, - mouse_params, - 0 - ): - raise ctypes.WinError(ctypes.get_last_error()) - - mouse_params[2] = 1 if enabled else 0 - - if not SystemParametersInfo( - SPI_SETMOUSE, - 0, - mouse_params, - SPIF_SENDCHANGE - ): - raise ctypes.WinError(ctypes.get_last_error()) - def strip_ansi(text: str) -> str: return ANSI_ESCAPE_RE.sub("", text) @@ -382,7 +224,6 @@ def previous_track(): play_current() def play_current(): - #send_command(pty_proc,"s_musicVolume") global is_playing, current_song if not playlist: return @@ -400,7 +241,6 @@ def play_current(): threading.Timer(.1, lambda: send_command(pty_proc,f"echo\r\necho\r\necho\r\necho Playing: {current_song}")).start() except Exception as e: print(f"[DEBUG] Couldn't start MP3 player': {e}") - #send_command(pty_proc,f"echo Playing: {Path(track).stem}") def stop_playback(): global is_playing @@ -438,7 +278,7 @@ def change_mode(): # ==================== QUAKE MONITOR ====================== def monitor_game(pty_proc): - global serverstatus_sent, map_checked + global serverstatus_sent buffer = "" @@ -467,71 +307,107 @@ def monitor_game(pty_proc): def handle_game_line(line, pty_proc): - global volume, current_map, ra3_maps, ra3_maps_path, volumecheck - global serverstatus_sent, map_checked - if "--- Common Initialization Complete ---" in line: - volumecheck = True - threading.Timer(1.0, lambda: send_command(pty_proc,"s_musicVolume")).start() - if current_mode == "shuffle": - global playlist_index - playlist_index = random.randint(0, len(playlist) - 1) - threading.Timer(2.0, play_current).start() - elif line.startswith("\"s_musicVolume\"") and volumecheck == True: - volumecheck = False - svolume = parse_music_volume(line) - if svolume > 0: - if volume != svolume: - volume = svolume - pygame.mixer.music.set_volume(volume) - print(f"[DEBUG] Set music volume to {volume} by game client.") - elif line.startswith("]\\nexttrack"): - ra3_maps = load_ra3_maps(ra3_maps_path) - if current_map in ra3_maps: - next_track() - else: - threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() - elif line.startswith("]\\prevtrack"): - ra3_maps = load_ra3_maps(ra3_maps_path) - if current_map in ra3_maps: - previous_track() - else: - threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() - elif line.startswith("]\\pausetrack"): - ra3_maps = load_ra3_maps(ra3_maps_path) - if current_map in ra3_maps: - toggle_pause() - else: - threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() - elif line.startswith("]\\musicmode"): - ra3_maps = load_ra3_maps(ra3_maps_path) - if current_map in ra3_maps: - change_mode() - else: - threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() - elif line.startswith("Com_TouchMemory:"): - threading.Timer(1.0, lambda: send_command_and_mark(pty_proc)).start() - elif "mapname" in line.lower() and serverstatus_sent and not map_checked: - current_map = line.split()[-1].lower() - map_checked = True - ra3_maps = load_ra3_maps(ra3_maps_path) - if current_map in ra3_maps: - print(f"[DEBUG] Known map: {current_map}. Advancing track.") - next_track() - else: - print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.") - stop_playback() + global volume, current_map, ra3_maps, ra3_maps_path, volumecheck, is_ra3, gametypecheck, is_playing, arenacheck, is_in_map, is_ra3_map, last_arena0 + global serverstatus_sent -def send_command_and_mark(pty_proc): - global serverstatus_sent, map_checked - send_command(pty_proc, "serverstatus") - serverstatus_sent = True - map_checked = False + if is_ra3 == False or is_in_map == False: is_ra3_map = False + + #Loading vm file vm/ui.qvm... + #if "Sound initialization successful." in line: + if "Loading vm file vm/ui.qvm..." in line: + gametypecheck = True + is_in_map = False + threading.Timer(.3, lambda: send_command(pty_proc,"fs_game")).start() + elif "\"fs_game\" is:\"" in line and gametypecheck: + gametypecheck = False + if "\"fs_game\" is:\"arena" in line: + if is_ra3 == False: + print(f"[DEBUG] Active mod is Rocket Arena 3 - enabling music playback") + is_ra3 = True + volumecheck = True + threading.Timer(1.0, lambda: send_command(pty_proc,"s_musicVolume")).start() + if current_mode == "shuffle": + global playlist_index + playlist_index = random.randint(0, len(playlist) - 1) + threading.Timer(2.0, play_current).start() + else: + if not is_playing: + next_track() + else: + print(f"[DEBUG] Active mod is NOT Rocket Arena 3 - disabling music playback") + is_ra3 = False + if is_playing: + stop_playback() + elif line.startswith("]\\showvars"): + threading.Timer(.1, lambda: send_command(pty_proc,f"echo is_ra3 = {is_ra3}\r\necho is_in_map = {is_in_map}\r\necho is_ra3_map = {is_ra3_map}")).start() + + + if is_ra3: + if line.startswith("\"s_musicVolume\"") and volumecheck == True: + volumecheck = False + svolume = parse_music_volume(line) + if svolume > 0: + if volume != svolume: + volume = svolume + pygame.mixer.music.set_volume(volume) + print(f"[DEBUG] Set music volume to {volume} by game client.") + + elif line.startswith("]\\nexttrack"): + if (is_in_map and is_ra3_map) or (is_in_map == False): + next_track() + else: + threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() + elif line.startswith("]\\prevtrack"): + if (is_in_map and is_ra3_map) or (is_in_map == False): + previous_track() + else: + threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() + elif line.startswith("]\\pausetrack"): + if (is_in_map and is_ra3_map) or (is_in_map == False): + toggle_pause() + else: + threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() + elif line.startswith("]\\musicmode"): + if (is_in_map and is_ra3_map) or (is_in_map == False): + change_mode() + else: + threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() + elif line.startswith("Com_TouchMemory:"): + is_in_map = True + arenacheck = True + threading.Timer(.5, lambda: send_command(pty_proc,f"arena0")).start() + elif "\"arena0\" is:" in line and arenacheck: + if line == last_arena0: + same_map = True + else: + same_map = False + last_arena0 = line + arenacheck = False + if line.startswith("\"arena0\" is:\"Arena Number 1"): + print(f"[DEBUG] Not an RA3 map. Checking for override.") + serverstatus_sent = True + threading.Timer(.1, lambda: send_command(pty_proc,f"serverstatus")).start() + else: + if not same_map: + print(f"[DEBUG] RA3 map detected. Advancing track.") + is_ra3_map = True + next_track() + elif "mapname" in line.lower() and serverstatus_sent: + serverstatus_sent = False + current_map = line.split()[-1].lower() + ra3_maps = load_ra3_maps(ra3_maps_path) + if current_map in ra3_maps: + print(f"[DEBUG] Found override for: {current_map}. Advancing track.") + is_ra3_map = True + next_track() + else: + print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.") + is_ra3_map = False + stop_playback() def send_command(pty_proc, cmd): try: - #pty_proc.write(("\x1bOF\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f" + cmd + "\n")) pty_proc.write(cmd + "\r\n") - #pty_proc.write((cmd + "\n").encode('utf-8')) pty_proc.flush() print(f"[DEBUG] Sent command: {cmd}") except Exception as e: @@ -554,15 +430,15 @@ def keyboard_listener(): # ======================== MAIN =========================== def main(): - mutex = acquire_single_instance("Global\\Ra3WithMp3") + mutex = acquire_single_instance("Global\\q3aLauncherAndMp3Playback") if mutex is None: print("Another instance is already running.") sys.exit(1) - global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path, WasPointerPrecisionEnabled + global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path - print(f"Loading Rocket Arena 3 with MP3 playback.") + print(f"Loading Quake 3 Arena...") # Use debug paths if enabled game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE @@ -594,39 +470,14 @@ def main(): # Start track watcher thread watcher_thread = threading.Thread(target=track_watcher, daemon=True) watcher_thread.start() - - root = tk.Tk() - root.withdraw() - - screenWidth = root.winfo_screenwidth() - screenHeight = root.winfo_screenheight() - - argsProc = sys.argv[1:] - args_lower = [arg.lower() for arg in argsProc] - - #args_for_game = ["com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:] - #args_for_game = [""] - #if "--no_match_res" in args_lower: - # args_for_game = ["com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:] - #else: - # args_for_game = ["+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}", "+set", "com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:] - - #WasPointerPrecisionEnabled=is_pointer_precision_enabled() - - #if WasPointerPrecisionEnabled: - # set_pointer_precision(False) - + # Launch quake process via PTY try: - pty_proc = winpty.PtyProcess.spawn([game_exe] + sys.argv[1:] + ["+set", "fs_game", "arena", "+exec", "music_keys.cfg"]) + pty_proc = winpty.PtyProcess.spawn([game_exe, "--showterminalconsole", "+exec", "music_keys.cfg"] + sys.argv[1:] ) except Exception as e: - print(f"Failed to start game via PTY: {e}") + print(f"Failed to start game: {e}") return - game_pid = pty_proc.pid - - #threading.Thread(target=track_game_focus, args=(game_pid,), daemon=True).start() - # Monitor the game output try: monitor_game(pty_proc) @@ -637,8 +488,6 @@ def main(): stop_playback() pty_proc.close() pygame.mixer.quit() - if WasPointerPrecisionEnabled: - set_pointer_precision(True) if __name__ == "__main__": main() diff --git a/RA3MP3Playback.pyproj b/RA3MP3Playback.pyproj index 845159f..0198969 100644 --- a/RA3MP3Playback.pyproj +++ b/RA3MP3Playback.pyproj @@ -11,6 +11,10 @@ . RA3MP3Playback RA3MP3Playback + Standard Python launcher + + + False true