Created
August 14, 2025 02:32
-
-
Save SiddharthShyniben/99b9c0e6e12b2fe32eea6dc2ed8414a9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# lab_blackjack.py
# Multiplayer "no-dealer" Blackjack over a shared E: drive.
# Everyone runs the same script; state is coordinated via E:\<session_id>.json
# [Tech: shared-file state machine, cooperative polling, Windows file lock, Fisher–Yates shuffle,
# blackjack evaluator (soft/hard aces), turn arbitration]
import json, os, sys, time, random, tempfile, getpass | |
from datetime import datetime | |
from typing import List, Dict, Any | |
# ==== CONFIG ====
# Shared drive root. Change if your lab maps E: differently.
# Written as an escaped (non-raw) literal so the value is exactly `E:\`;
# a raw string cannot end in a single backslash.
BASE_DIR = "E:\\"
POLL_INTERVAL = 0.7  # seconds between refreshes
DECKS = 6  # shoe size; can be large for "infinite" players feel
MAX_SPLIT_HANDS = 4  # per player
LOCK_FILENAME_SUFFIX = ".lock"
# ==== Utilities ====
def now_iso():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def session_paths(session_id: str):
    """Map a session id to its ``(json_path, lock_path)`` pair under BASE_DIR.

    Every character other than letters, digits, "-" and "_" is dropped from
    the id before it is used as the file stem, so the id cannot introduce
    path separators.
    """
    extra_allowed = ("-", "_")
    stem = "".join(
        filter(lambda ch: ch.isalnum() or ch in extra_allowed, session_id)
    )
    state_file = os.path.join(BASE_DIR, stem + ".json")
    lock_file = os.path.join(BASE_DIR, stem + LOCK_FILENAME_SUFFIX)
    return state_file, lock_file
class FileLock:
    """Advisory lock implemented as an exclusively created lock file.

    Whoever manages to create ``lock_path`` with ``O_CREAT | O_EXCL`` holds
    the lock; releasing it closes the descriptor and deletes the file.

    The lock can be used directly (``acquire()`` / ``release()``) or as a
    context manager (``with FileLock(path): ...``).

    NOTE: a lock file left behind by a crashed holder is not detected or
    cleaned up here; it has to be removed by hand.
    """

    def __init__(self, lock_path: str):
        self.lock_path = lock_path
        # OS-level file descriptor while this instance holds the lock, else None.
        self.fh = None

    def acquire(self, timeout=5.0):
        """Block until the lock file can be created.

        Retries every 50 ms. Raises ``TimeoutError`` once more than
        ``timeout`` seconds have elapsed without success.
        """
        start = time.time()
        while True:
            try:
                # O_EXCL makes creation fail if the file already exists,
                # so only one caller can succeed at a time.
                self.fh = os.open(
                    self.lock_path, os.O_CREAT | os.O_EXCL | os.O_RDWR
                )
                return
            except FileExistsError:
                if time.time() - start > timeout:
                    raise TimeoutError("Could not acquire file lock.")
                time.sleep(0.05)

    def release(self):
        """Release the lock if, and only if, this instance holds it.

        Calling this without a successful ``acquire()`` is a no-op. In
        particular, a ``finally: lock.release()`` that runs after a timed-out
        ``acquire()`` must not delete a lock file owned by someone else.
        """
        if self.fh is None:
            return
        fd, self.fh = self.fh, None
        try:
            os.close(fd)
        finally:
            # Remove the file even if close() failed, so the lock is not
            # left stuck; tolerate it having been removed already.
            try:
                os.remove(self.lock_path)
            except FileNotFoundError:
                pass

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
        return False
def atomic_write_json(path: str, data: dict):
    """Write ``data`` as JSON to ``path`` via a temp file plus ``os.replace``.

    The temp file is created in the same directory as ``path`` so the final
    rename stays on one filesystem. If anything fails (including an
    interrupt), the temp file is removed and the error is re-raised; the
    previous contents of ``path`` are left untouched.
    """
    import time  # local import keeps this block self-contained

    # dirname is "" for a bare filename; use the current directory then.
    target_dir = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(prefix="bj_", dir=target_dir, text=True)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        # On Windows the replace can fail briefly while another process has
        # the destination open for reading, so retry a few times first.
        attempts = 5
        for attempt in range(attempts):
            try:
                os.replace(tmp, path)
                break
            except PermissionError:
                if attempt == attempts - 1:
                    raise
                time.sleep(0.05)
    except BaseException:
        # BaseException so Ctrl-C mid-write does not leave bj_* litter.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
def load_json(path: str) -> Dict[str, Any]:
    """Load a JSON object from ``path``.

    Returns an empty dict when the file does not exist, cannot be parsed as
    JSON, or parses to something other than a JSON object. Callers index the
    result as a dict, so non-dict payloads are treated as "no state".
    """
    # Open first and handle the miss, rather than checking exists() first:
    # the file can disappear between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}
# ==== Cards & Blackjack logic ====
RANKS = ["A", *(str(pip) for pip in range(2, 11)), "J", "Q", "K"]
SUITS = ["♠", "♥", "♦", "♣"]  # purely cosmetic
# Aces are stored as 1 (hand_value promotes one to 11 when that fits);
# number cards count their pip value; J/Q/K count 10.
RANK_VALUE = {
    rank: 1 if rank == "A" else int(rank) if rank.isdigit() else 10
    for rank in RANKS
}
def make_shoe(decks=DECKS) -> List[str]:
    """Build a shuffled shoe of ``decks`` full decks.

    Each card is the rank immediately followed by the suit symbol
    (e.g. ``"10♠"``). Shuffling is done in place by ``random.shuffle``.
    """
    single_deck = [f"{rank}{suit}" for rank in RANKS for suit in SUITS]
    shoe = single_deck * decks
    random.shuffle(shoe)
    return shoe
def hand_value(cards: List[str]) -> (int, bool):
    """Score a hand and report whether the score is soft.

    Returns ``(best_total, is_soft)``. Aces are first counted as 1; one ace
    is then promoted to 11 when that keeps the total at 21 or below, in
    which case ``is_soft`` is True.
    """
    total = 0
    ace_count = 0
    for card in cards:
        # Cards are rank + one suit character; "10x" is the only
        # three-character card, every other rank is the first character.
        if len(card) > 2:
            rank = card[:-1]
        else:
            rank = card[0]
        if rank not in RANK_VALUE:
            rank = "10"  # unrecognised ranks are scored as ten
        total += RANK_VALUE[rank]
        if rank == "A":
            ace_count += 1

    # At most one ace can ever be promoted: a second promotion would need
    # total + 20 <= 21 while at least two aces already contribute 2.
    is_soft = False
    if ace_count > 0 and total + 10 <= 21:
        total += 10
        is_soft = True
    return total, is_soft
def is_blackjack(cards: List[str]) -> bool:
    """Return True when the hand is exactly two cards totalling 21."""
    return len(cards) == 2 and hand_value(cards)[0] == 21
def describe_hand(cards: List[str]) -> str:
    """Format a hand as ``[c1 c2 ...] = total``, tagging soft totals."""
    total, soft = hand_value(cards)
    suffix = " (soft)" if (soft and total <= 21) else ""
    shown = " ".join(cards)
    return f"[{shown}] = {total}{suffix}"
# ==== State Machine ====
def new_state(session_id: str, admin: str) -> Dict[str, Any]:
    """Create the initial shared state for a session.

    ``admin`` becomes both the session admin and its first (present) player.
    The round starts in the "lobby" status with no deck, order or hands.
    """
    created_at = now_iso()
    roster = {admin: {"joined_at": now_iso(), "wins": 0, "present": True}}
    lobby_round = {
        "number": 0,
        "status": "lobby",  # lobby | dealing | playing | settling
        "deck": [],
        "order": [],
        "hands": {},  # user -> list of hands: {cards, status, doubled}
        "turn": {"user": None, "hand_index": 0},
        "messages": [],
    }
    return {
        "session_id": session_id,
        "created_at": created_at,
        "admin": admin,
        "players": roster,
        "round": lobby_round,
        "log": [],
    }
def ensure_player(state: Dict[str,Any], username: str):
    """Make sure ``username`` is in the roster and flagged as present.

    Existing players keep their record (join time, wins); unknown players
    get a fresh record with zero wins.
    """
    roster = state["players"]
    if username in roster:
        roster[username]["present"] = True
        return
    roster[username] = {"joined_at": now_iso(), "wins": 0, "present": True}
def cleanup_absent_players(state: Dict[str,Any]):
    """Placeholder hook: absent players are deliberately kept as they are.

    Nothing is pruned or re-flagged; the admin may start a round anyway.
    """
    return None
def post(state, msg):
    """Append a timestamped message to the round log, keeping the last 150."""
    state["round"]["messages"].append(f"{now_iso()} {msg}")
    # bound message log
    overflow = len(state["round"]["messages"]) - 150
    if overflow > 0:
        state["round"]["messages"] = state["round"]["messages"][overflow:]
def start_round(state: Dict[str,Any]):
    """Start a new round from the lobby: shuffle, deal two cards each, set the turn.

    Does nothing unless the round is in the "lobby" status. Requires at
    least two present players; otherwise only a message is posted.

    Hands dealt as a two-card 21 are marked "blackjack" straight away. If
    that leaves nobody with a hand to play, the round is settled on the
    spot. Otherwise it would sit in "playing" with no turn holder, and no
    player action could ever advance or settle it.
    """
    r = state["round"]
    if r["status"] != "lobby":
        return
    players = [u for u, v in state["players"].items() if v.get("present")]
    if len(players) < 2:
        post(state, "Need at least 2 players to start.")
        return

    r["number"] += 1
    r["status"] = "dealing"
    r["deck"] = make_shoe()
    r["order"] = players[:]  # simple fixed order
    r["hands"] = {
        u: [{"cards": [], "status": "playing", "doubled": False}]
        for u in players
    }
    r["turn"] = {"user": None, "hand_index": 0}

    # initial deal: two each, one card per player per pass
    for _ in range(2):
        for u in players:
            deal_card_to(state, u, 0)

    # mark blackjacks immediately
    for u in players:
        h = r["hands"][u][0]
        if is_blackjack(h["cards"]):
            h["status"] = "blackjack"

    r["status"] = "playing"
    # set first turn to first player who still has a "playing" hand
    nxt = first_pending_turn(state)
    if nxt is None:
        post(state, f"Round {r['number']} started. No hands left to play.")
        settle_round(state)
        return
    r["turn"] = nxt
    post(state, f"Round {state['round']['number']} started. Turn: {r['turn']['user']}.")
def deal_card_to(state: Dict[str,Any], user: str, hand_index: int):
    """Deal the top card of the shoe to one of ``user``'s hands.

    An empty shoe is replaced with a freshly shuffled one first (and a
    message is posted). The hand is marked "bust" if its total exceeds 21.
    """
    rnd = state["round"]
    if not rnd["deck"]:
        rnd["deck"] = make_shoe()
        post(state, "Shoe reshuffled.")
    drawn = rnd["deck"].pop()
    hand = rnd["hands"][user][hand_index]
    hand["cards"].append(drawn)
    # auto-bust check
    if hand_value(hand["cards"])[0] > 21:
        hand["status"] = "bust"
def can_split(hand: Dict[str,Any]) -> bool:
    """Return True when the hand is exactly two cards of the same rank.

    Ranks are compared, not point values, so e.g. a ten and a jack do not
    count as a pair.
    """
    cards = hand["cards"]
    if len(cards) != 2:
        return False

    def rank_of(card):
        # "10x" is the only three-character card; otherwise the rank is
        # the first character. Unrecognised ranks fall back to "10".
        rank = card[:-1] if len(card) > 2 else card[0]
        return rank if rank in RANK_VALUE else "10"

    first, second = (rank_of(card) for card in cards)
    return first == second
def first_pending_turn(state: Dict[str,Any]):
    """Return the first hand in seating order that is still "playing".

    The result is ``{"user": ..., "hand_index": ...}``, or None when no
    such hand exists.
    """
    rnd = state["round"]
    pending = (
        {"user": user, "hand_index": idx}
        for user in rnd["order"]
        for idx, hand in enumerate(rnd["hands"][user])
        if hand["status"] == "playing"
    )
    return next(pending, None)
def next_turn(state: Dict[str,Any]):
    """Find the next "playing" hand after the current turn.

    Search order: later hands of the current player, then every hand of the
    players seated after them, then a wrap-around over the whole table from
    the first seat. When the current turn has no user, or names someone not
    in the seating order, the search is simply a scan from the first seat.

    Returns ``{"user": ..., "hand_index": ...}`` or None if nothing is pending.
    """
    rnd = state["round"]
    order = rnd["order"]
    hands = rnd["hands"]
    current = rnd["turn"]
    holder = current["user"]

    def first_playing(user, start=0):
        # Index of the user's first "playing" hand at or after `start`.
        for idx in range(start, len(hands[user])):
            if hands[user][idx]["status"] == "playing":
                return idx
        return None

    if holder is None:
        remaining_seats = order
    elif holder in order:
        # continue within same user to later hands
        later = first_playing(holder, current["hand_index"] + 1)
        if later is not None:
            return {"user": holder, "hand_index": later}
        remaining_seats = order[order.index(holder) + 1:]
    else:
        remaining_seats = []

    # seats after the current one first, then wrap-around over everyone
    for user in list(remaining_seats) + list(order):
        idx = first_playing(user)
        if idx is not None:
            return {"user": user, "hand_index": idx}
    return None
def settle_round(state: Dict[str,Any]):
    """Pick the round's winners, credit their wins and return to the lobby.

    Each player is represented by their best non-bust hand. If anyone holds
    a hand marked "blackjack", exactly those players win; otherwise everyone
    tied on the highest total wins. If every hand is bust, nobody wins.
    The outcome is posted as a message.
    """
    rnd = state["round"]
    rnd["status"] = "settling"

    # Determine best for each player
    summary = {}
    for user, hands in rnd["hands"].items():
        top = -1  # -1 means "no surviving hand"
        natural = False
        for hand in hands:
            hand_status = hand["status"]
            if hand_status == "blackjack":
                natural = True
                top = 21
            elif hand_status != "bust":
                score = hand_value(hand["cards"])[0]
                if top < score <= 21:
                    top = score
        summary[user] = {"total": top, "blackjack": natural}

    naturals = [user for user, info in summary.items() if info["blackjack"]]
    if naturals:
        winners = naturals
    else:
        surviving = [
            info["total"] for info in summary.values() if info["total"] != -1
        ]
        if surviving:
            peak = max(surviving)
            winners = [
                user for user, info in summary.items() if info["total"] == peak
            ]
        else:
            winners = []

    if not winners:
        post(state, "All bust — no winners.")
    else:
        for name in winners:
            state["players"][name]["wins"] += 1
        post(state, f"Winners: {', '.join(winners)}")

    # Back to lobby
    rnd["status"] = "lobby"
    rnd["turn"] = {"user": None, "hand_index": 0}
    rnd["order"] = []
    rnd["deck"] = []
    # keep hands for viewing but they won’t be used on next start
# ==== I/O Loop ====
def render(state: Dict[str,Any], me: str):
    """Clear the terminal and draw the session header, players, hands and messages.

    Hands are shown while a round is in progress and also in the lobby when
    hands from the previous round are still stored. The round is settled
    and put back into "lobby" within a single action, so without the lobby
    case the final hands of a round would never be displayed.
    """
    os.system('cls' if os.name == 'nt' else 'clear')
    r = state.get("round", {})
    print(f"Session: {state.get('session_id','?')} Created: {state.get('created_at','?')}")
    print(f"Admin: {state.get('admin','?')} You are: {me} Time: {now_iso()}")
    print("-"*60)
    # Players: P = present, A = absent, [n] = wins
    ps = state.get("players", {})
    line = "Players: " + ", ".join(
        f"{u}({'P' if v.get('present') else 'A'})[{v.get('wins',0)}]"
        for u, v in ps.items()
    )
    print(line)
    print("-"*60)
    status = r.get("status", "lobby")
    print(f"Round #{r.get('number',0)} Status: {status}")

    hands_by_user = r.get("hands", {})
    in_progress = status in ("dealing", "playing", "settling")
    if in_progress or (status == "lobby" and hands_by_user):
        if not in_progress:
            print("Last round:")
        turn = r.get("turn") or {}
        for u, hands in hands_by_user.items():
            for idx, h in enumerate(hands):
                # Only a live round has a meaningful turn marker.
                is_turn = (
                    status == "playing"
                    and turn.get("user") == u
                    and turn.get("hand_index") == idx
                )
                mark = " <= TURN" if is_turn else ""
                print(f" {u} Hand#{idx+1}: {describe_hand(h['cards'])} [{h['status']}] {mark}")
    print("-"*60)
    # Messages: most recent eight
    for msg in r.get("messages", [])[-8:]:
        print(msg)
    print("-"*60)
def prompt(me: str, state: Dict[str,Any]) -> str:
    """Ask the user for their next action and return it stripped and lower-cased.

    The menu depends on the round status and on whether it is ``me``'s turn.
    Stand is bound to 't' (shown as ``S[t]and``) so it does not clash with
    's' for starting a round.
    """
    r = state["round"]
    status = r["status"]

    if status == "lobby":
        if state.get("admin") == me:
            return input("[A]dd note [S]tart round [Q]uit > ").strip().lower()
        return input("[A]dd note [Q]uit (waiting for admin to start) > ").strip().lower()

    if status == "playing":
        turn = r["turn"]
        if turn["user"] != me:
            return input("[A]dd note [Q]uit (waiting for others) > ").strip().lower()
        uhands = r["hands"][me]
        h = uhands[turn["hand_index"]]
        # The label was "[T]and", which dropped the leading "S" of "Stand".
        moves = ["[H]it", "S[t]and"]
        if len(h["cards"]) == 2 and not h["doubled"]:
            moves.append("[D]ouble")
        if len(uhands) < MAX_SPLIT_HANDS and can_split(h):
            moves.append("S[p]lit")
        return input(f"{' '.join(moves)} [A]dd note [Q]uit > ").strip().lower()

    return input("[A]dd note [Q]uit > ").strip().lower()
def _advance_or_settle(state: Dict[str,Any]):
    """Pass the turn to the next pending hand, or settle if none is left."""
    nxt = next_turn(state)
    if nxt:
        state["round"]["turn"] = nxt
    else:
        settle_round(state)


def _release_turn_on_quit(state: Dict[str,Any], me: str):
    """Stand a departing player's unfinished hands and pass on the turn if they held it."""
    r = state["round"]
    for hand in r["hands"].get(me, []):
        if hand["status"] == "playing":
            hand["status"] = "stand"
    if r["turn"]["user"] == me:
        _advance_or_settle(state)


def apply_action(me: str, state: Dict[str,Any], action: str):
    """Apply one user action to ``state`` in place.

    Actions: 'q' quit, 'a' add note (prompts for the text), 's' start round
    (lobby, admin only), and on the player's own turn 'h' hit, 't' stand,
    'd' double, 'p' split. Anything else is ignored.

    Returns "quit" for 'q' and "ok" otherwise.

    A player who quits mid-round has their unfinished hands stood at their
    current totals, and the turn moves on if they were holding it.
    Previously the turn stayed on the absent player, so nobody else could
    act and the round could never finish.
    """
    r = state["round"]
    status = r["status"]

    if action == "q":
        # mark presence false; don’t delete the player
        if me in state["players"]:
            state["players"][me]["present"] = False
        post(state, f"{me} left (can rejoin).")
        if status == "playing":
            _release_turn_on_quit(state, me)
        return "quit"

    if action == "a":
        note = input("Note: ").strip()
        if note:
            post(state, f"{me}: {note}")
        return "ok"

    if status == "lobby":
        if action == "s":
            if state.get("admin") == me:
                start_round(state)
            else:
                post(state, f"{me} tried to start but is not admin.")
        return "ok"

    if status != "playing":
        return "ok"

    turn = r["turn"]
    if turn["user"] != me:
        return "ok"
    idx = turn["hand_index"]
    h = r["hands"][me][idx]

    if action == "h":
        deal_card_to(state, me, idx)
        # if bust/21, auto-advance
        if h["status"] == "bust":
            post(state, f"{me} busts.")
            _advance_or_settle(state)
        else:
            t, _ = hand_value(h["cards"])
            if t == 21:
                h["status"] = "stand"
                post(state, f"{me} stands on 21.")
                _advance_or_settle(state)
    elif action == "t":
        h["status"] = "stand"
        post(state, f"{me} stands.")
        _advance_or_settle(state)
    elif action == "d":
        if len(h["cards"]) == 2 and not h["doubled"]:
            h["doubled"] = True
            deal_card_to(state, me, idx)
            # double forces stand after one card
            if h["status"] != "bust":
                h["status"] = "stand"
                post(state, f"{me} doubles and stands.")
            else:
                post(state, f"{me} doubles and busts.")
            _advance_or_settle(state)
        else:
            post(state, f"{me} tried to double but cannot.")
    elif action == "p":
        uhands = r["hands"][me]
        if len(uhands) < MAX_SPLIT_HANDS and can_split(h) and len(h["cards"]) == 2:
            c1, c2 = h["cards"]
            # replace current with single-card hand c1, and add new hand with c2
            h["cards"] = [c1]
            h["status"] = "playing"
            h["doubled"] = False
            uhands.insert(idx + 1, {"cards": [c2], "status": "playing", "doubled": False})
            # deal one extra card to each split hand
            deal_card_to(state, me, idx)
            deal_card_to(state, me, idx + 1)
            post(state, f"{me} splits.")
            # the turn stays on the first split hand unless it is already finished
            cur = r["hands"][me][idx]
            total, _ = hand_value(cur["cards"])
            if total >= 21:
                cur["status"] = "bust" if total > 21 else "stand"
                _advance_or_settle(state)
        else:
            post(state, f"{me} tried to split but cannot.")

    # if no more playing hands, settle
    if r["status"] == "playing" and first_pending_turn(state) is None:
        settle_round(state)
    return "ok"
def main():
    """Interactive client loop: join or create a session, then render/prompt/act.

    All reads that feed a write happen while the session lock is held, so a
    client never writes back a stale snapshot over another client's update.
    Nothing waits on keyboard input while holding the lock.
    """
    print("=== Lab Blackjack (No Dealer) ===")
    if not os.path.isdir(BASE_DIR):
        print(f"Shared drive not found: {BASE_DIR}")
        sys.exit(1)
    username = input("Enter username: ").strip()
    if not username:
        print("Username required.")
        sys.exit(1)
    session_id = input("Enter session id: ").strip()
    if not session_id:
        print("Session id required.")
        sys.exit(1)
    json_path, lock_path = session_paths(session_id)
    random.seed(f"{username}-{time.time()}")  # per-client randomness

    # Join or create. The existence check is made under the lock so two
    # clients starting together cannot both create (and overwrite) the session.
    created = False
    lock = FileLock(lock_path)
    # acquire() sits outside the try: if it times out we never held the
    # lock, so release() must not run.
    lock.acquire()
    try:
        if not os.path.exists(json_path):
            atomic_write_json(json_path, new_state(session_id, username))
            created = True
    finally:
        lock.release()

    # Main loop
    while True:
        # Heartbeat: read, mark presence and write back as one locked step.
        lock = FileLock(lock_path)
        lock.acquire()
        try:
            st = load_json(json_path)
            if not st:
                # Session file might have been deleted; recreate as admin if needed
                st = new_state(session_id, username)
            ensure_player(st, username)
            atomic_write_json(json_path, st)
        finally:
            lock.release()

        # render
        render(st, username)
        if created:
            print("(You created this session. You are admin.)")
            created = False

        # Prompt outside the lock. The note text is collected here too, so
        # other clients are not locked out while the user is typing.
        act = prompt(username, st)
        note = input("Note: ").strip() if act == "a" else None

        # act (with lock), against the freshest state on disk
        lock = FileLock(lock_path)
        lock.acquire()
        try:
            st2 = load_json(json_path) or st
            ensure_player(st2, username)
            if act == "a":
                if note:
                    post(st2, f"{username}: {note}")
                res = "ok"
            else:
                res = apply_action(username, st2, act)
            atomic_write_json(json_path, st2)
        finally:
            lock.release()

        if res == "quit":
            print("Goodbye.")
            break
        time.sleep(POLL_INTERVAL)
# Script entry point: run the client and turn the two expected ways of
# stopping (Ctrl-C, lock timeout) into a short message instead of a traceback.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nExiting.")
    except TimeoutError as exc:
        print("Lock timeout:", exc)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment