From 90fd0cd57194a24bc3bec143f9c95fc4a6b84376 Mon Sep 17 00:00:00 2001 From: edschuy95 Date: Sun, 4 Jan 2026 23:59:47 -0500 Subject: [PATCH] Getting close to a complete app. --- RA3MP3Playback.py | 243 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 229 insertions(+), 14 deletions(-) diff --git a/RA3MP3Playback.py b/RA3MP3Playback.py index b2e4399..a38ce16 100644 --- a/RA3MP3Playback.py +++ b/RA3MP3Playback.py @@ -1,3 +1,4 @@ +from math import fabs import threading import time import os @@ -5,6 +6,7 @@ import sys import random import re import tkinter as tk +import psutil from pathlib import Path import queue from tkinter import CURRENT @@ -16,6 +18,37 @@ import pygame import readchar import winpty.ptyprocess import winpty +import ctypes +from ctypes import wintypes + +# Win32 constants +SPI_GETMOUSE = 0x0003 +SPI_SETMOUSE = 0x0004 +SPIF_SENDCHANGE = 0x02 + +# Console event constants +CTRL_C_EVENT = 0 +CTRL_BREAK_EVENT = 1 +CTRL_CLOSE_EVENT = 2 +CTRL_LOGOFF_EVENT = 5 +CTRL_SHUTDOWN_EVENT = 6 + +user32 = ctypes.WinDLL("user32", use_last_error=True) +kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + +# Event to signal shutdown +shutdown_event = threading.Event() + +ERROR_ALREADY_EXISTS = 183 + +SystemParametersInfo = user32.SystemParametersInfoW +SystemParametersInfo.argtypes = ( + wintypes.UINT, # uiAction + wintypes.UINT, # uiParam + ctypes.POINTER(ctypes.c_int), # pvParam (int[3]) + wintypes.UINT # fWinIni +) +SystemParametersInfo.restype = wintypes.BOOL # ========================= CONFIG ========================= @@ -36,6 +69,7 @@ VOLUME_STEP = 0.1 # step for W/S volume control # ========================================================== # ====================== GLOBAL STATE ===================== +ra3_maps_path = "." playlist = [] playlist_index = 0 current_mode = "shuffle" # sequential, shuffle, loop @@ -49,10 +83,11 @@ current_song = "nosong" playlist_path = "undefined" ra3_maps = set() song_queue = queue.Queue() +WasPointerPrecisionEnabled = False ANSI_ESCAPE_RE = re.compile( r''' \x1B(?: - [78] # ESC 7, ESC 8 (DEC save/restore) + [78] # ESC 7, ESC 8 (DEC save/restore) | \[ [0-?]* [ -/]* [@-~] # CSI sequences | \] .*? (?:\x07|\x1B\\) # OSC sequences | [@-Z\\-_] # Other 7-bit C1 controls @@ -65,6 +100,150 @@ ANSI_ESCAPE_RE = re.compile( # ========================================================== # ===================== UTILITY FUNCTIONS ================= +def console_handler(ctrl_type): + print(f"Control event: {ctrl_type}") + if ctrl_type == CTRL_CLOSE_EVENT: + print("\n[X] Console close requested! Signaling cleanup...") + shutdown_event.set() # Signal main thread to clean up + return True # We've handled it + return False # Let other events pass + +# Convert Python function to C callback +HandlerRoutine = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_uint) +handler = HandlerRoutine(console_handler) +ctypes.windll.kernel32.SetConsoleCtrlHandler(handler, True) + +def track_game_focus( game_pid, poll_interval=0.1, debounce_time=0.3,): + """ + Tracks focus for a fullscreen game by PID. + + - Runs code ONLY when focus state changes + - Handles fullscreen window recreation + - Debounces rapid transitions + """ + global WasPointerPrecisionEnabled, volume + + user32 = ctypes.WinDLL("user32", use_last_error=True) + + user32.GetForegroundWindow.restype = wintypes.HWND + user32.GetWindowThreadProcessId.argtypes = ( + wintypes.HWND, + ctypes.POINTER(wintypes.DWORD), + ) + user32.GetWindowThreadProcessId.restype = wintypes.DWORD + + focused = False + last_state_change = 0.0 + + while True: + hwnd = user32.GetForegroundWindow() + pid = wintypes.DWORD() + is_focused = False + + if hwnd: + user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + is_focused = (pid.value == game_pid) + + now = time.monotonic() + + # ---------- STATE CHANGE DETECTION ---------- + if is_focused != focused: + # Debounce to avoid flicker during fullscreen transitions + if now - last_state_change >= debounce_time: + focused = is_focused + last_state_change = now + + if focused: + print("[DEBUG] Game gained focus") + #print("[DEBUG] Restoring music volume") + #pygame.mixer.music.set_volume(volume) + if WasPointerPrecisionEnabled: + print("[DEBUG] Disabling mouse pointer precision") + set_pointer_precision(False) + + # Example: + # set_pointer_precision(False) + # pause_background_tasks() + # lower_system_volume() + + else: + # ========================================== + # FOCUS LOST (focused -> unfocused) + # ADD YOUR "FOCUS LOST" CODE HERE + # ========================================== + print("[DEBUG] Game lost focus") + #print("[DEBUG] Muting music") + #pygame.mixer.music.set_volume(0) + if WasPointerPrecisionEnabled: + print("[DEBUG] Restoring mouse pointer precision") + set_pointer_precision(True) + + + # Example: + # set_pointer_precision(True) + # resume_background_tasks() + # restore_system_volume() + # -------------------------------------------- + + time.sleep(poll_interval) + +def acquire_single_instance(name: str): + handle = kernel32.CreateMutexW( + None, # lpMutexAttributes + False, # bInitialOwner + name # lpName + ) + + if not handle: + raise ctypes.WinError(ctypes.get_last_error()) + + if kernel32.GetLastError() == ERROR_ALREADY_EXISTS: + kernel32.CloseHandle(handle) + return None + + return handle + +def is_pointer_precision_enabled(): + """ + Returns True if Windows 'Enhance pointer precision' is enabled. + """ + mouse_params = (ctypes.c_int * 3)() + + if not SystemParametersInfo( + SPI_GETMOUSE, + 0, + mouse_params, + 0 + ): + raise ctypes.WinError(ctypes.get_last_error()) + + # mouse_params[2]: 0 = off, 1 = on + return mouse_params[2] != 0 + +def set_pointer_precision(enabled: bool): + """ + Enables or disables Windows 'Enhance pointer precision'. + """ + mouse_params = (ctypes.c_int * 3)() + + if not SystemParametersInfo( + SPI_GETMOUSE, + 0, + mouse_params, + 0 + ): + raise ctypes.WinError(ctypes.get_last_error()) + + mouse_params[2] = 1 if enabled else 0 + + if not SystemParametersInfo( + SPI_SETMOUSE, + 0, + mouse_params, + SPIF_SENDCHANGE + ): + raise ctypes.WinError(ctypes.get_last_error()) + def strip_ansi(text: str) -> str: return ANSI_ESCAPE_RE.sub("", text) @@ -162,11 +341,16 @@ def load_playlist(playlist_path): def load_ra3_maps(path): maps = set() - with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - maps.add(line.lower()) + try: + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + maps.add(line.lower()) + except PermissionError: + # File is locked by another process (e.g., Notepad) + # Decide how your app should behave + return set() return maps def next_track(): @@ -246,7 +430,7 @@ def monitor_game(pty_proc): buffer = "" - while not stop_flag.is_set(): + while not stop_flag.is_set() or shutdown_event.is_set(): try: data = pty_proc.read(1024) if not data: @@ -266,19 +450,19 @@ def monitor_game(pty_proc): #print(f"[GAME RAW] {repr(line)}") handle_game_line(line, pty_proc) - except EOFError: + except EOFError or KeyboardInterrupt: break def handle_game_line(line, pty_proc): - global volume, current_map + global volume, current_map, ra3_maps, ra3_maps_path global serverstatus_sent, map_checked if "--- Common Initialization Complete ---" in line: - threading.Timer(3.0, lambda: send_command(pty_proc,"s_musicVolume")).start() + threading.Timer(1.0, lambda: send_command(pty_proc,"s_musicVolume")).start() if current_mode == "shuffle": global playlist_index playlist_index = random.randint(0, len(playlist) - 1) - threading.Timer(7.0, play_current).start() + threading.Timer(2.0, play_current).start() elif line.startswith("s_musicVolume") and "\"s_musicVolume\" is:" in line: svolume = parse_music_volume(line) if svolume > 0: @@ -287,21 +471,25 @@ def handle_game_line(line, pty_proc): pygame.mixer.music.set_volume(volume) print(f"[DEBUG] Set music volume to {volume} by game client.") elif line.startswith("]\\nexttrack"): + ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: next_track() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\prevtrack"): + ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: previous_track() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\pausetrack"): + ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: toggle_pause() else: threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() elif line.startswith("]\\musicmode"): + ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: change_mode() else: @@ -311,6 +499,7 @@ def handle_game_line(line, pty_proc): elif "mapname" in line.lower() and serverstatus_sent and not map_checked: current_map = line.split()[-1].lower() map_checked = True + ra3_maps = load_ra3_maps(ra3_maps_path) if current_map in ra3_maps: print(f"[DEBUG] Known map: {current_map}. Advancing track.") next_track() @@ -349,9 +538,15 @@ def keyboard_listener(): # ======================== MAIN =========================== def main(): - global playlist, ra3_maps, pty_proc, playlist_path + mutex = acquire_single_instance("Global\\Ra3WithMp3") - print(f"RA3 MP3 Player - returning RA3 to greatness!") + if mutex is None: + print("Another instance is already running.") + sys.exit(1) + + global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path, WasPointerPrecisionEnabled + + print(f"Loading Rocket Arena 3 with MP3 playback.") # Use debug paths if enabled game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE @@ -390,13 +585,31 @@ def main(): screenWidth = root.winfo_screenwidth() screenHeight = root.winfo_screenheight() + argsProc = sys.argv[1:] + args_lower = [arg.lower() for arg in argsProc] + + args_for_game = [""] + if "--no_match_res" in args_lower: + args_for_game = ["com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:] + else: + args_for_game = ["+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}", "+set", "com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:] + + WasPointerPrecisionEnabled=is_pointer_precision_enabled() + + if WasPointerPrecisionEnabled: + set_pointer_precision(False) + # Launch quake process via PTY try: - pty_proc = winpty.PtyProcess.spawn([game_exe, "+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}"] + sys.argv[1:]) + pty_proc = winpty.PtyProcess.spawn([game_exe] + args_for_game) except Exception as e: print(f"Failed to start game via PTY: {e}") return + game_pid = pty_proc.pid + + threading.Thread(target=track_game_focus, args=(game_pid,), daemon=True).start() + # Monitor the game output try: monitor_game(pty_proc) @@ -407,6 +620,8 @@ def main(): stop_playback() pty_proc.close() pygame.mixer.quit() + if WasPointerPrecisionEnabled: + set_pointer_precision(True) if __name__ == "__main__": main()