Daankular's picture
download
raw
40.9 kB
import os
import sys
import json
import subprocess
import argparse
import shutil
import platform
CONFIG_PATH = "setup_config.json"
ENVS_FILE = "envs.json"
IS_WIN = os.name == 'nt'
_PY_PATH = f'"{os.path.join("{dir}", "Scripts", "python.exe")}"' if IS_WIN else f'"{os.path.join("{dir}", "bin", "python")}"'
ENV_TEMPLATES = {
"uv": {
"create": "uv venv --seed --python {ver} \"{dir}\"",
"run": _PY_PATH,
"install": "uv pip install --index-strategy unsafe-best-match --python \"{dir}\""
},
"venv": {
"create": "\"{sys_py}\" -m venv \"{dir}\"",
"run": _PY_PATH,
"install": f"{_PY_PATH} -m pip install"
},
"conda": {
"create": "conda create -y -p \"{dir}\" python={ver}",
"run": os.path.join("{dir}", "python.exe"),
"install": "conda run -p \"{dir}\" pip install"
},
"none": {
"create": "",
"run": "python" if IS_WIN else "python3",
"install": "pip install"
}
}
VERSION_CHECK_SCRIPT = """
import sys
import importlib
import importlib.metadata
pkgs = ['torch', 'triton', 'sageattention', 'spas_sage_attn', 'flash_attn']
res = []
try:
res.append(f"python={sys.version.split()[0]}")
except:
res.append("python=Unknown")
for p in pkgs:
try:
ver = importlib.metadata.version(p)
res.append(f"{p}={ver}")
except importlib.metadata.PackageNotFoundError:
try:
# Fallback to __version__
m = importlib.import_module(p)
ver = getattr(m, '__version__', 'Installed')
res.append(f"{p}={ver}")
except ImportError:
res.append(f"{p}=Missing")
except Exception:
res.append(f"{p}=Error")
print("||".join(res))
"""
class EnvsManager:
def __init__(self):
self.data = {"active": None, "envs": {}}
self.load()
def load(self):
if os.path.exists(ENVS_FILE):
try:
with open(ENVS_FILE, 'r') as f:
self.data = json.load(f)
except:
print(f"[!] Warning: {ENVS_FILE} corrupted. Starting fresh.")
def save(self):
with open(ENVS_FILE, 'w') as f:
json.dump(self.data, f, indent=4)
def get_active(self):
return self.data.get("active")
def set_active(self, name):
if name in self.data["envs"]:
self.data["active"] = name
self.save()
print(f"[*] '{name}' is now the active environment.")
else:
print(f"[!] Environment '{name}' not found.")
def add_env(self, name, type, path):
if path:
cwd = os.getcwd()
abs_path = os.path.abspath(path)
try:
rel_path = os.path.relpath(abs_path, cwd)
if rel_path.startswith("..") or rel_path == ".":
final_path = abs_path
else:
final_path = os.path.join(".", rel_path)
except ValueError:
final_path = abs_path
else:
final_path = ""
self.data["envs"][name] = {"type": type, "path": final_path}
if not self.data["active"]:
self.data["active"] = name
self.save()
def remove_env(self, name):
if name in self.data["envs"]:
entry = self.data["envs"][name]
path = entry["path"]
if os.path.exists(path) and entry["type"] != "none":
try:
print(f"[*] Deleting directory: {path}")
if entry["type"] == "conda":
run_cmd(f"conda env remove -p \"{path}\" -y")
else:
shutil.rmtree(path)
except Exception as e:
print(f"[!] Error removing directory: {e}")
del self.data["envs"][name]
if self.data["active"] == name:
self.data["active"] = None
keys = list(self.data["envs"].keys())
if keys:
self.data["active"] = keys[0]
print(f"[*] Active environment switched to '{keys[0]}'.")
else:
print("[*] No environments left.")
self.save()
def list_envs(self):
return self.data["envs"]
def resolve_target_env(self):
"""Intelligently determine which env to use for operations."""
envs = self.list_envs()
if not envs:
print("[!] No environments found. Please run install first.")
sys.exit(1)
active = self.get_active()
if len(envs) == 1:
return list(envs.keys())[0]
print("\nMultiple environments detected:")
keys = list(envs.keys())
for i, k in enumerate(keys):
marker = "*" if k == active else " "
print(f"{i+1}. [{marker}] {k} ({envs[k]['type']})")
print(f"Default: {active}")
choice = input("Select environment (Number) or Press Enter for Default: ").strip()
if choice == "":
return active
try:
idx = int(choice) - 1
if 0 <= idx < len(keys):
return keys[idx]
except:
pass
return active
def load_config():
if not os.path.exists(CONFIG_PATH):
print(f"Error: {CONFIG_PATH} not found.")
sys.exit(1)
with open(CONFIG_PATH, 'r') as f: return json.load(f)
def get_gpu_info():
if sys.platform == "darwin":
try:
out = subprocess.check_output(
["system_profiler", "SPDisplaysDataType"],
encoding="utf-8",
stderr=subprocess.DEVNULL
)
for line in out.split("\n"):
if "Chip" in line:
name = line.split(":", 1)[1].strip()
return name, "APPLE"
except:
pass
return "Apple Silicon (MPS)", "APPLE"
try:
name = subprocess.check_output(
["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
encoding='utf-8',
stderr=subprocess.DEVNULL
).strip()
return name, "NVIDIA"
except: pass
if IS_WIN:
try:
name = subprocess.check_output(
"wmic path win32_VideoController get name",
shell=True,
encoding='utf-8',
stderr=subprocess.DEVNULL
)
name = name.replace("Name", "").strip().split('\n')[0].strip()
if "Radeon" in name or "AMD" in name: return name, "AMD"
return name, "INTEL"
except: pass
else:
try:
name = subprocess.check_output(
"lspci | grep -i vga",
shell=True,
encoding='utf-8',
stderr=subprocess.DEVNULL
)
if "NVIDIA" in name: return name, "NVIDIA"
if "AMD" in name or "Advanced Micro Devices" in name: return name, "AMD"
except: pass
return "Unknown", "UNKNOWN"
def get_profile_key(gpu_name, vendor):
g = gpu_name.upper()
if vendor == "APPLE":
return "MPS"
if vendor == "NVIDIA":
if "50" in g: return "RTX_50"
if "40" in g: return "RTX_40"
if "30" in g: return "RTX_30"
if "20" in g or "QUADRO" in g: return "RTX_20"
return "GTX_10"
elif vendor == "AMD":
if any(x in g for x in ["7600", "7700", "7800", "7900"]): return "AMD_GFX110X"
if any(x in g for x in ["7000", "Z1", "PHOENIX"]): return "AMD_GFX1151"
if any(x in g for x in ["8000", "STRIX", "1201"]): return "AMD_GFX1201"
return "AMD_GFX110X"
return "RTX_40"
def get_os_key():
if sys.platform == "darwin":
return "macos"
return "win" if IS_WIN else "linux"
def resolve_cmd(cmd_entry):
if isinstance(cmd_entry, dict):
return cmd_entry.get(get_os_key())
return cmd_entry
def run_cmd(cmd, env_vars=None):
if not cmd: return
if "&&" in cmd and not IS_WIN:
print(f"\n>>> Running (Shell): {cmd}")
custom_env = os.environ.copy()
if env_vars: custom_env.update(env_vars)
subprocess.run(cmd, shell=True, check=True, env=custom_env)
return
print(f"\n>>> Running: {cmd}")
custom_env = os.environ.copy()
if env_vars:
for k, v in env_vars.items():
print(f" [ENV SET] {k}={v}")
custom_env[k] = v
subprocess.run(cmd, shell=True, check=True, env=custom_env)
def run_pip_component(pip, cmd):
if not cmd: return
run_cmd(cmd.format(pip=pip) if "{pip}" in cmd else f"{pip} {cmd}")
def install_plugin_requirements(pip_cmd):
plugins_dir = "plugins"
if os.path.exists(plugins_dir) and os.path.isdir(plugins_dir):
for entry in os.listdir(plugins_dir):
plugin_req = os.path.join(plugins_dir, entry, "requirements.txt")
if os.path.isfile(plugin_req):
print(f"\n[*] Installing requirements for plugin '{entry}'...")
run_cmd(f"{pip_cmd} -r \"{plugin_req}\"")
def get_env_details(name, env_data):
env_type = env_data["type"]
dir_name = env_data["path"]
entry = ENV_TEMPLATES[env_type]
py_exec = entry['run'].format(dir=dir_name).strip('"')
full_cmd = [py_exec, "-c", VERSION_CHECK_SCRIPT]
try:
output = subprocess.check_output(full_cmd, encoding='utf-8', stderr=subprocess.DEVNULL)
data = {k: v for k, v in [x.split('=') for x in output.strip().split('||')]}
data['path'] = dir_name
data['type'] = env_type
return data
except Exception as e:
return {'error': str(e), 'type': env_type, 'path': dir_name}
def show_status():
manager = EnvsManager()
print("\n" + "="*90)
print(f"{'INSTALLED ENVIRONMENTS & VERSIONS':^90}")
print("="*90)
envs = manager.list_envs()
active = manager.get_active()
if not envs:
print(" No environments installed.")
print("="*90)
return
print(f"{'NAME':<15} | {'TYPE':<5} | {'PYTHON':<8} | {'TORCH':<15} | {'TRITON':<9} | {'SAGE':<10} | {'SPARGE':<10} | {'FLASH':<10}")
print("-" * 90)
for name, data in envs.items():
details = get_env_details(name, data)
marker = "*" if name == active else " "
display_name = f"[{marker}] {name}"
if 'error' in details:
print(f"{display_name:<15} | {data['type']:<5} | [Error reading environment]")
continue
print(f"{display_name:<15} | {data['type']:<5} | "
f"{details.get('python','?'):<8} | "
f"{details.get('torch','?'):<15} | "
f"{details.get('triton','?'):<9} | "
f"{details.get('sageattention','?'):<10} | "
f"{details.get('spas_sage_attn','?'):<10} | "
f"{details.get('flash_attn','?'):<10}")
print("-" * 90)
print(f" * = Active Environment")
print("="*90 + "\n")
def install_logic(env_name, env_type, env_path, py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, kernel_list, config):
template = ENV_TEMPLATES[env_type]
target_py_ver = config['components']['python'][py_k]['ver']
print(f"\n[1/3] Preparing Environment: {env_name} ({env_type})...")
if env_type != "none":
if env_type == "venv":
py_ver_short = ".".join(target_py_ver.split(".")[:2])
if IS_WIN:
create_cmd = f"py -{py_ver_short} -m venv \"{env_path}\""
else:
create_cmd = f"python{py_ver_short} -m venv \"{env_path}\""
else:
create_cmd = template["create"].format(
ver=target_py_ver,
dir=env_path,
sys_py=sys.executable
)
if create_cmd:
run_cmd(create_cmd)
pip = template["install"].format(dir=env_path)
print(f"\n[2/3] Installing Torch: {config['components']['torch'][torch_k]['label']}...")
torch_cmd = resolve_cmd(config['components']['torch'][torch_k]['cmd'])
run_cmd(f"{pip} {torch_cmd}")
print(f"\n[3/3] Installing Requirements & Extras...")
run_cmd(f"{pip} -r requirements.txt")
if triton_k:
cmd = resolve_cmd(config['components']['triton'][triton_k]['cmd'])
if cmd: run_cmd(f"{pip} {cmd}")
if sage_k:
cmd = resolve_cmd(config['components']['sage'][sage_k]['cmd'])
if cmd.startswith("http") or cmd.startswith("sageattention"):
run_cmd(f"{pip} {cmd}")
else:
if env_type == "venv" or env_type == "uv":
act = f". \"{env_path}/bin/activate\" && " if not IS_WIN else ""
run_cmd(f"{act}{cmd}")
elif env_type == "conda":
pass
if sparge_k:
cmd = resolve_cmd(config['components']['sparge'][sparge_k]['cmd'])
if cmd: run_pip_component(pip, cmd)
if flash_k:
cmd = resolve_cmd(config['components']['flash'][flash_k]['cmd'])
if cmd: run_cmd(f"{pip} {cmd}")
for k in kernel_list:
if k in config['components']['kernels']:
cmd = resolve_cmd(config['components']['kernels'][k]['cmd'])
if cmd: run_cmd(f"{pip} {cmd}")
install_plugin_requirements(pip)
def menu(title, options, recommended_key=None):
print(f"\n--- {title} ---")
keys = list(options.keys())
for i, k in enumerate(keys):
rec = " [RECOMMENDED FOR YOUR GPU]" if k == recommended_key else ""
print(f"{i+1}. {options[k]['label']}{rec}")
choice = input(f"Select option (Enter for Recommended): ")
if choice == "" and recommended_key: return recommended_key
try: return keys[int(choice)-1]
except: return recommended_key
def do_install_interactive(env_type, config, detected_key):
manager = EnvsManager()
create_wgp_config(detected_key, config)
default_name = f"env_{env_type}" if env_type != "none" else "system"
print(f"\n--- Configuration for {env_type} ---")
name = input(f"Enter a name for this environment (Default: {default_name}): ").strip()
if not name: name = default_name
cwd = os.getcwd()
path = os.path.join(cwd, name) if env_type != "none" else ""
if name in manager.list_envs():
print(f"\n[!] Warning: Environment '{name}' already exists in registry.")
choice = input("Do you want to overwrite it? (This will delete the old folder) [y/N]: ").lower()
if choice != 'y': return
manager.remove_env(name)
elif os.path.exists(path) and env_type != "none":
print(f"\n[!] Warning: Directory '{path}' exists but is not registered.")
choice = input("Do you want to overwrite this directory? [y/N]: ").lower()
if choice != 'y': return
try: shutil.rmtree(path)
except: pass
print("\n--- Select Install Mode ---")
print("1. Autoselect (Based on your GPU)")
print("2. Manual Selection")
print("3. Use Latest")
mode = input("Select option (1-3) [Default: 1]: ").strip()
if mode == "2":
base = config['gpu_profiles'][detected_key]
py_k = menu("Python Version", config['components']['python'], base['python'])
torch_k = menu("Torch Version", config['components']['torch'], base['torch'])
triton_k = menu("Triton", config['components']['triton'], base['triton'])
sage_k = menu("Sage Attention", config['components']['sage'], base['sage'])
sparge_k = menu("Sparge Attention", config['components']['sparge'], base.get('sparge'))
flash_k = menu("Flash Attention", config['components']['flash'], base['flash'])
kernels = base['kernels']
install_logic(name, env_type, path, py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, kernels, config)
elif mode == "3":
p = config['gpu_profiles']['RTX_50']
install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config)
else:
p = config['gpu_profiles'][detected_key]
install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config)
manager.add_env(name, env_type, path)
if len(manager.list_envs()) > 1:
choice = input(f"\nDo you want to make '{name}' the active environment? [Y/n]: ").lower()
if choice != 'n':
manager.set_active(name)
else:
print(f"\n[*] '{name}' is the only environment, setting as active.")
manager.set_active(name)
def do_install_auto(env_type, config, detected_key):
manager = EnvsManager()
create_wgp_config(detected_key, config)
name = f"env_{env_type}" if env_type != "none" else "system"
cwd = os.getcwd()
path = os.path.join(cwd, name) if env_type != "none" else ""
if name in manager.list_envs():
manager.remove_env(name)
elif os.path.exists(path) and env_type != "none":
try: shutil.rmtree(path)
except: pass
print(f"\n[*] Starting Automatic Install (Hardware Profile: {detected_key})...")
p = config['gpu_profiles'][detected_key]
install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config)
manager.add_env(name, env_type, path)
manager.set_active(name)
print(f"\n[*] Automatic Install Complete! '{name}' is now active.")
def open_terminal():
manager = EnvsManager()
env_name = manager.get_active()
if not env_name:
print("[!] No active environment. Please select or install one first.")
input("Press Enter...")
return
env_data = manager.list_envs().get(env_name)
if not env_data:
print(f"[!] Could not find environment data for '{env_name}'.")
return
e_type = env_data["type"]
e_path = env_data["path"]
print(f"\n[*] Spawning interactive terminal for '{env_name}'...")
print(f"[*] (Type 'exit' when you are done to return to the menu)\n")
if IS_WIN:
if e_type in ["venv", "uv"]:
act_bat = os.path.join(e_path, 'Scripts', 'activate.bat')
subprocess.run(f'cmd.exe /K "{act_bat}"')
elif e_type == "conda":
conda_bat = "conda.bat"
if not shutil.which("conda"):
user = os.environ.get("USERPROFILE", "")
paths = [
os.path.join(user, "Miniconda3", "condabin", "conda.bat"),
os.path.join(user, "Anaconda3", "condabin", "conda.bat"),
r"C:\ProgramData\Miniconda3\condabin\conda.bat"
]
for p in paths:
if os.path.exists(p):
conda_bat = p
break
subprocess.run(f'cmd.exe /K "{conda_bat}" activate "{e_path}"')
else:
subprocess.run('cmd.exe /K')
else:
rc_cmd = "if [ -f ~/.bashrc ]; then source ~/.bashrc; fi\n"
if e_type in ["venv", "uv"]:
rc_cmd += f"source '{os.path.join(e_path, 'bin', 'activate')}'\n"
elif e_type == "conda":
rc_cmd += (
"if command -v conda >/dev/null 2>&1; then eval \"$(conda shell.bash hook)\"; "
"else for base in \"$HOME/miniconda3\" \"$HOME/anaconda3\" \"/opt/miniconda3\" \"/opt/anaconda3\"; do "
"if [ -f \"$base/etc/profile.d/conda.sh\" ]; then source \"$base/etc/profile.d/conda.sh\"; break; fi; done; fi\n"
f"conda activate '{e_path}'\n"
)
linux_shell_cmd = f"bash --rcfile <(cat << 'EOF_WAN2GP'\n{rc_cmd}EOF_WAN2GP\n)"
subprocess.run(linux_shell_cmd, shell=True, executable='/bin/bash')
def switch_git_branch():
if not os.path.exists(".git"):
print("[!] Not a git repository. Cannot switch branches.")
input("Press Enter...")
return
print("\n[*] Fetching latest branches from remote...")
try:
subprocess.run(["git", "fetch", "--prune"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
out = subprocess.check_output(["git", "branch", "-a"], encoding='utf-8')
except Exception as e:
print(f"[!] Git error: {e}")
input("Press Enter...")
return
branches = set()
active_branch = ""
for line in out.splitlines():
line = line.strip()
if not line: continue
if "->" in line: continue
is_active = line.startswith("*")
b = line.lstrip("* ").strip()
if b.startswith("remotes/origin/"):
b = b[len("remotes/origin/"):]
elif b.startswith("remotes/"):
b = "/".join(b.split("/")[2:])
if b and b != "HEAD":
branches.add(b)
if is_active:
active_branch = b
branch_list = sorted(list(branches))
if not branch_list:
print("[!] No branches found.")
input("Press Enter...")
return
print("\n--- Available Branches ---")
for i, b in enumerate(branch_list):
marker = "*" if b == active_branch else " "
print(f"{i+1}. [{marker}] {b}")
val = input(f"\nEnter number or name of branch (Default: {active_branch}): ").strip()
if not val:
return
target = val
if val.isdigit():
idx = int(val) - 1
if 0 <= idx < len(branch_list):
target = branch_list[idx]
if target == active_branch:
print(f"[*] Already on '{target}'.")
input("Press Enter...")
return
print(f"[*] Switching to '{target}'...")
try:
subprocess.run(["git", "checkout", target], check=True, capture_output=True, text=True)
print(f"[*] Successfully switched to '{target}'.")
except subprocess.CalledProcessError as e:
err_msg = e.stderr.lower() if e.stderr else ""
if "overwritten" in err_msg or "please commit your changes or stash them" in err_msg:
print(f"\n[!] Failed to switch to '{target}' due to uncommitted changes.")
print("How would you like to handle this?")
print(" 1. Carry changes over to new branch (Not recommended)")
print(" 2. Stash changes")
print(" 3. Discard modifications")
print(" 4. Cancel")
opt = input("\nSelect option (1-4): ").strip()
if opt == "1":
try:
print("\n[*] Stashing changes...")
subprocess.run(["git", "stash", "push", "-m", f"Auto-stash before carrying to {target}"], check=True)
subprocess.run(["git", "checkout", target], check=True)
print(f"[*] Switched to '{target}'. Reapplying changes...")
pop_res = subprocess.run(["git", "stash", "pop"])
if pop_res.returncode != 0:
print("\n[!] Note: There were merge conflicts when reapplying your changes.")
print(" The changes are still saved in your stash, but you will need to resolve conflicts in the files.")
else:
print("[*] Successfully carried changes over.")
except Exception as inner_e:
print(f"[!] Failed to carry changes: {inner_e}")
elif opt == "2":
try:
print("\n[*] Stashing changes...")
subprocess.run(["git", "stash", "push", "-m", f"Auto-stash before switching to {target}"], check=True)
subprocess.run(["git", "checkout", target], check=True)
print(f"[*] Successfully set changes aside and switched to '{target}'.")
except Exception as inner_e:
print(f"[!] Failed to stash and switch: {inner_e}")
elif opt == "3":
try:
print("\n[*] Discarding tracked changes...")
subprocess.run(["git", "checkout", "-f", target], check=True)
print(f"[*] Successfully switched to '{target}'.")
except Exception as inner_e:
print(f"[!] Failed to force switch: {inner_e}")
else:
print("[*] Cancelled.")
else:
print(f"[!] Error switching branch:\n{e.stderr.strip() if e.stderr else str(e)}")
except Exception as e:
print(f"[!] Error: {e}")
input("Press Enter...")
def manage_git_stashes():
if not os.path.exists(".git"):
print("[!] Not a git repository.")
input("Press Enter...")
return
try:
out = subprocess.check_output(["git", "stash", "list"], encoding='utf-8')
except Exception as e:
print(f"[!] Git error: {e}")
input("Press Enter...")
return
stashes = [line for line in out.splitlines() if line.strip()]
if not stashes:
print("[*] No stashed changes found. You're all clean!")
input("Press Enter...")
return
print("\n--- Saved Stashes ---")
for i, s in enumerate(stashes):
print(f"{i+1}. {s}")
print("\nOptions:")
print(" A. Apply a stash (Restore changes but keep the stash)")
print(" P. Pop a stash (Restore changes and delete the stash)")
print(" D. Delete a stash")
print(" C. Cancel")
choice = input("\nSelect action (A/P/D/C): ").strip().upper()
if choice not in ["A", "P", "D"]:
return
idx_str = input(f"Enter stash number (1-{len(stashes)}): ").strip()
if not idx_str.isdigit() or not (1 <= int(idx_str) <= len(stashes)):
print("[!] Invalid stash number.")
input("Press Enter...")
return
stash_ref = f"stash@{{{int(idx_str)-1}}}"
try:
if choice == "A":
print(f"\n[*] Applying {stash_ref}...")
subprocess.run(["git", "stash", "apply", stash_ref])
elif choice == "P":
print(f"\n[*] Popping {stash_ref}...")
subprocess.run(["git", "stash", "pop", stash_ref])
elif choice == "D":
conf = input(f"Are you sure you want to delete {stash_ref}? (y/n): ").strip().lower()
if conf == 'y':
subprocess.run(["git", "stash", "drop", stash_ref])
print(f"[*] Dropped {stash_ref}.")
except Exception as e:
print(f"\n[!] Command returned an error or warning (you may need to resolve file conflicts manually).")
input("\nPress Enter...")
def do_manage():
manager = EnvsManager()
while True:
os.system('cls' if IS_WIN else 'clear')
print("==========================================================================================")
print(f"{'ENVIRONMENT MANAGER':^90}")
print("==========================================================================================")
envs = manager.list_envs()
active = manager.get_active()
keys = list(envs.keys())
if not envs:
print(" No environments installed.")
else:
for name in keys:
data = envs[name]
status = "(Active)" if name == active else ""
print(f" - {name:<15} [{data['type']}] {status}")
print("------------------------------------------------------------------------------------------")
print("1. Set Active Environment")
print("2. Delete Environment")
print("3. Add Existing Environment")
print("4. List Environment Details")
print("5. Open Terminal in Active Environment")
print("6. Switch Git Branch")
print("7. Manage Git Stashes")
print("8. Exit")
choice = input("\nSelect option: ")
if choice == "1":
if not keys:
input("No environments to activate. Press Enter...")
continue
print("\nAvailable Environments:")
for i, k in enumerate(keys):
print(f" {i+1}. {k}")
val = input("\nEnter name or number of environment to activate: ").strip()
if val.isdigit() and 1 <= int(val) <= len(keys):
name = keys[int(val)-1]
else:
name = val
manager.set_active(name)
input("Press Enter...")
elif choice == "2":
if not keys:
input("No environments to delete. Press Enter...")
continue
print("\nAvailable Environments:")
for i, k in enumerate(keys):
print(f" {i+1}. {k}")
val = input("\nEnter name or number of environment to DELETE: ").strip()
if val.isdigit() and 1 <= int(val) <= len(keys):
name = keys[int(val)-1]
else:
name = val
if name in envs:
conf = input(f"Are you sure you want to delete '{name}' and its files? (y/n): ")
if conf.lower() == 'y':
manager.remove_env(name)
input("Deleted. Press Enter...")
else:
print(f"[!] Environment '{name}' not found.")
input("Press Enter...")
elif choice == "3":
path = input("Enter the path to the existing environment folder: ").strip()
if not os.path.exists(path):
print("[!] Error: Path does not exist.")
else:
name = input("Enter a nickname for this environment: ").strip()
if not name: name = os.path.basename(path.rstrip(os.sep))
print("\nSelect Environment Type:")
print("1. venv")
print("2. uv")
print("3. conda")
t_choice = input("Choice (Default 1): ")
e_type = "uv" if t_choice == "2" else "conda" if t_choice == "3" else "venv"
manager.add_env(name, e_type, os.path.abspath(path))
print(f"[*] Registered '{name}' at {os.path.abspath(path)}")
input("Press Enter...")
elif choice == "4":
show_status()
input("Press Enter...")
elif choice == "5":
open_terminal()
elif choice == "6":
switch_git_branch()
elif choice == "7":
manage_git_stashes()
elif choice == "8":
break
def do_upgrade(config):
manager = EnvsManager()
print("\n" + "="*90)
print(f"{'WAN2GP MANUAL COMPONENT UPGRADE':^90}")
print("="*90)
env_name = manager.resolve_target_env()
env_data = manager.list_envs()[env_name]
gpu_name, vendor = get_gpu_info()
rec = config['gpu_profiles'][get_profile_key(gpu_name, vendor)]
py_k = menu("Python Version", config['components']['python'], rec['python'])
torch_k = menu("Torch Version", config['components']['torch'], rec['torch'])
triton_k = menu("Triton", config['components']['triton'], rec['triton'])
sage_k = menu("Sage Attention", config['components']['sage'], rec['sage'])
sparge_k = menu("Sparge Attention", config['components']['sparge'], rec.get('sparge'))
flash_k = menu("Flash Attention", config['components']['flash'], rec['flash'])
install_logic(env_name, env_data['type'], env_data['path'], py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, rec['kernels'], config)
def get_system_specs():
ram_gb = 0
vram_gb = 0
if IS_WIN:
try:
out = subprocess.check_output(
["powershell", "-NoProfile", "-Command", "(Get-CimInstance Win32_ComputerSystem).TotalPhysicalMemory"],
encoding='utf-8', stderr=subprocess.DEVNULL
).strip()
if out:
ram_gb = int(out) / (1024**3)
except:
try:
out = subprocess.check_output(
"wmic computersystem get TotalPhysicalMemory /value",
shell=True, encoding='utf-8', stderr=subprocess.DEVNULL
)
for line in out.splitlines():
if "TotalPhysicalMemory=" in line:
ram_gb = int(line.split('=')[1]) / (1024**3)
break
except:
pass
else:
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if 'MemTotal' in line:
kb_val = float(line.split()[1])
ram_gb = kb_val / (1024**2)
break
except: pass
if ram_gb == 0:
print("[!] Warning: Could not detect System RAM. Defaulting to 16GB.")
ram_gb = 16
try:
out = subprocess.check_output(
["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"],
encoding='utf-8', stderr=subprocess.DEVNULL
).strip()
vram_gb = float(out.split('\n')[0]) / 1024
except:
print("[!] Warning: Could not detect VRAM via nvidia-smi. Defaulting to 8GB.")
vram_gb = 8
return ram_gb, vram_gb
def create_wgp_config(profile_key, config_data):
WGP_CONFIG_FILE = "wgp_config.json"
if os.path.exists(WGP_CONFIG_FILE):
return
print("\n[*] Auto-generating wgp_config.json based on hardware...")
ram, vram = get_system_specs()
print(f" Detected: {int(ram)}GB RAM / {int(vram)}GB VRAM")
has_high_ram = ram > 60
has_mid_ram = ram > 30
has_huge_vram = vram > 22
has_high_vram = vram > 11
pid = 5
if has_high_ram and has_huge_vram:
pid = 1
elif has_high_ram:
pid = 2
elif has_mid_ram and has_huge_vram:
pid = 3
elif has_mid_ram and has_high_vram:
pid = 4
else:
pid = 5
prof_settings = config_data['gpu_profiles'].get(profile_key, {})
attn_mode = prof_settings.get("attention", "")
if not attn_mode:
if "50" in profile_key or "40" in profile_key or "30" in profile_key:
attn_mode = "sage2"
elif "20" in profile_key:
attn_mode = "sage"
compile_mode = ""
triton_key = prof_settings.get('triton')
if triton_key and triton_key != "none":
compile_mode = "transformer"
config_out = {
"attention_mode": attn_mode,
"compile": compile_mode,
"video_profile": pid,
"image_profile": pid,
"audio_profile": pid,
}
try:
with open(WGP_CONFIG_FILE, 'w') as f:
json.dump(config_out, f, indent=4)
print(f" Created config with Profile {pid}, Attention: '{attn_mode}', Compile: '{compile_mode}'")
except Exception as e:
print(f"[!] Error writing config: {e}")
def inject_system_paths():
if not IS_WIN:
return
paths = []
user = os.environ.get("USERPROFILE", "")
local_app = os.environ.get("LOCALAPPDATA", "")
appdata = os.environ.get("APPDATA", "")
for base in [os.path.join(user, "Miniconda3"), os.path.join(user, "Anaconda3"), r"C:\ProgramData\Miniconda3"]:
c_bin = os.path.join(base, "condabin")
if os.path.exists(c_bin):
paths.extend([c_bin, os.path.join(base, "Scripts"), base])
break
if user: paths.append(os.path.join(user, ".local", "bin"))
if appdata: paths.append(os.path.join(appdata, "uv", "bin"))
if local_app:
paths.extend([
os.path.join(local_app, "Programs", "Python", "PyManager"),
os.path.join(local_app, "Programs", "Python", "Python311", "Scripts")
])
current_path = os.environ.get("PATH", "")
for p in paths:
if p and os.path.exists(p) and p not in current_path:
current_path = f"{p};{current_path}"
os.environ["PATH"] = current_path
def repair_git_repo():
print("[*] Repairing WAN2GP repository...")
if not os.path.exists(".git"):
run_cmd("git init")
try:
subprocess.run(["git", "remote", "add", "origin", "https://github.com/deepbeepmeep/Wan2GP.git"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except:
pass
run_cmd("git fetch origin")
try:
subprocess.run(["git", "rev-parse", "--verify", "origin/main"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
default_branch = "main"
except subprocess.CalledProcessError:
default_branch = "master"
print(f"[*] Force resetting local files to match origin/{default_branch}...")
run_cmd(f"git reset --hard origin/{default_branch}")
run_cmd(f"git branch -M {default_branch}")
run_cmd(f"git branch --set-upstream-to=origin/{default_branch} {default_branch}")
if __name__ == "__main__":
inject_system_paths()
parser = argparse.ArgumentParser()
parser.add_argument("mode", choices=["install", "update", "upgrade", "status", "manage", "get_env_info"])
parser.add_argument("--env", default="venv", help="Type of env for install (venv, uv, conda, none)")
parser.add_argument("--auto", action="store_true", help="Run 1-click automatic install")
args = parser.parse_args()
cfg = load_config()
if args.mode == "get_env_info":
manager = EnvsManager()
active = manager.get_active()
if not active or not manager.list_envs().get(active):
sys.exit(1)
env_data = manager.list_envs()[active]
print(f"ENV_INFO|{env_data['type']}|{env_data['path']}")
sys.exit(0)
if args.mode == "status":
show_status()
sys.exit(0)
if args.mode == "manage":
do_manage()
sys.exit(0)
gpu_name, vendor = get_gpu_info()
profile_key = get_profile_key(gpu_name, vendor)
profile = cfg['gpu_profiles'][profile_key]
if args.mode == "install":
print(f"Hardware Detected: {gpu_name} ({vendor})")
if args.auto:
do_install_auto(args.env, cfg, profile_key)
else:
do_install_interactive(args.env, cfg, profile_key)
elif args.mode == "update":
manager = EnvsManager()
env_name = manager.resolve_target_env()
env_data = manager.list_envs()[env_name]
needs_install = False
if not os.path.exists(".git"):
print("[*] No .git folder found.")
repair_git_repo()
needs_install = True
else:
print("[*] Checking for updates...")
try:
old_head = subprocess.check_output(["git", "rev-parse", "HEAD"], encoding='utf-8', stderr=subprocess.DEVNULL).strip()
except:
old_head = ""
try:
subprocess.run(["git", "pull"], check=True)
new_head = subprocess.check_output(["git", "rev-parse", "HEAD"], encoding='utf-8', stderr=subprocess.DEVNULL).strip()
if old_head != new_head or not old_head:
needs_install = True
except subprocess.CalledProcessError:
print("\n[!] 'git pull' failed.")
print("[*] Attempting automatic recovery...")
repair_git_repo()
needs_install = True
if needs_install:
print("\n[*] Updates found. Installing/Verifying requirements...")
install_fmt = ENV_TEMPLATES[env_data['type']]['install']
pip_cmd = install_fmt.format(dir=env_data['path'])
run_cmd(f"{pip_cmd} -r requirements.txt")
else:
print("\n[*] Code is already up to date. Skipping requirements installation.")
elif args.mode == "upgrade":
do_upgrade(cfg)

Xet Storage Details

Size:
40.9 kB
·
Xet hash:
6ffeabaa1e713e6090e64a468dbac48998d35520426fed618cf0a37fc04b321f

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.