"""Vibe Praying: FastAPI app that randomly pairs people and stores the results.

Each pairing session (the submitted names and the generated pairs) is saved as
JSON text in a local SQLite database and can be listed or fetched by id.
"""

import json
import os
import random
from datetime import datetime
from typing import Any, Dict, List

from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.sql import func

# Create FastAPI app
app = FastAPI(title="Vibe Praying", version="1.0.0")

# Database setup
SQLALCHEMY_DATABASE_URL = "sqlite:///./prayer_pairs.db"
# check_same_thread=False turns off SQLite's same-thread check so a connection
# may be used from a thread other than the one that created it.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# Database models
class PairingSession(Base):
    """One pairing run: the submitted names and the pairs generated for them."""

    __tablename__ = "pairing_sessions"

    id = Column(Integer, primary_key=True, index=True)
    session_name = Column(String, index=True)
    created_at = Column(DateTime, default=func.now())
    names = Column(String)  # JSON string of names
    pairs = Column(String)  # JSON string of pairs


class Pair(Base):
    """A single pair of people belonging to a pairing session."""

    __tablename__ = "pairs"

    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(Integer, index=True)
    person1 = Column(String)
    person2 = Column(String)
    created_at = Column(DateTime, default=func.now())


# Create tables
Base.metadata.create_all(bind=engine)


# Pydantic models
class NameList(BaseModel):
    """Request body carrying the names to pair."""

    names: List[str]


class PairingResult(BaseModel):
    """Shape of a pairing result: session name, names, pairs and timestamp."""

    session_name: str
    names: List[str]
    pairs: List[Dict[str, str]]
    created_at: datetime


# Templates and static files
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")


def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def create_pairs(names: List[str]) -> List[Dict[str, str]]:
    """Create random pairs so that every person is paired with exactly 2 others.

    The unique names are shuffled and arranged in a ring, and each person is
    paired with the next person around the ring. A ring of 3 or more people
    gives everyone exactly two distinct partners and produces exactly as many
    pairs as there are people, so the result never falls short and nobody
    ends up with more than two partners.

    Args:
        names: Names to pair. Duplicate names are ignored (the first
            occurrence is kept); the caller's list is not modified.

    Returns:
        A list of ``{"person1": ..., "person2": ...}`` dicts, one per pair,
        in random order.

    Raises:
        ValueError: If fewer than 3 unique names are supplied.
    """
    # De-duplicate while preserving order; a repeated name would otherwise
    # be paired with itself.
    ring = list(dict.fromkeys(names))
    if len(ring) < 3:
        raise ValueError("Need at least 3 names to create pairs")

    random.shuffle(ring)
    # Rotating the ring by one lines every person up with their neighbour;
    # the last person wraps around to the first.
    neighbours = ring[1:] + ring[:1]
    pairs = [
        {"person1": person1, "person2": person2}
        for person1, person2 in zip(ring, neighbours)
    ]
    # Shuffle the output so consecutive entries do not reveal the ring order.
    random.shuffle(pairs)
    return pairs


def _serialize_session(session: PairingSession) -> Dict[str, Any]:
    """Convert a stored PairingSession row into its JSON-ready response dict."""
    return {
        "id": session.id,
        "session_name": session.session_name,
        "names": json.loads(session.names),
        "pairs": json.loads(session.pairs),
        "created_at": session.created_at,
    }


# NOTE(review): the handlers below are ``async def`` but use the synchronous
# SQLAlchemy session, so database calls run on the event loop. They are kept
# as coroutines to preserve their existing interface.


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Serve the main page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/create-pairs")
async def create_pairs_endpoint(
    names: NameList, session_name: str = "Default Session"
):
    """Create pairs from a list of names and store the session.

    Args:
        names: Request body with the names to pair.
        session_name: Label stored with the session.

    Returns:
        A dict with the new session id, the session name, the cleaned names,
        the generated pairs and the creation timestamp.

    Raises:
        HTTPException: 400 if fewer than 3 names are submitted, or fewer than
            3 remain after stripping whitespace and removing blanks and
            duplicates.
    """
    if len(names.names) < 3:
        raise HTTPException(
            status_code=400, detail="Need at least 3 names to create pairs"
        )

    # Remove duplicates and empty names; dict.fromkeys keeps the order in
    # which the names were submitted, so the stored list is deterministic.
    clean_names = list(
        dict.fromkeys(name.strip() for name in names.names if name.strip())
    )

    if len(clean_names) < 3:
        raise HTTPException(
            status_code=400, detail="Need at least 3 valid names after cleaning"
        )

    # Create pairs
    pairs = create_pairs(clean_names)

    # Save to database
    db = SessionLocal()
    try:
        session = PairingSession(
            session_name=session_name,
            names=json.dumps(clean_names),
            pairs=json.dumps(pairs),
        )
        db.add(session)
        db.commit()
        # Reload the row so the generated id and created_at are populated.
        db.refresh(session)

        return {
            "session_id": session.id,
            "session_name": session_name,
            "names": clean_names,
            "pairs": pairs,
            "created_at": session.created_at,
        }
    finally:
        db.close()


@app.get("/sessions")
async def get_sessions():
    """Get all pairing sessions, newest first."""
    db = SessionLocal()
    try:
        sessions = (
            db.query(PairingSession)
            .order_by(PairingSession.created_at.desc())
            .all()
        )
        return [_serialize_session(session) for session in sessions]
    finally:
        db.close()


@app.get("/session/{session_id}")
async def get_session(session_id: int):
    """Get a specific pairing session.

    Raises:
        HTTPException: 404 if no session with ``session_id`` exists.
    """
    db = SessionLocal()
    try:
        session = (
            db.query(PairingSession)
            .filter(PairingSession.id == session_id)
            .first()
        )
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")
        return _serialize_session(session)
    finally:
        db.close()


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)