# vibe-praying/app.py
#
# 245 lines
# 8.6 KiB
# Python

from fastapi import FastAPI, Request, Form, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.sql import func
from pydantic import BaseModel
import random
from typing import List, Dict, Any
from datetime import datetime
import os
# Create FastAPI app
app = FastAPI(title="Vibe Praying", version="1.0.0")
# Database setup
# SQLite database kept in the file prayer_pairs.db, addressed by a relative path.
SQLALCHEMY_DATABASE_URL = "sqlite:///./prayer_pairs.db"
# check_same_thread=False is handed to the SQLite driver; per the sqlite3 docs it
# lets a connection be used from a thread other than the one that created it.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the engine; sessions it produces have autocommit and
# autoflush switched off, so callers commit explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()
# Database models
class PairingSession(Base):
    """ORM model for one saved pairing run (table ``pairing_sessions``).

    The list of names and the generated pairs are each kept in a single
    string column holding JSON text.
    """

    __tablename__ = "pairing_sessions"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Label for the run, indexed.
    session_name = Column(String, index=True)
    # Defaults to the SQL now() function when no value is supplied.
    created_at = Column(DateTime, default=func.now())
    names = Column(String)  # JSON string of names
    pairs = Column(String)  # JSON string of pairs
class Pair(Base):
    """ORM model for a single pair of people (table ``pairs``).

    NOTE(review): nothing in the visible part of this module reads or writes
    this table; confirm whether it is still needed.
    """

    __tablename__ = "pairs"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Plain indexed integer; no foreign-key constraint is declared here.
    # Presumably refers to PairingSession.id - verify against callers.
    session_id = Column(Integer, index=True)
    # The two members of the pair.
    person1 = Column(String)
    person2 = Column(String)
    # Defaults to the SQL now() function when no value is supplied.
    created_at = Column(DateTime, default=func.now())
# Create tables
# Runs as a module-level statement, i.e. whenever this module is imported.
# create_all emits CREATE TABLE for the tables registered on Base.metadata;
# by default SQLAlchemy skips tables that already exist.
Base.metadata.create_all(bind=engine)
# Pydantic models
class NameList(BaseModel):
    """Pydantic model with a single ``names`` field, a list of strings.

    Used as the type of the ``names`` parameter of the ``/create-pairs``
    endpoint.
    """

    names: List[str]
class PairingResult(BaseModel):
    """Pydantic model describing a pairing result.

    NOTE(review): not referenced by any endpoint in the visible part of this
    module; confirm whether it is meant to be a response model.
    """

    # Label of the pairing session.
    session_name: str
    # Names that took part.
    names: List[str]
    # One string-to-string dict per pair.
    pairs: List[Dict[str, str]]
    # Timestamp of the session.
    created_at: datetime
# Templates and static files
# Jinja2 templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
# Files in the "static" directory are exposed under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
def get_db():
    """Yield a new database session and close it once the caller is done.

    Written as a generator: the session is handed out at the ``yield`` and
    ``close()`` runs afterwards, even if the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def create_pairs(names: List[str]) -> List[Dict[str, str]]:
    """Create pairs where each person is paired with exactly 2 others randomly.

    The distinct names are shuffled and arranged in a ring; every person is
    paired with the person on either side of them. With at least three
    people this always yields exactly two different partners per person and
    never repeats a pair, so no retry or fallback pass is needed.

    Args:
        names: Participant names. Repeated names are collapsed to a single
            entry so that nobody can end up paired with themselves.

    Returns:
        One ``{"person1": ..., "person2": ...}`` dict per pair. The list has
        as many pairs as there are distinct names, and every name appears in
        exactly two of them.

    Raises:
        ValueError: If fewer than 3 distinct names are supplied.
    """
    # dict.fromkeys drops repeats while keeping first-seen order; the result
    # is a fresh list, so the caller's list is never mutated by the shuffle.
    people = list(dict.fromkeys(names))
    if len(people) < 3:
        raise ValueError("Need at least 3 names to create pairs")

    random.shuffle(people)

    # Link each person to the next one in the shuffled order; the modulo
    # wraps the last person back to the first, closing the ring.
    ring_size = len(people)
    return [
        {"person1": person, "person2": people[(position + 1) % ring_size]}
        for position, person in enumerate(people)
    ]
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render ``index.html`` as the response for the root URL."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/create-pairs")
async def create_pairs_endpoint(names: NameList, session_name: str = "Default Session"):
    """Create pairs from a list of names and store the result.

    Args:
        names: Payload whose ``names`` field holds the submitted names.
        session_name: Label stored with the run.

    Returns:
        A dict with the new row's ``session_id``, the ``session_name``, the
        cleaned ``names``, the generated ``pairs`` and ``created_at``.

    Raises:
        HTTPException: Status 400 when fewer than 3 names are submitted, or
            fewer than 3 remain after cleaning.
    """
    import json

    if len(names.names) < 3:
        raise HTTPException(status_code=400, detail="Need at least 3 names to create pairs")

    # Strip whitespace, drop blanks, then remove duplicates. dict.fromkeys
    # keeps the first occurrence of each name in submission order, so the
    # stored and returned list is stable instead of following set ordering.
    stripped = (name.strip() for name in names.names)
    clean_names = list(dict.fromkeys(name for name in stripped if name))
    if len(clean_names) < 3:
        raise HTTPException(status_code=400, detail="Need at least 3 valid names after cleaning")

    # Create pairs
    pairs = create_pairs(clean_names)

    # Save to database
    db = SessionLocal()
    try:
        session = PairingSession(
            session_name=session_name,
            names=json.dumps(clean_names),
            pairs=json.dumps(pairs),
        )
        db.add(session)
        db.commit()
        # Reload the row so generated values (id, created_at) are populated.
        db.refresh(session)
        return {
            "session_id": session.id,
            "session_name": session_name,
            "names": clean_names,
            "pairs": pairs,
            "created_at": session.created_at,
        }
    finally:
        db.close()
@app.get("/sessions")
async def get_sessions():
    """Return every stored pairing session, most recently created first.

    Each entry carries the row's id, session name and creation time, with
    the ``names`` and ``pairs`` columns decoded from their JSON text.
    """
    import json

    db = SessionLocal()
    try:
        newest_first = PairingSession.created_at.desc()
        rows = db.query(PairingSession).order_by(newest_first).all()

        listing = []
        for row in rows:
            listing.append(
                {
                    "id": row.id,
                    "session_name": row.session_name,
                    "names": json.loads(row.names),
                    "pairs": json.loads(row.pairs),
                    "created_at": row.created_at,
                }
            )
        return listing
    finally:
        db.close()
@app.get("/session/{session_id}")
async def get_session(session_id: int):
    """Return one pairing session looked up by its id.

    Raises:
        HTTPException: Status 404 when no row has the given id.
    """
    import json

    db = SessionLocal()
    try:
        lookup = db.query(PairingSession).filter(PairingSession.id == session_id)
        record = lookup.first()
        if record:
            # names and pairs are stored as JSON text; decode before returning.
            return {
                "id": record.id,
                "session_name": record.session_name,
                "names": json.loads(record.names),
                "pairs": json.loads(record.pairs),
                "created_at": record.created_at,
            }
        raise HTTPException(status_code=404, detail="Session not found")
    finally:
        db.close()
if __name__ == "__main__":
    # Only executed when the file is run as a script: serve the app with
    # uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)