Alright, I think this is going to be the final version!
I got it now where the program monitors the tries per second and dynamically adjusts the iterations per cycle to maintain as high a performance as possible. The tries per second (TPS) number it uses for this is a bit different than the one being displayed on the screen. The one being displayed is an average and it is calculated by taking the total number of tries divided by the total time (Session or Overall respectively), whereas the TPS used to adjust the iterations per cycle (IPC) is taken over the previous one second of time. There is a minimum IPC of 2000 that is in the code. If you think your system would like fewer than that then just find every instance of "2000" in the code and change it to "1000" or whatever.
Anyway, the point is that it works and it is optimized as best as I can figure out to take full advantage of whatever system it is running on. The only thing the user has to do is play around with the number of processes to see how many your system can handle before losing performance.
I got it now where the program monitors the tries per second and dynamically adjusts the iterations per cycle to maintain as high a performance as possible. The tries per second (TPS) number it uses for this is a bit different than the one being displayed on the screen. The one being displayed is an average and it is calculated by taking the total number of tries divided by the total time (Session or Overall respectively), whereas the TPS used to adjust the iterations per cycle (IPC) is taken over the previous one second of time. There is a minimum IPC of 2000 that is in the code. If you think your system would like fewer than that then just find every instance of "2000" in the code and change it to "1000" or whatever.
Anyway, the point is that it works and it is optimized as best as I can figure out to take full advantage of whatever system it is running on. The only thing the user has to do is play around with the number of processes to see how many your system can handle before losing performance.
Code:
import curses
import multiprocessing
import random
import time
import json
import os
import sys
# -------------------------------
# Configuration and Constants
# -------------------------------
TARGET = "abcdefghijklmnopqrstuvwxyz" # Target string each random attempt is scored against, position by position
ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyz" # Alphabet the workers draw random characters from
PERSISTENT_FILE = "overall_stats.json" # JSON file holding stats that survive across runs
MIN_IPC = 2000 # Minimum iterations per cycle (the auto-tuner never drops below this)
# -------------------------------
# Persistence Functions
# -------------------------------
def load_overall_stats():
    """Load persistent overall statistics from PERSISTENT_FILE.

    Returns:
        dict with keys "overall_attempts", "overall_elapsed", "best_data"
        and "score_distribution".  Missing keys are filled with defaults,
        and the score distribution is normalized to exactly
        len(TARGET) + 1 buckets so later per-score indexing cannot fail.
        Any read or parse error falls back to the defaults (persistence
        is best-effort).
    """
    default_data = {
        "overall_attempts": 0,
        "overall_elapsed": 0.0,
        "best_data": {"attempt": " " * len(TARGET), "match_count": 0},
        "score_distribution": [0] * (len(TARGET) + 1),
    }
    if os.path.exists(PERSISTENT_FILE):
        try:
            with open(PERSISTENT_FILE, "r") as f:
                data = json.load(f)
            # Ensure all expected keys are present.
            for key, default in default_data.items():
                if key not in data:
                    data[key] = default
        except Exception:
            # Corrupt or unreadable stats file: start fresh rather than crash.
            data = default_data
    else:
        data = default_data
    # Normalize the distribution length: a saved list of a different size
    # (e.g. after TARGET changed between runs) would otherwise cause an
    # IndexError when the UI indexes score_distribution[score].
    wanted = len(TARGET) + 1
    dist = list(data.get("score_distribution") or [])
    data["score_distribution"] = (dist + [0] * wanted)[:wanted]
    return data
def save_overall_stats(overall_attempts, overall_elapsed, best_data, score_distribution):
    """Persist overall statistics to PERSISTENT_FILE.

    Args:
        overall_attempts: cumulative attempt count across all runs.
        overall_elapsed: cumulative active runtime in seconds.
        best_data: mapping with the best "attempt" and its "match_count".
        score_distribution: per-score attempt counts.

    Writes to a temporary file first and atomically replaces the target,
    so a crash mid-write cannot leave a truncated/corrupt JSON file.
    Errors are reported on stderr but never raised (best-effort).
    """
    data = {
        "overall_attempts": overall_attempts,
        "overall_elapsed": overall_elapsed,
        # Copy shared Manager proxies into plain types so json can encode them.
        "best_data": dict(best_data),
        "score_distribution": list(score_distribution),
    }
    tmp_path = PERSISTENT_FILE + ".tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(data, f)
        os.replace(tmp_path, PERSISTENT_FILE)  # atomic on both POSIX and Windows
    except Exception as e:
        sys.stderr.write(f"Error saving persistent data: {e}\n")
# -------------------------------
# Utility: Time Formatting
# -------------------------------
def format_time(sec):
    """Render a duration in seconds as 'Y years, D days, H hours, M minutes, SS.ss seconds'."""
    remaining = sec
    years, remaining = divmod(remaining, 365 * 24 * 3600)
    days, remaining = divmod(remaining, 24 * 3600)
    hours, remaining = divmod(remaining, 3600)
    minutes, seconds = divmod(remaining, 60)
    return (f"{int(years)} years, {int(days)} days, {int(hours)} hours, "
            f"{int(minutes)} minutes, {seconds:05.2f} seconds")
# -------------------------------
# Worker Process Function
# -------------------------------
def worker(p_target, session_attempts, best_data, paused, exit_event,
           score_distribution, distribution_lock, iterations_list, worker_index):
    """Brute-force worker: generate random strings and score them against p_target.

    Runs until exit_event is set.  Each cycle performs iterations_per_cycle
    random attempts, then merges its local tallies into the shared state.
    iterations_per_cycle is re-tuned roughly once per second from the
    measured tries-per-second of the last ~1 s window (+/-10% steps,
    floored at MIN_IPC).

    Args:
        p_target: the string to match.
        session_attempts: multiprocessing.Value counting this session's attempts.
        best_data: Manager dict holding the best "attempt" and its "match_count".
        paused: multiprocessing.Value('b'); while true the worker idles.
        exit_event: multiprocessing.Event signalling shutdown.
        score_distribution: Manager list of attempt counts per match score.
        distribution_lock: lock guarding merges into score_distribution.
        iterations_list: Manager list; slot worker_index reports the current IPC.
        worker_index: this worker's slot in iterations_list.
    """
    iterations_per_cycle = 2000  # starting IPC; auto-tuned below
    target_length = len(p_target)
    local_best_match = best_data.get("match_count", 0)
    rnd = random.Random()  # per-process RNG instance
    # Sliding ~1 second window used to measure instantaneous tries-per-second.
    window_start = time.time()
    window_attempts = 0
    last_window_tps = None
    while not exit_event.is_set():
        if paused.value:
            time.sleep(0.1)
            continue
        local_batch_attempts = 0
        local_best_count = local_best_match
        # Per-cycle tally for scores 0..target_length, merged under lock later
        # so the shared list is touched once per cycle, not once per attempt.
        local_distribution = [0] * (target_length + 1)
        for _ in range(iterations_per_cycle):
            # choices() builds the whole attempt in a single C-level call --
            # much faster than joining target_length individual choice() calls.
            attempt = ''.join(rnd.choices(ALLOWED_CHARS, k=target_length))
            match_count = sum(a == b for a, b in zip(attempt, p_target))
            local_distribution[match_count] += 1
            local_batch_attempts += 1
            if match_count > local_best_count:
                local_best_count = match_count
                # Publish only genuine global improvements (another worker may
                # already have beaten this score).
                if local_best_count > best_data.get("match_count", 0):
                    best_data["attempt"] = attempt
                    best_data["match_count"] = local_best_count
        # Fold this cycle's attempt count into the shared session total.
        with session_attempts.get_lock():
            session_attempts.value += local_batch_attempts
        # Merge this cycle's distribution counts into the shared distribution.
        with distribution_lock:
            for i, count in enumerate(local_distribution):
                # Ensure the shared list is long enough (it normally is).
                if i < len(score_distribution):
                    score_distribution[i] += count
                else:
                    score_distribution.append(count)
        # --- Sliding-window TPS-based IPC adjustment ---
        window_attempts += local_batch_attempts
        current_time = time.time()
        window_duration = current_time - window_start
        if window_duration >= 1.0:
            # Instantaneous TPS over the window just completed.
            current_window_tps = window_attempts / window_duration
            if last_window_tps is not None:
                if current_window_tps > last_window_tps * 1.05:
                    # Throughput rose >5%: grow the batch by 10%.
                    iterations_per_cycle = int(iterations_per_cycle * 1.1)
                elif current_window_tps < last_window_tps * 0.95:
                    # Throughput fell >5%: shrink 10%, but never below MIN_IPC.
                    iterations_per_cycle = max(MIN_IPC, int(iterations_per_cycle * 0.9))
            last_window_tps = current_window_tps
            window_start = current_time
            window_attempts = 0
        # Report the current IPC so the UI can display a per-worker average.
        iterations_list[worker_index] = iterations_per_cycle
        local_best_match = local_best_count
# -------------------------------
# Curses UI Main Function
# -------------------------------
def main(stdscr):
    """Curses UI loop: spawn NUM_WORKERS worker processes, show live stats,
    and persist the combined totals on exit.

    Keys: Ctrl+P toggles pause/resume, Ctrl+Q quits.  NUM_WORKERS is set by
    the entry point before curses.wrapper(main) is called.
    """
    global NUM_WORKERS
    # Curses initialization
    curses.curs_set(0)      # hide the cursor
    stdscr.nodelay(True)    # make getch() non-blocking
    stdscr.keypad(True)
    curses.start_color()
    curses.use_default_colors()
    curses.init_pair(1, curses.COLOR_GREEN, -1)  # green on default background for matched chars
    # Load persistent stats (including best_data and score_distribution)
    persistent_data = load_overall_stats()
    overall_attempts_loaded = persistent_data.get("overall_attempts", 0)
    overall_elapsed_loaded = persistent_data.get("overall_elapsed", 0.0)
    persistent_best = persistent_data.get("best_data", {"attempt": " " * len(TARGET), "match_count": 0})
    persistent_distribution = persistent_data.get("score_distribution", [0] * (len(TARGET) + 1))
    # Variables for tracking active (non-paused) runtime
    session_active_time = 0.0
    last_loop_time = time.time()
    manager = multiprocessing.Manager()
    # Initialize best_data and score_distribution with persistent values
    best_data = manager.dict(persistent_best)
    session_attempts = multiprocessing.Value('L', 0)   # unsigned long attempt counter
    paused = multiprocessing.Value('b', False)         # shared pause flag
    exit_event = multiprocessing.Event()               # signals workers to stop
    score_distribution = manager.list(persistent_distribution)
    distribution_lock = multiprocessing.Lock()
    # One slot per worker: each reports its current iterations-per-cycle here.
    iterations_list = manager.list([2000] * NUM_WORKERS)
    workers = []
    for i in range(NUM_WORKERS):
        p = multiprocessing.Process(target=worker, args=(
            TARGET, session_attempts, best_data, paused, exit_event,
            score_distribution, distribution_lock, iterations_list, i
        ))
        p.start()
        workers.append(p)
    try:
        while True:
            stdscr.clear()
            current_time = time.time()
            dt = current_time - last_loop_time
            # Only accumulate active time while not paused.
            if not paused.value:
                session_active_time += dt
            last_loop_time = current_time
            # Handle key presses
            key = stdscr.getch()
            if key != -1:
                if key == 16:  # Ctrl+P toggles pause/resume
                    with paused.get_lock():
                        paused.value = not paused.value
                elif key == 17:  # Ctrl+Q quits the program
                    break
            session_elapsed = session_active_time
            overall_elapsed = overall_elapsed_loaded + session_elapsed
            with session_attempts.get_lock():
                session_attempts_val = session_attempts.value
            total_attempts = overall_attempts_loaded + session_attempts_val
            # Displayed TPS values are running averages (attempts / elapsed time).
            session_tps = session_attempts_val / session_elapsed if session_elapsed > 0 else 0
            overall_tps = total_attempts / overall_elapsed if overall_elapsed > 0 else 0
            avg_iterations = int(sum(iterations_list) / len(iterations_list)) if len(iterations_list) > 0 else 0
            # Build the display text:
            line = 0
            stdscr.addstr(line, 0, "Target Alphabet:")
            line += 1
            stdscr.addstr(line, 0, TARGET)
            line += 2
            stdscr.addstr(line, 0, "New Best Match:")
            line += 1
            best_attempt = best_data.get("attempt", " " * len(TARGET))
            # Highlight correctly-placed characters in green uppercase.
            for i, ch in enumerate(best_attempt):
                if i < len(TARGET) and ch == TARGET[i]:
                    stdscr.addstr(line, i, ch.upper(), curses.color_pair(1))
                else:
                    stdscr.addstr(line, i, ch)
            line += 2
            stdscr.addstr(line, 0, f"Total Attempts: {total_attempts:,}")
            line += 2
            stdscr.addstr(line, 0, f"Session Elapsed Time: {format_time(session_elapsed)}")
            line += 1
            stdscr.addstr(line, 0, f"Overall Elapsed Time: {format_time(overall_elapsed)}")
            line += 2
            stdscr.addstr(line, 0, f"Session Tries per Second: {session_tps:,.2f}")
            line += 1
            stdscr.addstr(line, 0, f"Overall Tries per Second: {overall_tps:,.2f}")
            line += 2
            stdscr.addstr(line, 0, "Score Distribution:")
            line += 1
            best_match = best_data.get("match_count", 0)
            # Show one row per score from 0 up to the best score seen so far.
            for score in range(best_match + 1):
                stdscr.addstr(line, 0, f"{score} correct: {score_distribution[score]:,}")
                line += 1
            line += 1
            stdscr.addstr(line, 0, f"Number of Processes Running: {NUM_WORKERS}")
            line += 1
            stdscr.addstr(line, 0, f"Iterations per Cycle (avg): {avg_iterations}")
            line += 2
            status_str = "PAUSED" if paused.value else "RUNNING"
            stdscr.addstr(line, 0, f"Status: {status_str} (Pause/Resume: Ctrl+P, Quit: Ctrl+Q)")
            stdscr.refresh()
            time.sleep(0.1)  # ~10 UI refreshes per second
    finally:
        # Shut down workers and persist combined totals even on error/quit.
        exit_event.set()
        for p in workers:
            p.join(timeout=1)
        overall_attempts_new = overall_attempts_loaded + session_attempts.value
        overall_elapsed_new = overall_elapsed_loaded + session_active_time
        save_overall_stats(overall_attempts_new, overall_elapsed_new, best_data, score_distribution)
# -------------------------------
# Main Entrypoint
# -------------------------------
if __name__ == '__main__':
    multiprocessing.freeze_support()  # For Windows support (frozen executables)
    try:
        num_processes_input = input("Enter number of processes to run: ")
        try:
            num_processes = int(num_processes_input)
            if num_processes < 1:
                raise ValueError
        except ValueError:
            # Non-numeric or non-positive input: fall back to a sane default.
            num_processes = min(24, multiprocessing.cpu_count())
            print(f"Invalid input. Defaulting to {num_processes} processes.")
        # Assigning at module scope creates the global directly; the original
        # bare `global NUM_WORKERS` statement here was a no-op and was removed.
        NUM_WORKERS = num_processes
        curses.wrapper(main)
    except KeyboardInterrupt:
        pass
Last edited: