Getting close to a complete app.

This commit is contained in:
edschuy95 2026-01-04 23:59:47 -05:00
parent 9cd3bc15c9
commit 90fd0cd571

View file

@ -1,3 +1,4 @@
from math import fabs
import threading import threading
import time import time
import os import os
@ -5,6 +6,7 @@ import sys
import random import random
import re import re
import tkinter as tk import tkinter as tk
import psutil
from pathlib import Path from pathlib import Path
import queue import queue
from tkinter import CURRENT from tkinter import CURRENT
@ -16,6 +18,37 @@ import pygame
import readchar import readchar
import winpty.ptyprocess import winpty.ptyprocess
import winpty import winpty
import ctypes
from ctypes import wintypes
# Win32 constants
SPI_GETMOUSE = 0x0003
SPI_SETMOUSE = 0x0004
SPIF_SENDCHANGE = 0x02
# Console event constants
CTRL_C_EVENT = 0
CTRL_BREAK_EVENT = 1
CTRL_CLOSE_EVENT = 2
CTRL_LOGOFF_EVENT = 5
CTRL_SHUTDOWN_EVENT = 6
user32 = ctypes.WinDLL("user32", use_last_error=True)
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
# Event to signal shutdown
shutdown_event = threading.Event()
ERROR_ALREADY_EXISTS = 183
SystemParametersInfo = user32.SystemParametersInfoW
SystemParametersInfo.argtypes = (
wintypes.UINT, # uiAction
wintypes.UINT, # uiParam
ctypes.POINTER(ctypes.c_int), # pvParam (int[3])
wintypes.UINT # fWinIni
)
SystemParametersInfo.restype = wintypes.BOOL
# ========================= CONFIG ========================= # ========================= CONFIG =========================
@ -36,6 +69,7 @@ VOLUME_STEP = 0.1 # step for W/S volume control
# ========================================================== # ==========================================================
# ====================== GLOBAL STATE ===================== # ====================== GLOBAL STATE =====================
ra3_maps_path = "."
playlist = [] playlist = []
playlist_index = 0 playlist_index = 0
current_mode = "shuffle" # sequential, shuffle, loop current_mode = "shuffle" # sequential, shuffle, loop
@ -49,10 +83,11 @@ current_song = "nosong"
playlist_path = "undefined" playlist_path = "undefined"
ra3_maps = set() ra3_maps = set()
song_queue = queue.Queue() song_queue = queue.Queue()
WasPointerPrecisionEnabled = False
ANSI_ESCAPE_RE = re.compile( ANSI_ESCAPE_RE = re.compile(
r''' r'''
\x1B(?: \x1B(?:
[78] # ESC 7, ESC 8 (DEC save/restore) [78] # ESC 7, ESC 8 (DEC save/restore)
| \[ [0-?]* [ -/]* [@-~] # CSI sequences | \[ [0-?]* [ -/]* [@-~] # CSI sequences
| \] .*? (?:\x07|\x1B\\) # OSC sequences | \] .*? (?:\x07|\x1B\\) # OSC sequences
| [@-Z\\-_] # Other 7-bit C1 controls | [@-Z\\-_] # Other 7-bit C1 controls
@ -65,6 +100,150 @@ ANSI_ESCAPE_RE = re.compile(
# ========================================================== # ==========================================================
# ===================== UTILITY FUNCTIONS ================= # ===================== UTILITY FUNCTIONS =================
def console_handler(ctrl_type):
print(f"Control event: {ctrl_type}")
if ctrl_type == CTRL_CLOSE_EVENT:
print("\n[X] Console close requested! Signaling cleanup...")
shutdown_event.set() # Signal main thread to clean up
return True # We've handled it
return False # Let other events pass
# Convert Python function to C callback
HandlerRoutine = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.c_uint)
handler = HandlerRoutine(console_handler)
ctypes.windll.kernel32.SetConsoleCtrlHandler(handler, True)
def track_game_focus( game_pid, poll_interval=0.1, debounce_time=0.3,):
"""
Tracks focus for a fullscreen game by PID.
- Runs code ONLY when focus state changes
- Handles fullscreen window recreation
- Debounces rapid transitions
"""
global WasPointerPrecisionEnabled, volume
user32 = ctypes.WinDLL("user32", use_last_error=True)
user32.GetForegroundWindow.restype = wintypes.HWND
user32.GetWindowThreadProcessId.argtypes = (
wintypes.HWND,
ctypes.POINTER(wintypes.DWORD),
)
user32.GetWindowThreadProcessId.restype = wintypes.DWORD
focused = False
last_state_change = 0.0
while True:
hwnd = user32.GetForegroundWindow()
pid = wintypes.DWORD()
is_focused = False
if hwnd:
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
is_focused = (pid.value == game_pid)
now = time.monotonic()
# ---------- STATE CHANGE DETECTION ----------
if is_focused != focused:
# Debounce to avoid flicker during fullscreen transitions
if now - last_state_change >= debounce_time:
focused = is_focused
last_state_change = now
if focused:
print("[DEBUG] Game gained focus")
#print("[DEBUG] Restoring music volume")
#pygame.mixer.music.set_volume(volume)
if WasPointerPrecisionEnabled:
print("[DEBUG] Disabling mouse pointer precision")
set_pointer_precision(False)
# Example:
# set_pointer_precision(False)
# pause_background_tasks()
# lower_system_volume()
else:
# ==========================================
# FOCUS LOST (focused -> unfocused)
# ADD YOUR "FOCUS LOST" CODE HERE
# ==========================================
print("[DEBUG] Game lost focus")
#print("[DEBUG] Muting music")
#pygame.mixer.music.set_volume(0)
if WasPointerPrecisionEnabled:
print("[DEBUG] Restoring mouse pointer precision")
set_pointer_precision(True)
# Example:
# set_pointer_precision(True)
# resume_background_tasks()
# restore_system_volume()
# --------------------------------------------
time.sleep(poll_interval)
def acquire_single_instance(name: str):
handle = kernel32.CreateMutexW(
None, # lpMutexAttributes
False, # bInitialOwner
name # lpName
)
if not handle:
raise ctypes.WinError(ctypes.get_last_error())
if kernel32.GetLastError() == ERROR_ALREADY_EXISTS:
kernel32.CloseHandle(handle)
return None
return handle
def is_pointer_precision_enabled():
"""
Returns True if Windows 'Enhance pointer precision' is enabled.
"""
mouse_params = (ctypes.c_int * 3)()
if not SystemParametersInfo(
SPI_GETMOUSE,
0,
mouse_params,
0
):
raise ctypes.WinError(ctypes.get_last_error())
# mouse_params[2]: 0 = off, 1 = on
return mouse_params[2] != 0
def set_pointer_precision(enabled: bool):
"""
Enables or disables Windows 'Enhance pointer precision'.
"""
mouse_params = (ctypes.c_int * 3)()
if not SystemParametersInfo(
SPI_GETMOUSE,
0,
mouse_params,
0
):
raise ctypes.WinError(ctypes.get_last_error())
mouse_params[2] = 1 if enabled else 0
if not SystemParametersInfo(
SPI_SETMOUSE,
0,
mouse_params,
SPIF_SENDCHANGE
):
raise ctypes.WinError(ctypes.get_last_error())
def strip_ansi(text: str) -> str: def strip_ansi(text: str) -> str:
return ANSI_ESCAPE_RE.sub("", text) return ANSI_ESCAPE_RE.sub("", text)
@ -162,11 +341,16 @@ def load_playlist(playlist_path):
def load_ra3_maps(path): def load_ra3_maps(path):
maps = set() maps = set()
with open(path, "r", encoding="utf-8") as f: try:
for line in f: with open(path, "r", encoding="utf-8") as f:
line = line.strip() for line in f:
if line: line = line.strip()
maps.add(line.lower()) if line:
maps.add(line.lower())
except PermissionError:
# File is locked by another process (e.g., Notepad)
# Decide how your app should behave
return set()
return maps return maps
def next_track(): def next_track():
@ -246,7 +430,7 @@ def monitor_game(pty_proc):
buffer = "" buffer = ""
while not stop_flag.is_set(): while not stop_flag.is_set() or shutdown_event.is_set():
try: try:
data = pty_proc.read(1024) data = pty_proc.read(1024)
if not data: if not data:
@ -266,19 +450,19 @@ def monitor_game(pty_proc):
#print(f"[GAME RAW] {repr(line)}") #print(f"[GAME RAW] {repr(line)}")
handle_game_line(line, pty_proc) handle_game_line(line, pty_proc)
except EOFError: except EOFError or KeyboardInterrupt:
break break
def handle_game_line(line, pty_proc): def handle_game_line(line, pty_proc):
global volume, current_map global volume, current_map, ra3_maps, ra3_maps_path
global serverstatus_sent, map_checked global serverstatus_sent, map_checked
if "--- Common Initialization Complete ---" in line: if "--- Common Initialization Complete ---" in line:
threading.Timer(3.0, lambda: send_command(pty_proc,"s_musicVolume")).start() threading.Timer(1.0, lambda: send_command(pty_proc,"s_musicVolume")).start()
if current_mode == "shuffle": if current_mode == "shuffle":
global playlist_index global playlist_index
playlist_index = random.randint(0, len(playlist) - 1) playlist_index = random.randint(0, len(playlist) - 1)
threading.Timer(7.0, play_current).start() threading.Timer(2.0, play_current).start()
elif line.startswith("s_musicVolume") and "\"s_musicVolume\" is:" in line: elif line.startswith("s_musicVolume") and "\"s_musicVolume\" is:" in line:
svolume = parse_music_volume(line) svolume = parse_music_volume(line)
if svolume > 0: if svolume > 0:
@ -287,21 +471,25 @@ def handle_game_line(line, pty_proc):
pygame.mixer.music.set_volume(volume) pygame.mixer.music.set_volume(volume)
print(f"[DEBUG] Set music volume to {volume} by game client.") print(f"[DEBUG] Set music volume to {volume} by game client.")
elif line.startswith("]\\nexttrack"): elif line.startswith("]\\nexttrack"):
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps: if current_map in ra3_maps:
next_track() next_track()
else: else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\prevtrack"): elif line.startswith("]\\prevtrack"):
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps: if current_map in ra3_maps:
previous_track() previous_track()
else: else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\pausetrack"): elif line.startswith("]\\pausetrack"):
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps: if current_map in ra3_maps:
toggle_pause() toggle_pause()
else: else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start() threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\musicmode"): elif line.startswith("]\\musicmode"):
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps: if current_map in ra3_maps:
change_mode() change_mode()
else: else:
@ -311,6 +499,7 @@ def handle_game_line(line, pty_proc):
elif "mapname" in line.lower() and serverstatus_sent and not map_checked: elif "mapname" in line.lower() and serverstatus_sent and not map_checked:
current_map = line.split()[-1].lower() current_map = line.split()[-1].lower()
map_checked = True map_checked = True
ra3_maps = load_ra3_maps(ra3_maps_path)
if current_map in ra3_maps: if current_map in ra3_maps:
print(f"[DEBUG] Known map: {current_map}. Advancing track.") print(f"[DEBUG] Known map: {current_map}. Advancing track.")
next_track() next_track()
@ -349,9 +538,15 @@ def keyboard_listener():
# ======================== MAIN =========================== # ======================== MAIN ===========================
def main(): def main():
global playlist, ra3_maps, pty_proc, playlist_path mutex = acquire_single_instance("Global\\Ra3WithMp3")
print(f"RA3 MP3 Player - returning RA3 to greatness!") if mutex is None:
print("Another instance is already running.")
sys.exit(1)
global playlist, ra3_maps, pty_proc, playlist_path, ra3_maps_path, WasPointerPrecisionEnabled
print(f"Loading Rocket Arena 3 with MP3 playback.")
# Use debug paths if enabled # Use debug paths if enabled
game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
@ -390,13 +585,31 @@ def main():
screenWidth = root.winfo_screenwidth() screenWidth = root.winfo_screenwidth()
screenHeight = root.winfo_screenheight() screenHeight = root.winfo_screenheight()
argsProc = sys.argv[1:]
args_lower = [arg.lower() for arg in argsProc]
args_for_game = [""]
if "--no_match_res" in args_lower:
args_for_game = ["com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:]
else:
args_for_game = ["+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}", "+set", "com_skipIntroVideo", "1", "-skipmovies"] + sys.argv[1:]
WasPointerPrecisionEnabled=is_pointer_precision_enabled()
if WasPointerPrecisionEnabled:
set_pointer_precision(False)
# Launch quake process via PTY # Launch quake process via PTY
try: try:
pty_proc = winpty.PtyProcess.spawn([game_exe, "+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}"] + sys.argv[1:]) pty_proc = winpty.PtyProcess.spawn([game_exe] + args_for_game)
except Exception as e: except Exception as e:
print(f"Failed to start game via PTY: {e}") print(f"Failed to start game via PTY: {e}")
return return
game_pid = pty_proc.pid
threading.Thread(target=track_game_focus, args=(game_pid,), daemon=True).start()
# Monitor the game output # Monitor the game output
try: try:
monitor_game(pty_proc) monitor_game(pty_proc)
@ -407,6 +620,8 @@ def main():
stop_playback() stop_playback()
pty_proc.close() pty_proc.close()
pygame.mixer.quit() pygame.mixer.quit()
if WasPointerPrecisionEnabled:
set_pointer_precision(True)
if __name__ == "__main__": if __name__ == "__main__":
main() main()