Files
myice/myice/myice.py
Rene Luria 8ae1c33b3a chore: migrate to python 3.13 and update dependencies
Migrate from Python 3.11 to 3.13 with updated dependencies. Switch from PyPDF2 to pypdf library for better PDF processing. Add new U14 age groups and extract-pdf utility script.
2025-09-29 23:05:48 +02:00

719 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Tool to work with My Ice Hockey schedules
"""
import configparser
import datetime
import json
import os
import re
import sys
import tempfile
from enum import Enum
from pathlib import Path
from typing import Annotated
from typing import List, Tuple
import pypdf
import requests
import typer
from rich import print
from rl_ai_tools import utils # type: ignore
# Desktop Firefox User-Agent string sent in the headers of the web-frontend
# requests made through the shared session in this file.
user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:131.0) Gecko/20100101 Firefox/131.0"
# Line-number prefix such as "   12| " left at the start of a line by numbered dumps.
_LINE_NUMBER_PREFIX = re.compile(r"^\s*\d+\|\s*")
# Either one complete JSON string literal, or a comma that directly precedes a
# closing bracket/brace. Matching whole string literals first guarantees that
# commas and brackets *inside* strings are never mistaken for structure.
_JSON_REPAIR_TOKEN = re.compile(r'"(?:[^"\\]|\\.)*"|,(?=\s*[}\]])', re.DOTALL)
_CONTROL_CHAR = re.compile(r"[\x00-\x1f]")
_SHORT_ESCAPES = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}


def _escape_control_char(match):
    """Return the JSON escape sequence for one raw control character."""
    char = match.group(0)
    return _SHORT_ESCAPES.get(char) or f"\\u{ord(char):04x}"


def _repair_json_token(match):
    """Drop a trailing comma, or escape raw control characters in a string literal."""
    token = match.group(0)
    if token == ",":
        return ""
    return _CONTROL_CHAR.sub(_escape_control_char, token)


def _dump_pretty(data):
    """Serialise *data* the way every branch of the sanitizer returns it."""
    return json.dumps(data, ensure_ascii=False, indent=2, separators=(",", ": "))


def sanitize_json_response(text):
    """Return *text* re-serialised as pretty-printed JSON, repairing it if needed.

    Valid JSON is simply re-dumped. Otherwise the following repairs are applied:

    * line-number prefixes ("12| ") at the start of lines are removed;
    * raw control characters (newline, tab, ...) *inside string literals* are
      escaped, while whitespace between tokens is left untouched;
    * trailing commas before ``]`` or ``}`` are removed (outside strings only).

    The repaired document is tried as a whole first, so object payloads keep
    their shape. If that fails, the span from the first ``[`` to the last ``]``
    is tried, or the text wrapped in ``[...]`` when it has no such span.

    Returns:
        A JSON string. When nothing can be parsed the result is ``"[]"``, so
        callers can always feed the return value to ``json.loads``.
    """
    try:
        return _dump_pretty(json.loads(text))
    except json.JSONDecodeError:
        pass

    # Keep the newlines when re-joining: a newline inside a string value must
    # be escaped, not silently dropped.
    content = "\n".join(
        _LINE_NUMBER_PREFIX.sub("", line) for line in text.split("\n")
    )

    candidates = [content]
    start = content.find("[")
    end = content.rfind("]")
    if start != -1 and end > start:
        # Payload surrounded by noise: keep only the outermost array.
        candidates.append(content[start : end + 1])
    else:
        # Bare sequence of values: try to read it as the body of an array.
        candidates.append("[" + content + "]")

    for candidate in candidates:
        repaired = _JSON_REPAIR_TOKEN.sub(_repair_json_token, candidate)
        try:
            return _dump_pretty(json.loads(repaired))
        except json.JSONDecodeError:
            continue

    # Final fallback - a minimal valid JSON array.
    return _dump_pretty([])
# Typer application object; all commands below register themselves on it.
app = typer.Typer(no_args_is_help=True)
# Shared HTTP session; (re)assigned by wrapper_session before each wrapped call.
session: requests.Session
# MyIce user id; set from the ini file (do_login / wrapper_session) or from get_userid().
userid: int
# Name of the ini section to read; overwritten by the --config-section option in main().
global_config_section: str = "default"
# Application-wide callback: declares the --config-section / -c option that
# applies to every sub-command.
@app.callback()
def main(
    config_section: Annotated[
        str,
        typer.Option(
            "--config-section", "-c", help="Configuration section to use from INI file"
        ),
    ] = "default",
):
    """My Ice Hockey schedule tool"""
    global global_config_section

    # Commands do not receive this option themselves; they read the selected
    # section from the module-level variable instead.
    global_config_section = config_section
class AgeGroup(str, Enum):
u111 = "U11 (1)"
u13e = "U13 (Elite)"
u13t = "U13 (Top)"
u13a = "U13 (A)"
u13p = "U13 (Prép)"
u14ev = "U14 (Elite Vernets)"
u14esmv = "U14 (Elite Sous-Moulin/Vergers)"
u15t = "U15 (Top)"
u15a = "U15 (A)"
u16e = "U16 (Elite)"
u16t = "U16 (Top GS Ass)"
u18e = "U18 (Elite)"
u18el = "U18 (Elit)"
u21e = "U21 (ELIT)"
u14t = "U14 (Top)"
u14t1 = "U14 (Top1)"
u14t2 = "U14 (Top2)"
def normalize_age_group(value: str) -> AgeGroup | None:
"""Normalize age group string to handle case and spelling variations."""
import re
if not isinstance(value, str):
return None
def normalize_spelling(text: str) -> str:
"""Normalize spelling variations of 'elit' to 'elite'."""
# Replace accented versions first
text = text.replace("élit", "elite")
# Use regex to replace "elit" with "elite" only when it's a complete word
# This avoids replacing "elit" within "elite"
text = re.sub(r"\belit\b", "elite", text)
return text
# Convert to lowercase for case-insensitive comparison
input_lower = value.lower()
input_normalized = normalize_spelling(input_lower)
for member in AgeGroup:
# Convert enum value to lowercase and normalize
member_lower = member.value.lower()
member_normalized = normalize_spelling(member_lower)
# Check for match
if member_normalized == input_normalized:
return member
return None
class EventType(str, Enum):
    """Kinds of schedule entries that the search command can filter on."""

    game = "game"
    practice = "practice"
def load_cookies(file: str = "cookies.txt") -> requests.cookies.RequestsCookieJar:
    """Build a cookie jar from the JSON cookie dump stored in *file*.

    A missing file yields an empty jar.
    """
    path = Path(file)
    stored = json.loads(path.read_bytes()) if path.exists() else {}
    return requests.cookies.cookiejar_from_dict(stored)
def save_cookies(file: str = "cookies.txt"):
    """Write the cookies of the shared session to *file* as a JSON object."""
    cookies = requests.utils.dict_from_cookiejar(session.cookies)
    Path(file).write_text(json.dumps(cookies))
def get_login(
local_file: str = "myice.ini",
config_section: str = "default",
) -> tuple[str, str, int | None, str | None, int | None]:
config = configparser.ConfigParser()
config.read(
[
Path("~/.config/myice.ini").expanduser(),
"myice.ini",
local_file,
]
)
if config_section in config.sections():
selected_config = config[config_section]
elif "default" in config.sections():
# Fallback to default section if specified section doesn't exist
selected_config = config["default"]
print(
f"Warning: Section '{config_section}' not found, using 'default' section",
file=sys.stderr,
)
else:
print("Error: please configure username/password in ini file", file=sys.stderr)
sys.exit(1)
username = selected_config.get("username")
password = selected_config.get("password")
userid = selected_config.getint("userid", fallback=None)
token = selected_config.get("token", fallback=None)
club_id = selected_config.getint("club_id", fallback=None)
if not username or not password:
print("Error: please configure username/password in ini file", file=sys.stderr)
sys.exit(1)
return username, password, userid, token, club_id
def select_club(club_id: int = 172):
    """Select a club by ID after login."""
    # The club is selected by requesting the front page with a "cl" query parameter.
    url = f"https://app.myice.hockey/?cl={club_id}"
    response = session.get(url, headers={"User-Agent": user_agent})
    response.raise_for_status()
def do_login(config_section: str | None = None):
    """Log the shared session in to the MyIce web frontend.

    Credentials come from get_login for *config_section*, or for the globally
    selected section when it is None. A configured userid is stored in the
    module-level ``userid``; a configured club id is selected after login.
    """
    global userid

    # An explicit section wins; otherwise use the one chosen on the command line.
    section = global_config_section if config_section is None else config_section
    username, password, configured_userid, _token, club_id = get_login(
        config_section=section
    )
    if configured_userid is not None:
        userid = configured_userid

    # Load the front page first, then submit the login form.
    landing = session.get(
        "https://app.myice.hockey/", headers={"User-Agent": user_agent}
    )
    landing.raise_for_status()

    credentials = {
        "login_email": username,
        "login_password": password,
        "sublogin": 1,
        "login_submit": "",
    }
    login_headers = {
        "Referer": "https://app.myice.hockey/",
        "Origin": "https://app.myice.hockey",
        "User-Agent": user_agent,
    }
    reply = session.post(
        "https://app.myice.hockey/classes/process.php",
        data=credentials,
        headers=login_headers,
    )
    reply.raise_for_status()

    # select the club we want
    if club_id:
        select_club(club_id)
def get_userid():
    """Scrape the numeric user id from the club schedule page.

    The page is fetched with the shared session and searched line by line for
    an indented ``userid: '<digits>'`` entry.

    Returns:
        The user id as an int, matching the module-level ``userid: int``.

    Raises:
        requests.HTTPError: the page request failed.
        RuntimeError: no userid entry was found in the page.
    """
    r = session.get(
        "https://app.myice.hockey/players/clubschedule/",
        headers={
            "User-Agent": user_agent,
            "Referer": "https://app.myice.hockey/classes/process.php",
        },
    )
    r.raise_for_status()

    pattern = re.compile(r"^\s+userid: '(?P<userid>[0-9]+)'")
    for line in r.text.splitlines():
        m = pattern.search(line)
        if m:
            return int(m.group("userid"))
    # Fail with an explicit message instead of an UnboundLocalError.
    raise RuntimeError("could not find userid in the club schedule page")
def wrapper_session(func):
    """Decorator that prepares the shared session before calling *func*.

    On every call the wrapper:

    1. creates a fresh requests session and loads the saved cookies into it,
       dropping expired ones;
    2. logs in and saves the cookies when the "mih_v3_cookname" cookie is absent;
    3. sets the module-level ``userid`` from the ini file, or from
       get_userid() when none is configured;
    4. calls *func* with the original arguments and returns its result.

    The wrapper keeps the name, docstring and signature metadata of *func*.
    """
    # Local import keeps this block self-contained; functools is stdlib.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global session, userid

        session = requests.Session()
        session.cookies = load_cookies()
        session.cookies.clear_expired_cookies()
        if not session.cookies.get("mih_v3_cookname"):
            print("login...", file=sys.stderr)
            do_login(config_section=None)  # None -> use the global config section
            save_cookies()

        _, _, userid, _, _ = get_login(config_section=global_config_section)
        if not userid:
            print("get userid...", file=sys.stderr)
            userid = get_userid()
            print(f"{userid=}", file=sys.stderr)
        return func(*args, **kwargs)

    return wrapper
@wrapper_session
def get_schedule(num_days: int) -> list:
    """Fetch the user's club schedule from now until *num_days* days ahead.

    Posts a "fetchmy" query to the club planning endpoint with the shared
    session and the module-level ``userid``.

    Returns:
        The decoded JSON payload, after it has been passed through
        sanitize_json_response (normally a list of event dicts).

    Raises:
        requests.HTTPError: the request failed.
    """
    assert session and userid
    date_start = datetime.datetime.now()
    date_end = date_start + datetime.timedelta(days=num_days)
    r = session.post(
        "https://app.myice.hockey/inc/processclubplanning.php",
        data={
            "type": "fetchmy",
            "userid": userid,
            "type_location": ["*"],
            "start": date_start.strftime("%Y-%m-%d"),
            "end": date_end.strftime("%Y-%m-%d"),
        },
        headers={
            "User-Agent": user_agent,
            "Referer": "https://app.myice.hockey/players/clubschedule/",
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "X-Requested-With": "XMLHttpRequest",
        },
    )
    r.raise_for_status()
    # The raw body is not always valid JSON, hence the sanitizing round-trip.
    return json.loads(sanitize_json_response(r.text))
@wrapper_session
def game_pdf(gameid: int, outfile: Path):
    """Download the PDF generated for game *gameid* and write it to *outfile*."""
    assert session and userid
    request_headers = {
        "User-Agent": user_agent,
        "Referer": "https://app.myice.hockey/inc/processclubplanning.php",
    }
    response = session.get(
        f"https://app.myice.hockey/pdfexports/playerplatgamepdfgenerator.php?id={gameid}",
        headers=request_headers,
    )
    response.raise_for_status()

    four_mib = 4 * 1024 * 1024
    with outfile.open("wb") as pdf:
        for piece in response.iter_content(chunk_size=four_mib):
            pdf.write(piece)
@wrapper_session
def practice_pdf(gameid: int, outfile: Path):
    """Download the PDF generated for practice *gameid* and write it to *outfile*."""
    assert session and userid
    request_headers = {
        "User-Agent": user_agent,
        "Referer": "https://app.myice.hockey/inc/processclubplanning.php",
    }
    response = session.get(
        f"https://app.myice.hockey/pdfexports/playerplatpracticepdf.php?practice={gameid}",
        headers=request_headers,
    )
    response.raise_for_status()

    four_mib = 4 * 1024 * 1024
    with outfile.open("wb") as pdf:
        for piece in response.iter_content(chunk_size=four_mib):
            pdf.write(piece)
@app.command()
def schedule(
    outfile: Annotated[
        Path | None,
        typer.Option(
            "--outfile", "-o", help="file to write result to, or stdout if none"
        ),
    ] = None,
    num_days: Annotated[int, typer.Option("--days")] = 7,
):
    """
    Fetch schedule as json
    """
    events = get_schedule(num_days)

    if not outfile:
        # No target file: emit indented JSON on stdout with the builtin print
        # rather than the rich print imported at module level.
        import builtins

        builtins.print(json.dumps(events, indent=2))
        return

    # Target file given: store the compact JSON form.
    outfile.write_text(json.dumps(events))
def os_open(file: str) -> None:
    """Open *file* with the desktop's default application.

    Uses ``xdg-open`` on Linux and ``open`` elsewhere. The opener is started
    without a shell, so file names containing spaces or shell metacharacters
    are passed through unchanged. Failures are reported on stderr and are not
    fatal; the opener's exit status is ignored.
    """
    # Local import keeps this block self-contained; subprocess is stdlib.
    import subprocess

    if sys.platform.startswith("linux"):
        opener = "xdg-open"
    else:
        print(f"Opening file {file}", file=sys.stderr)
        opener = "open"

    try:
        subprocess.run([opener, file], check=False)
    except OSError as err:
        # Opener not installed / not executable: keep the best-effort behaviour.
        print(f"Error: could not run {opener}: {err}", file=sys.stderr)
def extract_players(pdf_file: Path) -> List[str]:
    """Extract the player names from the first page of a game PDF.

    pypdf calls the visitor for each text operation on the page. The most
    recent non-empty text is remembered, and it is recorded as a player
    whenever the position read from ``tm[4]``/``tm[5]`` falls in the expected
    column (79 < x < 80) below y = 741.93.
    NOTE(review): the coordinates presumably match the layout of the MyIce
    game sheet - confirm against a sample PDF.

    Returns:
        The stripped texts collected in page order.
    """
    reader = pypdf.PdfReader(pdf_file)
    page = reader.pages[0]
    players: List[str] = []
    # Kept in the enclosing scope (not a module global) so that nothing leaks
    # between calls and the first callback never hits an undefined name.
    last_text = ""

    def visitor_body(text, cm, tm, fontDict, fontSize):
        nonlocal last_text
        if text:
            last_text = text
        x, y = tm[4], tm[5]
        if 79 < x < 80 and y < 741.93:
            players.append(last_text.strip())

    page.extract_text(visitor_text=visitor_body, extraction_mode="plain")
    return players
@app.command("game")
def get_game_pdf(
    game_id: Annotated[int, typer.Argument(help="ID of game to gen pdf for")],
    open_file: Annotated[bool, typer.Option("--open", "-o")] = False,
):
    """
    Generate the pdf for the game invitation
    """
    if open_file:
        # Keep the PDF in the current directory and hand it to the viewer.
        output_filename = f"game_{game_id}.pdf"
        game_pdf(game_id, Path(output_filename))
        os_open(output_filename)
        return

    # Only the player list is wanted: download into a private temporary
    # directory that is removed afterwards, instead of reusing the name of an
    # already-deleted NamedTemporaryFile.
    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_path = Path(tmp_dir) / f"game_{game_id}.pdf"
        game_pdf(game_id, pdf_path)
        players = extract_players(pdf_path)
    print("Players:")
    print("\n".join(players))
@app.command("practice")
def get_practice_pdf(
    game_id: Annotated[int, typer.Argument(help="ID of practice to gen pdf for")],
):
    """
    Generate the pdf for the practice invitation
    """
    # The PDF is written to the current directory, then opened in the viewer.
    output_filename = f"practice_{game_id}.pdf"
    practice_pdf(game_id, Path(output_filename))
    os_open(output_filename)
@app.command("search")
def parse_schedule(
    age_group: Annotated[AgeGroup | None, typer.Option()] = None,
    event_type_filter: Annotated[
        EventType | None,
        typer.Option("--type", help="Only display events of this type"),
    ] = None,
    schedule_file: Annotated[
        Path, typer.Option(help="schedule json file to parse")
    ] = Path("schedule.json"),
):
    """
    Parse schedule.json to look for specific games or practices
    """
    raw = schedule_file.read_text()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # If JSON is malformed, try to sanitize it first
        data = json.loads(sanitize_json_response(raw))

    def is_game(entry) -> bool:
        """An entry is a game when its "event" field is "Jeu"; anything else is a practice."""
        return entry.get("event") == "Jeu"

    # age_group filter: events with an unknown (or missing) age group are
    # always dropped.
    if age_group:
        events = [
            x for x in data if normalize_age_group(x.get("agegroup")) == age_group
        ]
    else:
        events = [
            x for x in data if normalize_age_group(x.get("agegroup")) is not None
        ]

    # event_type filter: "practice" keeps everything that is NOT a game.
    if event_type_filter is EventType.game:
        events = [x for x in events if is_game(x)]
    elif event_type_filter is EventType.practice:
        events = [x for x in events if not is_game(x)]

    for event in events:
        raw_title = event["title"]
        if age_group:
            # Titles start with the age group on its own line; drop it when
            # the listing is already restricted to that group.
            raw_title = raw_title.removeprefix(age_group.value + "\n")
        title = " ".join(raw_title.split("\n"))
        start_fmt = datetime.datetime.fromisoformat(event["start"]).strftime("%H:%M")
        end_fmt = datetime.datetime.fromisoformat(event["end"]).strftime("%H:%M")
        event_type = "game" if is_game(event) else "practice"
        print(
            f"[{event['id_event']}] {event['date']} {event_type} {start_fmt}-> {end_fmt}"
        )
        print(f"{event_type}: {title}\n")
@app.command("ai")
def check_with_ai(
    schedule_file: Annotated[
        Path, typer.Option(help="schedule json file to parse")
    ] = Path("schedule.json"),
):
    """
    Search through the schedule with natural language using Infomaniak LLM API
    """
    if not utils.init():
        sys.exit(1)

    with schedule_file.open("r") as f:
        schedule_data = json.load(f)

    # Keep only events of known age groups, and expose the group as "team"
    # without parentheses.
    schedule_data = [
        x for x in schedule_data if normalize_age_group(x["agegroup"]) is not None
    ]
    for event in schedule_data:
        event["team"] = event["agegroup"].replace("(", "").replace(")", "")
        del event["agegroup"]

    when = datetime.datetime.now().strftime("%d-%m-%Y et il est %H:%M")
    # One instruction per line; each list item must end with a comma, otherwise
    # adjacent string literals are silently concatenated.
    system = "\n".join(
        [
            "Tu es une IA connaissant bien les données suivantes, qui décrivent les match et entraînements d'équipes de hockey sur glace.",
            f"aujourd'hui, nous sommes le {when}",
            "attention: ce qu'il y a entre parenthèse après la catégorie est une catégorie à part entière, example, u13 a = U13 (A) et ça correspond aux agegroup",
            "assure-toi de ne pas confondre les catégories d'age dans tes réponses. ne donne pas une réponse pour la mauvaise équipe",
            "ne confond pas top, elite, a, prép",
            "```json",
            json.dumps(schedule_data),
            "```",
        ],
    )

    # Interactive loop: every question/answer pair is appended to the history
    # passed to the next inference call. End of input (EOF) ends the loop.
    history: List[Tuple[str, str]] = []
    while True:
        try:
            question = input("> ")
        except EOFError:
            break
        answer = utils.llm_inference(
            question, history, system=system, model="mixtral8x22b"
        )
        print("<", answer)
        history.append((question, answer))
# HTTP headers passed with the requests to the api/mobilerest endpoints below
# (login, refreshdata, getevent). The body is declared as form-urlencoded.
mobile_headers = {
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
"Sec-Fetch-Site": "cross-site",
"Accept-Language": "fr-FR,fr;q=0.9",
"Sec-Fetch-Mode": "cors",
"Origin": "null",
"Sec-Fetch-Dest": "empty",
"Content-Type": "application/x-www-form-urlencoded",
}
def mobile_login(config_section: str | None = None):
    """Return the credentials dict used by the mobile REST API calls.

    When both ``token`` and ``club_id`` are configured in the ini section they
    are used directly and no request is made. Otherwise a token is requested
    from the login endpoint with the base64-encoded username and password.

    Args:
        config_section: ini section to read; None selects the global section.

    Returns:
        A dict with the keys "id" (always 0), "token" and "id_club". A
        configured club id takes precedence over the one in the login reply.

    Raises:
        requests.HTTPError: the login request failed.
    """
    import base64

    # Use provided config_section, or fall back to global one
    section = config_section if config_section is not None else global_config_section
    username, password, _, token, club_id = get_login(config_section=section)
    if token and club_id:
        return {"id": 0, "token": token, "id_club": club_id}

    def encode(secret: str) -> str:
        """Base64-encode *secret* as text."""
        return base64.b64encode(secret.encode()).decode()

    print("Requesting token", file=sys.stderr)
    # Passing a dict lets requests percent-encode the values: base64 output
    # may contain "+", "/" and "=", which must not go raw into a form body.
    form = {
        "login_email": encode(username),
        "login_password": encode(password),
        "language": "FR",
        "v": "2",
    }
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/login",
        headers=mobile_headers,
        data=form,
    ) as r:
        r.raise_for_status()
        userinfo = r.json()["userinfo"]
        return {
            "id": 0,
            "token": userinfo["token"],
            # Use configured club_id if available
            "id_club": club_id or userinfo["id_club"],
        }
# Credentials for the mobile API; placeholder values until a mobile command
# replaces this dict with the result of mobile_login().
userdata = {
"id": 0,
"id_club": 0,
"token": "",
}
def refresh_data():
    """Fetch the "refreshdata" payload of the mobile API for the current user.

    Uses the token and club id stored in the module-level ``userdata``.

    Returns:
        The decoded JSON object. When the body is not valid JSON it is first
        passed through sanitize_json_response. If the result is still not a
        JSON object, a diagnostic is printed on stderr and an empty dict is
        returned, so callers can always use ``.get``.

    Raises:
        requests.HTTPError: the request failed.
    """
    # A dict body is percent-encoded by requests, unlike a hand-built string.
    form = {
        "token": userdata["token"],
        "id_club": userdata["id_club"],
        "language": "FR",
    }
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/refreshdata",
        headers=mobile_headers,
        data=form,
    ) as r:
        r.raise_for_status()
        try:
            payload = r.json()
        except ValueError:
            # ValueError covers the decode errors of every JSON backend that
            # requests may use. The sanitizer always returns valid JSON.
            payload = json.loads(sanitize_json_response(r.text))

        if isinstance(payload, dict):
            return payload

        # stderr keeps stdout clean for commands that print JSON.
        print(
            f"Unexpected response (not a JSON object). Raw response: {r.text[:500]}...",
            file=sys.stderr,
        )
        # Return an empty dict to avoid breaking the callers
        return {}
@app.command("mobile-login")
def do_mobile_login():
    # Resolve the mobile credentials for the selected ini section, keep them in
    # the module-level ``userdata`` and show them as indented JSON.
    global userdata

    userdata = mobile_login(config_section=global_config_section)
    rendered = json.dumps(userdata, indent=2)
    print(rendered)
@app.command("mobile")
def mobile():
    """List the club games returned by the mobile API as json"""
    global userdata

    userdata = mobile_login(config_section=global_config_section)
    # refresh_data() may return a dict without "club_games" (it returns {} on
    # failure); treat that as "no games" instead of iterating over None.
    games = list(refresh_data().get("club_games") or [])

    # Use built-in print to avoid rich formatting issues
    import builtins

    builtins.print(json.dumps(games, indent=2, ensure_ascii=False))
@app.command("mobile-game")
def mobile_game(
    game_id: Annotated[int, typer.Argument(help="game id")],
    raw: Annotated[bool, typer.Option(help="display raw output")] = False,
):
    """Show a game and its available players using the mobile API"""
    global userdata

    userdata = mobile_login(config_section=global_config_section)
    # A dict body is percent-encoded by requests, unlike a hand-built string.
    form = {
        "token": userdata["token"],
        "id_event": game_id,
        "type": "games",
        "id_player": userdata["id"],
        "id_club": userdata["id_club"],
        "language": "FR",
    }
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/getevent",
        headers=mobile_headers,
        data=form,
    ) as r:
        r.raise_for_status()
        payload = json.loads(sanitize_json_response(r.text))

    # The sanitizer may hand back a list (e.g. "[]" for unparsable bodies).
    if not isinstance(payload, dict) or "eventData" not in payload:
        print(f"Error: unexpected response for game {game_id}", file=sys.stderr)
        sys.exit(1)

    data = payload["eventData"]
    players = data["convocation"]["available"]
    if raw:
        print(data)
    print(
        f"Game {game_id}: {data['title']} ({data['type']}, {data['is_away']}) at {data['time_start']}"
    )
    print(f"{len(players)} players")

    # Sort by position, then date of birth. Missing values get a string
    # placeholder so the sort never compares None with str.
    ordered = sorted(
        players,
        key=lambda p: (p["position"] or "none", p["dob"] or ""),
    )
    for player in ordered:
        print(
            f"[{player['position']}] {player['fname']} {player['lname']} ({player['dob']})"
        )
# Run the Typer application when the file is executed as a script.
if __name__ == "__main__":
    app()