Add project files.

This commit is contained in:
eviled 2026-01-02 18:12:41 -05:00
parent e776cc7163
commit 9cd3bc15c9
14 changed files with 27066 additions and 0 deletions

413
RA3MP3Playback.py Normal file
View file

@ -0,0 +1,413 @@
import threading
import time
import os
import sys
import random
import re
import tkinter as tk
from pathlib import Path
import queue
from tkinter import CURRENT
from pathlib import Path
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
import pygame
import readchar
import winpty.ptyprocess
import winpty
# ========================= CONFIG =========================
DEBUG = True # Set True to enable debug overrides
# Default paths (used if not in debug mode)
DEFAULT_GAME_EXE = r".\RA3game.dll"
DEFAULT_PLAYLIST = r".\arena\music\playlist.txt"
DEFAULT_RA3_MAPS = r".\arena\music\ra3_maps.txt"
# Debug override paths
DEBUG_GAME_EXE = r"D:\GOG Games\Rocket Arena 3\RA3game.dll"
DEBUG_PLAYLIST = r"D:\GOG Games\Rocket Arena 3\arena\music\playlist.txt"
DEBUG_RA3_MAPS = r"D:\GOG Games\Rocket Arena 3\arena\music\ra3_maps.txt"
# Initial volume
VOLUME_STEP = 0.1 # step for W/S volume control
# ==========================================================
# ====================== GLOBAL STATE =====================
playlist = []
playlist_index = 0
current_mode = "shuffle" # sequential, shuffle, loop
volume = 0.5
is_playing = False
stop_flag = threading.Event()
serverstatus_sent = False
map_checked = False
current_map = "nomap"
current_song = "nosong"
playlist_path = "undefined"
ra3_maps = set()
song_queue = queue.Queue()
ANSI_ESCAPE_RE = re.compile(
r'''
\x1B(?:
[78] # ESC 7, ESC 8 (DEC save/restore)
| \[ [0-?]* [ -/]* [@-~] # CSI sequences
| \] .*? (?:\x07|\x1B\\) # OSC sequences
| [@-Z\\-_] # Other 7-bit C1 controls
)
''',
re.VERBOSE
)
# ==========================================================
# ===================== UTILITY FUNCTIONS =================
def strip_ansi(text: str) -> str:
return ANSI_ESCAPE_RE.sub("", text)
def playlist_watcher(path, poll_interval=1.0):
global playlist
try:
last_mtime = os.path.getmtime(path)
except FileNotFoundError:
last_mtime = 0
while not stop_flag.is_set():
try:
current_mtime = os.path.getmtime(path)
if current_mtime != last_mtime:
last_mtime = current_mtime
print("[DEBUG] Playlist file changed. Reloading playlist.")
playlist = load_playlist(path)
except FileNotFoundError:
pass # Playlist temporarily missing; ignore
time.sleep(poll_interval)
def track_watcher():
global is_playing
last_busy = False
while not stop_flag.is_set():
busy = pygame.mixer.music.get_busy()
if last_busy and not busy and is_playing:
# Track just finished
print("[DEBUG] Track finished. Advancing to next track.")
next_track()
last_busy = busy
time.sleep(0.1)
def parse_music_volume(line: str) -> float:
# Remove all ^<digit> color codes
clean_line = re.sub(r'\^\d', '', line)
# Extract the number inside quotes after "is:"
match = re.search(r'is\s*:\s*"(.*?)"', clean_line, re.IGNORECASE)
if not match:
#raise ValueError(f"Could not parse volume from line: {line}")
value_str = 0
else:
value_str = match.group(1)
try:
return float(value_str)
except ValueError:
raise ValueError(f"Invalid float value for volume: {value_str}")
def load_playlist(playlist_path):
global playlist_index
playlist_path = Path(playlist_path).resolve()
base_dir = playlist_path.parent
songs = []
with open(playlist_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
if line.startswith("#"):
continue
song_path = Path(line)
# If the playlist entry is relative, resolve it relative to playlist.txt
if not song_path.is_absolute():
song_path = base_dir / song_path
songs.append(str(song_path))
if current_mode == "shuffle":
random.shuffle(songs)
# Re-align song that's playing to new index
if current_song != "nosong":
_song_index = 0
_found_song = False
for s in songs:
_song_eval = Path(s).stem
if current_song == _song_eval:
_found_song = True
playlist_index = _song_index
break
_song_index += 1
if not _found_song:
playlist_index = 0
play_current()
return songs
def load_ra3_maps(path):
maps = set()
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
maps.add(line.lower())
return maps
def next_track():
send_command(pty_proc,"s_musicvolume")
global playlist_index
if current_mode == "loop":
pass # keep same index
else:
playlist_index = (playlist_index + 1) % len(playlist)
play_current()
def previous_track():
send_command(pty_proc,"s_musicVolume")
global playlist_index
if current_mode == "loop":
pass # keep same index
else:
playlist_index = (playlist_index - 1) % len(playlist)
play_current()
def play_current():
#send_command(pty_proc,"s_musicVolume")
global is_playing, current_song
if not playlist:
return
track = playlist[playlist_index]
if not os.path.isfile(track):
print(f"[DEBUG] Track not found: {track}")
return
try:
pygame.mixer.music.load(track)
pygame.mixer.music.set_volume(volume)
pygame.mixer.music.play()
is_playing = True
current_song = Path(track).stem
print(f"[DEBUG] Playing: {current_song} at volume {volume}")
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Playing: {current_song}")).start()
except Exception as e:
print(f"[DEBUG] Couldn't start MP3 player': {e}")
#send_command(pty_proc,f"echo Playing: {Path(track).stem}")
def stop_playback():
global is_playing
pygame.mixer.music.stop()
is_playing = False
def toggle_pause():
send_command(pty_proc,"s_musicVolume")
global is_playing
if pygame.mixer.music.get_busy():
pygame.mixer.music.pause()
is_playing = False
else:
pygame.mixer.music.unpause()
is_playing = True
def change_mode():
global current_mode, playlist_path, playlist
modes = ["sequential", "shuffle", "loop"]
idx = modes.index(current_mode)
current_mode = modes[(idx + 1) % len(modes)]
print(f"[DEBUG] Mode changed to: {current_mode}")
send_command(pty_proc,f"echo _musicmode {current_mode}")
playlist = load_playlist(playlist_path)
#def adjust_volume(up=True):
# global volume
# volume = min(1.0, max(0.0, volume + (VOLUME_STEP if up else -VOLUME_STEP)))
# pygame.mixer.music.set_volume(volume)
# print(f"[DEBUG] Volume: {volume:.2f}")
# ==========================================================
# ==================== QUAKE MONITOR ======================
def monitor_game(pty_proc):
global serverstatus_sent, map_checked
buffer = ""
while not stop_flag.is_set():
try:
data = pty_proc.read(1024)
if not data:
break
# Normalize to string
if isinstance(data, bytes):
data = data.decode(errors="ignore")
buffer += data
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = strip_ansi(line).strip()
if line:
#print(f"[GAME] {line}")
#print(f"[GAME RAW] {repr(line)}")
handle_game_line(line, pty_proc)
except EOFError:
break
def handle_game_line(line, pty_proc):
global volume, current_map
global serverstatus_sent, map_checked
if "--- Common Initialization Complete ---" in line:
threading.Timer(3.0, lambda: send_command(pty_proc,"s_musicVolume")).start()
if current_mode == "shuffle":
global playlist_index
playlist_index = random.randint(0, len(playlist) - 1)
threading.Timer(7.0, play_current).start()
elif line.startswith("s_musicVolume") and "\"s_musicVolume\" is:" in line:
svolume = parse_music_volume(line)
if svolume > 0:
if volume != svolume:
volume = svolume
pygame.mixer.music.set_volume(volume)
print(f"[DEBUG] Set music volume to {volume} by game client.")
elif line.startswith("]\\nexttrack"):
if current_map in ra3_maps:
next_track()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\prevtrack"):
if current_map in ra3_maps:
previous_track()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\pausetrack"):
if current_map in ra3_maps:
toggle_pause()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("]\\musicmode"):
if current_map in ra3_maps:
change_mode()
else:
threading.Timer(.1, lambda: send_command(pty_proc,f"echo Music controls only available for RA3 maps.")).start()
elif line.startswith("Com_TouchMemory:"):
threading.Timer(1.0, lambda: send_command_and_mark(pty_proc)).start()
elif "mapname" in line.lower() and serverstatus_sent and not map_checked:
current_map = line.split()[-1].lower()
map_checked = True
if current_map in ra3_maps:
print(f"[DEBUG] Known map: {current_map}. Advancing track.")
next_track()
else:
print(f"[DEBUG] Unknown map: {current_map}. Stopping playback.")
stop_playback()
def send_command_and_mark(pty_proc):
global serverstatus_sent, map_checked
send_command(pty_proc, "serverstatus")
serverstatus_sent = True
map_checked = False
def send_command(pty_proc, cmd):
try:
pty_proc.write(("\x1bOF\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f\x7f" + cmd + "\n"))
pty_proc.flush()
print(f"[DEBUG] Sent command: {cmd}")
except Exception as e:
print(f"[DEBUG] Failed to send command: {e}")
# ==========================================================
# ==================== KEYBOARD HANDLER ===================
def keyboard_listener():
while not stop_flag.is_set():
key = readchar.readkey()
if key.lower() == '[':
previous_track()
elif key.lower() == ']':
next_track()
elif key == '\'':
toggle_pause()
elif key == '\\':
change_mode()
# ==========================================================
# ======================== MAIN ===========================
def main():
global playlist, ra3_maps, pty_proc, playlist_path
print(f"RA3 MP3 Player - returning RA3 to greatness!")
# Use debug paths if enabled
game_exe = DEBUG_GAME_EXE if DEBUG else DEFAULT_GAME_EXE
playlist_path = DEBUG_PLAYLIST if DEBUG else DEFAULT_PLAYLIST
ra3_maps_path = DEBUG_RA3_MAPS if DEBUG else DEFAULT_RA3_MAPS
# Load playlist and map list
playlist = load_playlist(playlist_path)
ra3_maps = load_ra3_maps(ra3_maps_path)
if not playlist:
print("Playlist is empty!")
return
if not ra3_maps:
print("RA3 maps list is empty!")
return
# Initialize pygame mixer
pygame.mixer.init()
# Start playlist watcher thread
playlist_thread = threading.Thread(target=playlist_watcher,args=(playlist_path,), daemon=True)
playlist_thread.start()
# Start keyboard listener thread
kb_thread = threading.Thread(target=keyboard_listener, daemon=True)
kb_thread.start()
# Start track watcher thread
watcher_thread = threading.Thread(target=track_watcher, daemon=True)
watcher_thread.start()
root = tk.Tk()
root.withdraw()
screenWidth = root.winfo_screenwidth()
screenHeight = root.winfo_screenheight()
# Launch quake process via PTY
try:
pty_proc = winpty.PtyProcess.spawn([game_exe, "+set", "r_mode", "-1", "+set", "r_customWidth", f"{screenWidth}", "+set", "r_customHeight", f"{screenHeight}"] + sys.argv[1:])
except Exception as e:
print(f"Failed to start game via PTY: {e}")
return
# Monitor the game output
try:
monitor_game(pty_proc)
except KeyboardInterrupt:
print("Exiting...")
finally:
stop_flag.set()
stop_playback()
pty_proc.close()
pygame.mixer.quit()
if __name__ == "__main__":
main()
# ==========================================================