245 lines
8.6 KiB
Python
245 lines
8.6 KiB
Python
from fastapi import FastAPI, Request, Form, HTTPException
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import create_engine, Column, Integer, String, DateTime
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from sqlalchemy.sql import func
|
|
from pydantic import BaseModel
|
|
import random
|
|
from typing import List, Dict, Any
|
|
from datetime import datetime
|
|
import os
|
|
|
|
# The ASGI application object; the route decorators further down register on it.
app = FastAPI(
    title="Vibe Praying",
    version="1.0.0",
)
|
|
|
|
# Database setup: a SQLite file named prayer_pairs.db, addressed by a relative path.
SQLALCHEMY_DATABASE_URL = "sqlite:///./prayer_pairs.db"

# check_same_thread=False is handed straight to the sqlite3 driver; it turns off
# the driver's check that a connection is only used by the thread that made it.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine above; callers commit explicitly.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)

# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
|
|
|
|
# Database models
|
|
class PairingSession(Base):
    """ORM model for one saved pairing run, stored in ``pairing_sessions``."""

    __tablename__ = "pairing_sessions"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)

    # Label supplied when the pairs were generated; indexed.
    session_name = Column(String, index=True)

    # Filled with the database's now() when no value is given on insert.
    created_at = Column(DateTime, default=func.now())

    # JSON string of names
    names = Column(String)

    # JSON string of pairs
    pairs = Column(String)
|
|
|
|
class Pair(Base):
    """ORM model for a single two-person pairing, stored in ``pairs``."""

    __tablename__ = "pairs"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)

    # Plain indexed integer; no ForeignKey constraint is declared on it.
    # NOTE(review): presumably refers to PairingSession.id - confirm.
    session_id = Column(Integer, index=True)

    # The two members of the pair.
    person1 = Column(String)
    person2 = Column(String)

    # Filled with the database's now() when no value is given on insert.
    created_at = Column(DateTime, default=func.now())
|
|
|
|
# Runs at import time: emit CREATE TABLE for every model registered on Base.
Base.metadata.create_all(engine)
|
|
|
|
# Pydantic models
|
|
# Request body for POST /create-pairs: the raw list of participant names.
class NameList(BaseModel):
    # Names exactly as submitted; the endpoint strips and de-duplicates them.
    names: List[str]
|
|
|
|
# Shape of a pairing result: session label, participants, generated pairs and
# the creation timestamp.
class PairingResult(BaseModel):
    # Label of the pairing session.
    session_name: str

    # Participant names.
    names: List[str]

    # One string-to-string mapping per pair.
    pairs: List[Dict[str, str]]

    # Creation timestamp of the session.
    created_at: datetime
|
|
|
|
# HTML templates are looked up in the relative "templates" directory.
templates = Jinja2Templates(directory="templates")

# Files in the relative "static" directory are served under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
|
|
|
|
def get_db():
    """Yield a fresh database session and close it once the consumer is done.

    Written as a generator so that the ``finally`` clause closes the session
    even when the code using it raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
def create_pairs(names: List[str]) -> List[Dict[str, str]]:
    """Create random pairs where each person is paired with exactly 2 others.

    The distinct names are shuffled into a random ring and every person is
    paired with the next person on the ring, the last one wrapping around to
    the first. Each person therefore has exactly two partners (the ring
    neighbours on either side), no pair repeats, and nobody is paired with
    themselves. Unlike a greedy pick from all possible pairs, this can never
    dead-end with someone left without available partners.

    Args:
        names: People to pair. Repeated names are collapsed to a single entry.

    Returns:
        A list of dicts with "person1" and "person2" keys, one per pair. The
        list has as many pairs as there are distinct names.

    Raises:
        ValueError: If fewer than 3 distinct names are supplied.
    """
    # dict.fromkeys drops repeats while keeping first-seen order, so a
    # duplicated name can neither be paired with itself nor counted twice.
    ring = list(dict.fromkeys(names))
    if len(ring) < 3:
        raise ValueError("Need at least 3 names to create pairs")

    random.shuffle(ring)

    # Rotate the ring by one position so zip() lines every person up with
    # their successor; the final element is matched with the first.
    successors = ring[1:] + ring[:1]
    return [
        {"person1": person1, "person2": person2}
        for person1, person2 in zip(ring, successors)
    ]
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the ``index.html`` template as the main page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
@app.post("/create-pairs")
async def create_pairs_endpoint(names: NameList, session_name: str = "Default Session"):
    """Create pairs from a list of names and store the result.

    Args:
        names: Request body carrying the submitted names.
        session_name: Label stored with the result.

    Returns:
        A dict with the new row's ``session_id``, the ``session_name``, the
        cleaned ``names``, the generated ``pairs`` and the row's
        ``created_at`` value.

    Raises:
        HTTPException: 400 when fewer than 3 names are submitted, or fewer
            than 3 remain after blank entries and duplicates are removed.
    """
    # Kept function-scoped, as in the original block.
    import json

    if len(names.names) < 3:
        raise HTTPException(status_code=400, detail="Need at least 3 names to create pairs")

    # Strip whitespace, drop blanks, and de-duplicate. dict.fromkeys keeps the
    # order in which names were submitted; a set would return them in an
    # arbitrary order that can differ from one process to the next.
    stripped = (name.strip() for name in names.names)
    clean_names = list(dict.fromkeys(name for name in stripped if name))

    if len(clean_names) < 3:
        raise HTTPException(status_code=400, detail="Need at least 3 valid names after cleaning")

    pairs = create_pairs(clean_names)

    # Save to database; the session is closed whether or not the commit succeeds.
    db = SessionLocal()
    try:
        session = PairingSession(
            session_name=session_name,
            names=json.dumps(clean_names),
            pairs=json.dumps(pairs),
        )
        db.add(session)
        db.commit()
        # Reload the row so the generated id and created_at are populated.
        db.refresh(session)

        return {
            "session_id": session.id,
            "session_name": session_name,
            "names": clean_names,
            "pairs": pairs,
            "created_at": session.created_at,
        }
    finally:
        db.close()
|
|
|
|
@app.get("/sessions")
async def get_sessions():
    """Return every stored pairing session, most recently created first.

    The ``names`` and ``pairs`` columns are decoded from their JSON strings
    before being returned.
    """
    import json

    db = SessionLocal()
    try:
        newest_first = PairingSession.created_at.desc()
        rows = db.query(PairingSession).order_by(newest_first).all()

        listing = []
        for row in rows:
            listing.append(
                {
                    "id": row.id,
                    "session_name": row.session_name,
                    "names": json.loads(row.names),
                    "pairs": json.loads(row.pairs),
                    "created_at": row.created_at,
                }
            )
        return listing
    finally:
        db.close()
|
|
|
|
@app.get("/session/{session_id}")
async def get_session(session_id: int):
    """Return one pairing session looked up by its id.

    Raises:
        HTTPException: 404 when no row has the requested id.
    """
    import json

    db = SessionLocal()
    try:
        lookup = db.query(PairingSession).filter(PairingSession.id == session_id)
        record = lookup.first()
        if record is None:
            raise HTTPException(status_code=404, detail="Session not found")

        # names and pairs are stored as JSON strings; decode them for the response.
        decoded_names = json.loads(record.names)
        decoded_pairs = json.loads(record.pairs)
        return {
            "id": record.id,
            "session_name": record.session_name,
            "names": decoded_names,
            "pairs": decoded_pairs,
            "created_at": record.created_at,
        }
    finally:
        db.close()
|
|
|
|
if __name__ == "__main__":
    # Only when executed as a script: serve the app with uvicorn on port 8000,
    # listening on all interfaces.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )