Files
basegame-vcko/server.py
2026-05-02 23:25:02 -07:00

626 lines
24 KiB
Python

#!/usr/bin/env python3
"""
FastAPI server for VCK Online - Development/testing server
Simple REST API to replace the socket-based protocol
"""
import re
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
import shortuuid
from game import Game, LobbyMember, GameMember, load_game_data, GameObjectEncoder
import json
_REPO_ROOT = Path(__file__).resolve().parent
_DEV_CLIENT_INDEX = _REPO_ROOT / "static" / "dev-client" / "index.html"
_GAME_CLIENT_INDEX = _REPO_ROOT / "static" / "game" / "index.html"
# Card image directories — keyed by the singular type name used in filenames
_CARD_IMAGE_DIRS: Dict[str, Path] = {
"monster": _REPO_ROOT / "images" / "monsters",
"citizen": _REPO_ROOT / "images" / "citizens",
"domain": _REPO_ROOT / "images" / "domains",
"duke": _REPO_ROOT / "images" / "dukes",
"starter": _REPO_ROOT / "images" / "starters",
}
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp"}
app = FastAPI(title="VCK Online API", description="Development server for Valeria Card Kingdoms Online")
class ConnectionManager:
def __init__(self):
self._conns: Dict[str, List[tuple]] = {} # game_id -> [(ws, player_id)]
async def connect(self, game_id: str, websocket: WebSocket, player_id: Optional[str] = None):
await websocket.accept()
self._conns.setdefault(game_id, []).append((websocket, player_id))
def disconnect(self, game_id: str, websocket: WebSocket):
self._conns[game_id] = [
(ws, pid) for ws, pid in self._conns.get(game_id, []) if ws is not websocket
]
async def broadcast(self, game_id: str, game):
conns = list(self._conns.get(game_id, []))
dead = []
for ws, pid in conns:
try:
await ws.send_json({"type": "state", "state": _serialize_game_for_player(game, pid)})
except Exception:
dead.append(ws)
if dead:
self._conns[game_id] = [(ws, pid) for ws, pid in conns if ws not in dead]
manager = ConnectionManager()
class LobbyWsManager:
"""Push lobby snapshots to subscribed browsers (personalized by optional player_id)."""
def __init__(self):
self._connections = {} # WebSocket -> Optional[player_id]
async def connect(self, websocket: WebSocket):
await websocket.accept()
self._connections[websocket] = None
def disconnect(self, websocket: WebSocket):
self._connections.pop(websocket, None)
def identify(self, websocket: WebSocket, player_id: Optional[str]):
if websocket in self._connections:
self._connections[websocket] = player_id or None
async def send_snapshot(self, websocket: WebSocket):
pid = self._connections.get(websocket)
try:
payload = build_lobby_status_dict(pid)
await websocket.send_json({"type": "lobby_status", **payload})
except Exception:
self.disconnect(websocket)
async def broadcast_lobby(self):
dead = []
for ws in list(self._connections.keys()):
pid = self._connections.get(ws)
try:
payload = build_lobby_status_dict(pid)
await ws.send_json({"type": "lobby_status", **payload})
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
async def broadcast_game_started(self, game_id: str, player_ids: List[str]):
dead = []
msg = {"type": "game_started", "game_id": game_id, "player_ids": list(player_ids)}
for ws in list(self._connections.keys()):
try:
await ws.send_json(msg)
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
lobby_ws_manager = LobbyWsManager()
def build_lobby_status_dict(player_id: Optional[str] = None):
"""Lobby list + active game count + optional in_game/game_id for this player."""
current_time = time.time()
global lobby
lobby = [p for p in lobby if current_time - p.last_active_time <= 60]
lobby_data = []
for member in lobby:
lobby_data.append({
"player_id": member.player_id,
"name": member.name,
"is_ready": member.is_ready,
"debug_starting_resources": bool(getattr(member, "debug_starting_resources", False)),
})
response = {
"lobby": lobby_data,
"game_count": sum(1 for g in games.values() if getattr(g, "phase", None) != "game_over"),
}
if player_id:
for gamer in gamers:
if gamer.player_id == player_id:
response["in_game"] = True
response["game_id"] = gamer.game_id
return response
response["in_game"] = False
return response
# CORS middleware for web client
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# In-memory storage (simple for dev)
lobby: List[LobbyMember] = []
games: Dict[str, Game] = {}
gamers: List[GameMember] = []
# Request/Response models
class JoinLobbyRequest(BaseModel):
name: str
class RenameRequest(BaseModel):
player_id: str
name: str
class ReadyRequest(BaseModel):
player_id: str
debug_starting_resources: bool = False
class ResourcePayment(BaseModel):
"""How much gold / strength / magic the client spends on an action (validated server-side)."""
gold: int = 0
strength: int = 0
magic: int = 0
class GameActionRequest(BaseModel):
player_id: str
action_type: str # "hire_citizen", "build_domain", "slay_monster", "take_resource", "act_on_required_action", "submit_concurrent_action"
# Action parameters (varies by action type)
citizen_id: Optional[int] = None
domain_id: Optional[int] = None
monster_id: Optional[int] = None
# take_resource: "gold" | "strength" | "magic"
resource: Optional[str] = None
gold_cost: Optional[int] = None
strength_cost: Optional[int] = None
magic_cost: Optional[int] = None
# Preferred: explicit payment split (gold/strength/magic). If set, overrides legacy *_cost fields for that action.
payment: Optional[ResourcePayment] = None
action: Optional[str] = None # For act_on_required_action
harvest_slot_key: Optional[str] = None # harvest_card: e.g. "citizen:3:0"
# submit_concurrent_action: which non-ordered prompt this responds to,
# plus the opaque per-player payload (string; handler decides how to parse).
kind: Optional[str] = None
response: Optional[str] = None
# finalize_roll: optional override dice values (1-6). If omitted, server finalizes using the rolled dice.
die_one: Optional[int] = None
die_two: Optional[int] = None
def _rollback_consumed_action(game):
game.actions_remaining = int(getattr(game, "actions_remaining", 0)) + 1
game.tick_id = int(getattr(game, "tick_id", 0)) - 1
def resolve_action_payment(req: GameActionRequest):
if req.payment is not None:
return int(req.payment.gold or 0), int(req.payment.strength or 0), int(req.payment.magic or 0)
if req.action_type == "slay_monster":
return 0, int(req.strength_cost or 0), int(req.magic_cost or 0)
if req.action_type == "hire_citizen":
return int(req.gold_cost or 0), 0, int(req.magic_cost or 0)
if req.action_type == "build_domain":
return int(req.gold_cost or 0), 0, int(req.magic_cost or 0)
return int(req.gold_cost or 0), int(req.strength_cost or 0), int(req.magic_cost or 0)
# Lobby endpoints
@app.post("/api/lobby/join")
async def join_lobby(request: JoinLobbyRequest):
"""Join the lobby with a player name"""
player_id = str(shortuuid.uuid())
player = LobbyMember(request.name, player_id)
player.last_active_time = time.time()
lobby.append(player)
await lobby_ws_manager.broadcast_lobby()
return {"player_id": player_id, "message": "Joined lobby"}
@app.post("/api/lobby/rename")
async def rename_player(request: RenameRequest):
"""Rename a player in the lobby"""
for player in lobby:
if player.player_id == request.player_id:
player.name = request.name
player.last_active_time = time.time()
await lobby_ws_manager.broadcast_lobby()
return {"message": "Player renamed"}
raise HTTPException(status_code=404, detail="Player not found in lobby")
@app.post("/api/lobby/leave")
async def leave_lobby(player_id: str):
"""Leave the lobby"""
global lobby
lobby = [p for p in lobby if p.player_id != player_id]
await lobby_ws_manager.broadcast_lobby()
return {"message": "Left lobby"}
@app.post("/api/lobby/ready")
async def set_ready(request: ReadyRequest):
"""Mark player as ready"""
for player in lobby:
if player.player_id == request.player_id:
player.is_ready = True
player.debug_starting_resources = bool(request.debug_starting_resources)
player.last_active_time = time.time()
# Check if all players are ready
ready_count = sum(1 for p in lobby if p.is_ready)
if ready_count == len(lobby) and len(lobby) >= 2:
# Start game
new_game_id = str(uuid.uuid4())
players_to_remove = []
for p in lobby:
if p.is_ready:
gamer = GameMember(p.player_id, p.name, new_game_id)
gamers.append(gamer)
players_to_remove.append(p)
debug_starting_resources = any(bool(getattr(p, "debug_starting_resources", False)) for p in players_to_remove)
# Remove ready players from lobby
for p in players_to_remove:
lobby.remove(p)
# Create game
try:
# Get only the gamers for this new game
game_gamers = [g for g in gamers if g.game_id == new_game_id]
game_state = load_game_data(
new_game_id,
"base1",
game_gamers,
debug_starting_resources=debug_starting_resources,
)
new_game = Game(game_state)
new_game.last_active_time = time.time()
# Auto-run the start-of-game roll/harvest so the first state is actionable.
while new_game.advance_tick():
if getattr(new_game, "phase", None) == "action":
break
games[new_game_id] = new_game
pid_list = [g.player_id for g in game_gamers]
await lobby_ws_manager.broadcast_game_started(new_game_id, pid_list)
await lobby_ws_manager.broadcast_lobby()
return {
"message": "Game started",
"game_id": new_game_id,
"players": [{"player_id": g.player_id, "name": g.name} for g in game_gamers]
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create game: {str(e)}")
await lobby_ws_manager.broadcast_lobby()
return {"message": "Player ready", "all_ready": ready_count == len(lobby)}
raise HTTPException(status_code=404, detail="Player not found in lobby")
@app.post("/api/lobby/unready")
async def set_unready(request: ReadyRequest):
"""Mark player as not ready"""
for player in lobby:
if player.player_id == request.player_id:
player.is_ready = False
player.last_active_time = time.time()
await lobby_ws_manager.broadcast_lobby()
return {"message": "Player unready"}
raise HTTPException(status_code=404, detail="Player not found in lobby")
@app.get("/api/lobby/status")
async def get_lobby_status(player_id: Optional[str] = None):
"""Get lobby status. If player_id provided, also check if player is in a game."""
return build_lobby_status_dict(player_id)
@app.websocket("/ws/lobby")
async def ws_lobby(websocket: WebSocket):
await lobby_ws_manager.connect(websocket)
await lobby_ws_manager.send_snapshot(websocket)
try:
while True:
raw = await websocket.receive_text()
try:
data = json.loads(raw)
except json.JSONDecodeError:
continue
if data.get("type") == "identify":
lobby_ws_manager.identify(websocket, data.get("player_id"))
await lobby_ws_manager.send_snapshot(websocket)
except WebSocketDisconnect:
pass
finally:
lobby_ws_manager.disconnect(websocket)
# Game endpoints
def _serialize_game_for_player(game, viewer_player_id: Optional[str]):
"""
Serialize the game state for a given viewer.
Security/hidden-info rule:
- Dukes are hidden information. Only the viewing player should see their own duke.
"""
game_json = json.dumps(game, cls=GameObjectEncoder, indent=2)
state = json.loads(game_json)
players = state.get("player_list") or []
if not isinstance(players, list):
return state
for p in players:
if not isinstance(p, dict):
continue
pid = p.get("player_id")
if viewer_player_id is None or str(pid) != str(viewer_player_id):
# Hide opponent (and spectator) dukes.
p["owned_dukes"] = []
return state
@app.get("/api/game/{game_id}/state")
async def get_game_state(game_id: str, player_id: Optional[str] = None):
"""Get the current game state"""
game = games.get(game_id)
if not game:
raise HTTPException(status_code=404, detail="Game not found")
game.last_active_time = time.time()
# Ensure the beginning-of-turn roll/harvest are automatic (including the very first fetch).
while getattr(game, "phase", None) in ("roll", "harvest"):
if not game.advance_tick():
break
if getattr(game, "phase", None) == "action":
break
return _serialize_game_for_player(game, player_id)
@app.post("/api/game/{game_id}/action")
async def perform_game_action(game_id: str, request: GameActionRequest):
"""Perform a game action (hire citizen, build domain, slay monster, etc.)"""
game = games.get(game_id)
if not game:
raise HTTPException(status_code=404, detail="Game not found")
game.last_active_time = time.time()
try:
if request.action_type == "hire_citizen":
if request.citizen_id is None:
raise HTTPException(status_code=400, detail="citizen_id required")
if request.payment is None and request.gold_cost is None and request.magic_cost is None:
raise HTTPException(status_code=400, detail="payment or gold_cost/magic_cost required")
if not game.consume_player_action(request.player_id):
raise HTTPException(status_code=400, detail="Not your turn (or no actions remaining)")
g, s, m = resolve_action_payment(request)
try:
game.hire_citizen(request.player_id, request.citizen_id, g, m, s)
except ValueError as e:
_rollback_consumed_action(game)
raise HTTPException(status_code=400, detail=str(e))
except Exception:
_rollback_consumed_action(game)
raise
game.finish_turn_if_no_actions_remaining()
elif request.action_type == "build_domain":
if request.domain_id is None:
raise HTTPException(status_code=400, detail="domain_id required")
if request.payment is None and request.gold_cost is None and request.magic_cost is None:
raise HTTPException(status_code=400, detail="payment or gold_cost/magic_cost required")
if not game.consume_player_action(request.player_id):
raise HTTPException(status_code=400, detail="Not your turn (or no actions remaining)")
g, s, m = resolve_action_payment(request)
try:
game.build_domain(request.player_id, request.domain_id, g, m, s)
except ValueError as e:
_rollback_consumed_action(game)
raise HTTPException(status_code=400, detail=str(e))
except Exception:
_rollback_consumed_action(game)
raise
game.finish_turn_if_no_actions_remaining()
elif request.action_type == "slay_monster":
if request.monster_id is None:
raise HTTPException(status_code=400, detail="monster_id required")
if request.payment is None and request.strength_cost is None and request.magic_cost is None:
raise HTTPException(status_code=400, detail="payment or strength_cost/magic_cost required")
if not game.consume_player_action(request.player_id):
raise HTTPException(status_code=400, detail="Not your turn (or no actions remaining)")
g, s, m = resolve_action_payment(request)
try:
game.slay_monster(request.player_id, request.monster_id, s, m, g)
except ValueError as e:
_rollback_consumed_action(game)
raise HTTPException(status_code=400, detail=str(e))
except Exception:
_rollback_consumed_action(game)
raise
game.finish_turn_if_no_actions_remaining()
elif request.action_type == "take_resource":
if request.resource is None or not str(request.resource).strip():
raise HTTPException(status_code=400, detail='resource required ("gold", "strength", or "magic")')
r = str(request.resource).strip().lower()
if r not in ("gold", "strength", "magic"):
raise HTTPException(status_code=400, detail='resource must be "gold", "strength", or "magic"')
if not game.consume_player_action(request.player_id):
raise HTTPException(status_code=400, detail="Not your turn (or no actions remaining)")
try:
game.take_resource(request.player_id, r)
except ValueError as e:
_rollback_consumed_action(game)
raise HTTPException(status_code=400, detail=str(e))
except Exception:
_rollback_consumed_action(game)
raise
game.finish_turn_if_no_actions_remaining()
elif request.action_type == "act_on_required_action":
if request.action is None:
raise HTTPException(status_code=400, detail="action required")
game.act_on_required_action(request.player_id, request.action)
# resolving a required action may unblock the engine
game.advance_tick()
elif request.action_type == "submit_concurrent_action":
if request.response is None:
raise HTTPException(status_code=400, detail="response required")
try:
game.submit_concurrent_action(
request.player_id,
request.response,
kind=request.kind,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
elif request.action_type == "finalize_roll":
try:
game.finalize_roll(request.player_id, die_one=request.die_one, die_two=request.die_two)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# After finalizing, auto-advance harvest as far as possible.
while getattr(game, "phase", None) == "harvest":
if not game.advance_tick():
break
if getattr(game, "phase", None) == "action":
break
elif request.action_type == "roll_phase":
raise HTTPException(status_code=400, detail="roll_phase is automatic; reserved for future reroll effects")
elif request.action_type == "harvest_phase":
raise HTTPException(status_code=400, detail="harvest_phase is automatic")
elif request.action_type == "harvest_card":
if not request.harvest_slot_key or not str(request.harvest_slot_key).strip():
raise HTTPException(status_code=400, detail="harvest_slot_key required")
try:
game.harvest_card(request.player_id, str(request.harvest_slot_key).strip())
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
elif request.action_type == "play_turn":
# Advance through roll+harvest, then leave at action phase
# (player actions will drive action ticks)
while game.advance_tick():
if game.phase == "action":
break
else:
raise HTTPException(status_code=400, detail=f"Unknown action type: {request.action_type}")
# Push updated state to all WebSocket subscribers for this game.
await manager.broadcast(game_id, game)
return {"message": "Action performed", "game_state": _serialize_game_for_player(game, request.player_id)}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Action failed: {str(e)}")
# Cleanup inactive games (runs periodically)
@app.on_event("startup")
async def startup_event():
"""Cleanup task for inactive games"""
import asyncio
async def cleanup():
while True:
await asyncio.sleep(30) # Check every 30 seconds
current_time = time.time()
inactive_games = [
game_id for game_id, game in games.items()
if current_time - game.last_active_time > 180
]
for game_id in inactive_games:
del games[game_id]
# Remove gamers from this game
global gamers
gamers = [g for g in gamers if g.game_id != game_id]
asyncio.create_task(cleanup())
# ── Card image lookup ────────────────────────────────────────────────────────
@app.get("/card-image/{card_type}/{card_id}")
async def card_image(card_type: str, card_id: int):
"""Return the card image matched by type + numeric ID prefix."""
dir_path = _CARD_IMAGE_DIRS.get(card_type)
if not dir_path or not dir_path.exists():
raise HTTPException(status_code=404, detail="Unknown card type")
prefix = f"{card_type}_{card_id:02d}_"
for f in sorted(dir_path.iterdir()):
if f.name.startswith(prefix) and f.suffix.lower() in _IMAGE_EXTS:
return FileResponse(str(f), media_type="image/jpeg")
raise HTTPException(status_code=404, detail="Image not found")
# Serve static files and simple HTML client
try:
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/images", StaticFiles(directory=str(_REPO_ROOT / "images")), name="images")
except Exception:
pass # static / images directory might not exist
@app.websocket("/ws/game/{game_id}")
async def ws_game(websocket: WebSocket, game_id: str, player_id: Optional[str] = None):
game = games.get(game_id)
if not game:
await websocket.accept()
await websocket.send_json({"type": "error", "code": 4004, "message": "Game not found"})
await websocket.close(code=4004)
return
await manager.connect(game_id, websocket, player_id)
try:
await websocket.send_json({"type": "state", "state": _serialize_game_for_player(game, player_id)})
while True:
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(game_id, websocket)
@app.get("/")
async def game_client():
return FileResponse(_GAME_CLIENT_INDEX, media_type="text/html")
@app.get("/debug")
async def debug_client():
return FileResponse(_DEV_CLIENT_INDEX, media_type="text/html")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)