#!/usr/bin/env python3
"""
Tool to work with My Ice Hockey schedules
"""

import base64
import builtins
import configparser
import datetime
import functools
import json
import os
import re
import subprocess
import sys
import tempfile
from enum import Enum
from pathlib import Path
from typing import Annotated
from typing import List, Tuple

import PyPDF2
import requests
import typer
from rich import print

from rl_ai_tools import utils  # type: ignore

user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:131.0) Gecko/20100101 Firefox/131.0"

# Patterns used by the JSON sanitizer and the spelling normalizer; compiled
# once because they are applied per line / per enum member.
_LINE_NUMBER_PREFIX = re.compile(r"^\s*\d+\|\s*")
_TRAILING_COMMA = re.compile(r",(\s*[}\]])")
_ELITE_SPELLINGS = re.compile(r"\b[ée]lite?\b")
_USERID_PATTERN = re.compile(r"^\s+userid: '(?P<userid>[0-9]+)'")


def _pretty_json(data) -> str:
    """Serialize *data* the way every sanitizer exit path formats its output."""
    return json.dumps(data, ensure_ascii=False, indent=2, separators=(",", ": "))


def sanitize_json_response(text):
    """Return *text* re-serialized as valid, pretty-printed JSON.

    Repairs are attempted in order of increasing aggressiveness:

    1. parse the text as-is;
    2. parse it with ``strict=False`` so raw control characters (literal
       newlines, tabs, ...) inside string values are accepted and preserved;
    3. strip ``cat -n`` style line-number prefixes, isolate the outermost
       ``[...]`` array (or wrap the content in one) and drop trailing commas.

    If nothing parses, the JSON text of an empty array (``"[]"``) is returned,
    so the result can always be fed to ``json.loads``.
    """
    for strict in (True, False):
        try:
            return _pretty_json(json.loads(text, strict=strict))
        except json.JSONDecodeError:
            continue

    # Re-join with "\n" (not ""): dropping the separators would silently merge
    # multi-line string values instead of keeping their line breaks.
    content = "\n".join(
        _LINE_NUMBER_PREFIX.sub("", line) for line in text.split("\n")
    )

    start = content.find("[")
    end = content.rfind("]")
    if start != -1 and end > start:
        array_content = content[start : end + 1]
    else:
        # Fallback: try to reconstruct a valid array
        array_content = "[" + content + "]"

    # Remove trailing commas before closing brackets/braces
    array_content = _TRAILING_COMMA.sub(r"\1", array_content)

    try:
        return _pretty_json(json.loads(array_content, strict=False))
    except json.JSONDecodeError:
        # Final fallback - return a minimal valid JSON array
        return _pretty_json([])


app = typer.Typer(no_args_is_help=True)

# Module-level state shared by the web-session helpers; both are assigned by
# ``wrapper_session`` / ``do_login`` before use.
session: requests.Session
userid: int
global_config_section: str = "default"


@app.callback()
def main(
    config_section: Annotated[
        str,
        typer.Option(
            "--config-section", "-c", help="Configuration section to use from INI file"
        ),
    ] = "default",
):
    """My Ice Hockey schedule tool"""
    # Store the config_section in a global variable so it can be accessed by commands
    global global_config_section
    global_config_section = config_section


class AgeGroup(str, Enum):
    """Age groups / teams that the ``search`` and ``ai`` commands recognise."""

    u111 = "U11 (1)"
    u13e = "U13 (Elite)"
    u13t = "U13 (Top)"
    u13a = "U13 (A)"
    u13p = "U13 (Prép)"
    u14ev = "U14 (Elite Vernets)"
    u14esmv = "U14 (Elite Sous-Moulin/Vergers)"
    u15t = "U15 (Top)"
    u15a = "U15 (A)"
    u16e = "U16 (Elite)"
    u16t = "U16 (Top GS Ass)"
    u18e = "U18 (Elite)"
    u18el = "U18 (Elit)"
    u21e = "U21 (ELIT)"


def _normalize_spelling(text: str) -> str:
    """Map the lowercase variants 'elit', 'élit' and 'élite' to 'elite'."""
    # One word-bounded pattern handles every variant; a plain
    # ``replace("élit", "elite")`` would turn "élite" into "elitee".
    return _ELITE_SPELLINGS.sub("elite", text)


def normalize_age_group(value: str) -> AgeGroup | None:
    """Normalize age group string to handle case and spelling variations.

    Returns the first ``AgeGroup`` member whose value equals *value* once both
    are lower-cased and spelling-normalized, or ``None`` when *value* is not a
    string or matches no member.
    """
    if not isinstance(value, str):
        return None

    wanted = _normalize_spelling(value.lower())
    for member in AgeGroup:
        if _normalize_spelling(member.value.lower()) == wanted:
            return member
    return None


class EventType(str, Enum):
    """Kinds of schedule entries selectable with ``search --type``."""

    game = "game"
    practice = "practice"


def load_cookies(file: str = "cookies.txt") -> requests.cookies.RequestsCookieJar:
    """Load the cookie jar saved by ``save_cookies``.

    Returns an empty jar when *file* does not exist or does not contain a
    JSON object, so the caller simply logs in again.
    """
    cookie_jar_file = Path(file)
    cj_dict = {}
    if cookie_jar_file.exists():
        try:
            with cookie_jar_file.open("rb") as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            loaded = None
        if isinstance(loaded, dict):
            cj_dict = loaded
        else:
            print(f"Warning: ignoring unreadable cookie file {file}", file=sys.stderr)
    return requests.cookies.cookiejar_from_dict(cj_dict)


def save_cookies(file: str = "cookies.txt"):
    """Write the cookies of the global session to *file* as a JSON object."""
    cookies = requests.utils.dict_from_cookiejar(session.cookies)
    with Path(file).open("w") as f:
        f.write(json.dumps(cookies))


def get_login(
    local_file: str = "myice.ini",
    config_section: str = "default",
) -> tuple[str, str, int | None, str | None, int | None]:
    """Read credentials from the INI configuration.

    Files are read from ``~/.config/myice.ini``, ``myice.ini`` and
    *local_file*. When *config_section* is missing, the ``default`` section
    is used with a warning.

    Returns ``(username, password, userid, token, club_id)``; the last three
    are ``None`` when not configured. Exits with status 1 when no usable
    section or no username/password is found.
    """
    config = configparser.ConfigParser()
    config.read(
        [
            Path("~/.config/myice.ini").expanduser(),
            "myice.ini",
            local_file,
        ]
    )

    if config_section in config.sections():
        selected_config = config[config_section]
    elif "default" in config.sections():
        # Fallback to default section if specified section doesn't exist
        selected_config = config["default"]
        print(
            f"Warning: Section '{config_section}' not found, using 'default' section",
            file=sys.stderr,
        )
    else:
        print("Error: please configure username/password in ini file", file=sys.stderr)
        sys.exit(1)

    username = selected_config.get("username")
    password = selected_config.get("password")
    userid = selected_config.getint("userid", fallback=None)
    token = selected_config.get("token", fallback=None)
    club_id = selected_config.getint("club_id", fallback=None)

    if not username or not password:
        print("Error: please configure username/password in ini file", file=sys.stderr)
        sys.exit(1)

    return username, password, userid, token, club_id


def select_club(club_id: int = 172):
    """Select a club by ID after login."""
    r = session.get(
        f"https://app.myice.hockey/?cl={club_id}", headers={"User-Agent": user_agent}
    )
    r.raise_for_status()


def do_login(config_section: str | None = None):
    """Log the global session in to the web application.

    Uses *config_section*, or the globally selected section when it is
    ``None``. Also sets the global ``userid`` when the configuration
    provides one and selects the configured club, if any.
    """
    global userid

    # Use provided config_section, or fall back to global one
    section = config_section if config_section is not None else global_config_section
    username, password, userid_tmp, _token, club_id = get_login(config_section=section)
    if userid_tmp is not None:
        userid = userid_tmp

    # Initial GET of the landing page before posting the login form.
    r = session.get("https://app.myice.hockey/", headers={"User-Agent": user_agent})
    r.raise_for_status()

    form_data = {
        "login_email": username,
        "login_password": password,
        "sublogin": 1,
        "login_submit": "",
    }
    r = session.post(
        "https://app.myice.hockey/classes/process.php",
        data=form_data,
        headers={
            "Referer": "https://app.myice.hockey/",
            "Origin": "https://app.myice.hockey",
            "User-Agent": user_agent,
        },
    )
    r.raise_for_status()

    # select the club we want
    if club_id:
        select_club(club_id)


def get_userid() -> int:
    """Scrape the numeric user id from the club schedule page.

    Raises:
        RuntimeError: when no ``userid: '<digits>'`` line is found in the page.
    """
    r = session.get(
        "https://app.myice.hockey/players/clubschedule/",
        headers={
            "User-Agent": user_agent,
            "Referer": "https://app.myice.hockey/classes/process.php",
        },
    )
    r.raise_for_status()

    for line in r.text.splitlines():
        m = _USERID_PATTERN.search(line)
        if m:
            return int(m.group("userid"))
    raise RuntimeError("could not find userid in the club schedule page")


def wrapper_session(func):
    """Decorator that prepares the global ``session`` and ``userid``.

    Before calling *func* it creates a fresh session, restores saved cookies,
    logs in (and saves the cookies) when the ``mih_v3_cookname`` cookie is
    absent, and resolves the user id from the configuration or, failing
    that, from the website.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        global session, userid

        # Use the global config_section
        config_section = global_config_section

        session = requests.Session()
        session.cookies = load_cookies()
        session.cookies.clear_expired_cookies()
        if not session.cookies.get("mih_v3_cookname"):
            print("login...", file=sys.stderr)
            do_login(config_section=None)  # Use global config_section
            save_cookies()

        _, _, userid, _, _ = get_login(config_section=config_section)
        if not userid:
            print("get userid...", file=sys.stderr)
            userid = get_userid()
            print(f"{userid=}", file=sys.stderr)

        return func(*args, **kwargs)

    return wrapper


@wrapper_session
def get_schedule(num_days: int) -> list:
    """Fetch the schedule for the next *num_days* days as parsed JSON."""
    assert session and userid

    date_start = datetime.datetime.now()
    date_end = date_start + datetime.timedelta(days=num_days)
    r = session.post(
        "https://app.myice.hockey/inc/processclubplanning.php",
        data={
            "type": "fetchmy",
            "userid": userid,
            "type_location": ["*"],
            "start": date_start.strftime("%Y-%m-%d"),
            "end": date_end.strftime("%Y-%m-%d"),
        },
        headers={
            "User-Agent": user_agent,
            "Referer": "https://app.myice.hockey/players/clubschedule/",
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "X-Requested-With": "XMLHttpRequest",
        },
    )
    r.raise_for_status()
    # The sanitizer always yields valid JSON text, falling back to "[]".
    return json.loads(sanitize_json_response(r.text))


def _download_pdf(url: str, outfile: Path) -> None:
    """Download *url* with the global session and write the body to *outfile*."""
    r = session.get(
        url,
        headers={
            "User-Agent": user_agent,
            "Referer": "https://app.myice.hockey/inc/processclubplanning.php",
        },
    )
    r.raise_for_status()
    with outfile.open("wb") as fd:
        for chunk in r.iter_content(chunk_size=1024 * 1024 * 4):
            fd.write(chunk)


@wrapper_session
def game_pdf(gameid: int, outfile: Path):
    """Download the PDF generated for game *gameid* into *outfile*."""
    assert session and userid
    _download_pdf(
        f"https://app.myice.hockey/pdfexports/playerplatgamepdfgenerator.php?id={gameid}",
        outfile,
    )


@wrapper_session
def practice_pdf(gameid: int, outfile: Path):
    """Download the PDF generated for practice *gameid* into *outfile*."""
    assert session and userid
    _download_pdf(
        f"https://app.myice.hockey/pdfexports/playerplatpracticepdf.php?practice={gameid}",
        outfile,
    )


@app.command()
def schedule(
    outfile: Annotated[
        Path | None,
        typer.Option(
            "--outfile", "-o", help="file to write result to, or stdout if none"
        ),
    ] = None,
    num_days: Annotated[int, typer.Option("--days")] = 7,
):
    """
    Fetch schedule as json
    """
    events = get_schedule(num_days)
    if outfile:
        with outfile.open("w") as f:
            f.write(json.dumps(events))
    else:
        # Built-in print: rich's print is not used for the raw JSON dump.
        builtins.print(json.dumps(events, indent=2))


def os_open(file: str) -> None:
    """Open *file* with the desktop's default application.

    Uses ``xdg-open`` on Linux and ``open`` elsewhere. The command is run
    with an argument list rather than through a shell, so unusual characters
    in *file* cannot be interpreted as shell syntax.
    """
    if sys.platform.startswith("linux"):
        opener = "xdg-open"
    else:
        print(f"Opening file {file}", file=sys.stderr)
        opener = "open"
    subprocess.run([opener, os.fspath(file)], check=False)


def extract_players(pdf_file: Path) -> List[str]:
    """Return the text fragments of the first PDF page taken as player names.

    A fragment is kept when its text-matrix position satisfies
    ``79 < x < 80`` and ``y < 741.93``.
    """
    reader = PyPDF2.PdfReader(pdf_file)
    page = reader.pages[0]

    players = []

    def visitor_body(text, cm, tm, fontDict, fontSize):
        # tm[4] / tm[5] are read as the x / y position of the fragment.
        (x, y) = (tm[4], tm[5])
        if 79 < x < 80 and y < 741.93:
            players.append(text)

    page.extract_text(visitor_text=visitor_body)
    return players


@app.command("game")
def get_game_pdf(
    game_id: Annotated[int, typer.Argument(help="ID of game to gen pdf for")],
    open_file: Annotated[bool, typer.Option("--open", "-o")] = False,
):
    """
    Generate the pdf for the game invitation
    """
    if open_file:
        output_filename = f"game_{game_id}.pdf"
        game_pdf(game_id, Path(output_filename))
        os_open(output_filename)
        return

    # Without --open the PDF is only an intermediate artefact: download it
    # into a private temporary directory that is removed afterwards.
    with tempfile.TemporaryDirectory() as tmpdir:
        pdf_path = Path(tmpdir) / f"game_{game_id}.pdf"
        game_pdf(game_id, pdf_path)
        players = extract_players(pdf_path)

    print("Players:")
    print("\n".join(players))


@app.command("practice")
def get_practice_pdf(
    game_id: Annotated[int, typer.Argument(help="ID of practice to gen pdf for")],
):
    """
    Generate the pdf for the practice invitation
    """
    output_filename = f"practice_{game_id}.pdf"
    practice_pdf(game_id, Path(output_filename))
    os_open(output_filename)


def _load_schedule_file(schedule_file: Path):
    """Parse *schedule_file* as JSON, sanitizing the content if it is malformed."""
    content = schedule_file.read_text()
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # If JSON is malformed, try to sanitize it first
        return json.loads(sanitize_json_response(content))


def _is_game(event: dict) -> bool:
    """Return True when *event* is a game, i.e. its "event" field is "Jeu"."""
    return event.get("event") == "Jeu"


@app.command("search")
def parse_schedule(
    age_group: Annotated[AgeGroup | None, typer.Option()] = None,
    event_type_filter: Annotated[
        EventType | None,
        typer.Option("--type", help="Only display events of this type"),
    ] = None,
    schedule_file: Annotated[
        Path, typer.Option(help="schedule json file to parse")
    ] = Path("schedule.json"),
):
    """
    Parse schedule.json to look for specific games or practices
    """
    data = _load_schedule_file(schedule_file)

    # age_group filter
    if age_group:
        # Normalize the requested group too: spelling variants such as
        # "U18 (Elit)" and "U18 (Elite)" resolve to the same member.
        wanted = normalize_age_group(age_group.value)
        events = [x for x in data if normalize_age_group(x.get("agegroup")) == wanted]
    else:
        events = [
            x for x in data if normalize_age_group(x.get("agegroup")) is not None
        ]

    # event_type filter
    if event_type_filter == EventType.game:
        events = [x for x in events if _is_game(x)]
    elif event_type_filter == EventType.practice:
        events = [x for x in events if not _is_game(x)]

    for event in events:
        if age_group:
            raw_title = event["title"].removeprefix(age_group.value + "\n")
        else:
            raw_title = event["title"]
        title = " ".join(raw_title.split("\n"))

        start = datetime.datetime.fromisoformat(event["start"])
        start_fmt = start.strftime("%H:%M")
        end = datetime.datetime.fromisoformat(event["end"])
        end_fmt = end.strftime("%H:%M")
        event_type = "game" if _is_game(event) else "practice"

        print(
            f"[{event['id_event']}] {event['date']} {event_type} {start_fmt}-> {end_fmt}"
        )
        print(f"{event_type}: {title}\n")


@app.command("ai")
def check_with_ai(
    schedule_file: Annotated[
        Path, typer.Option(help="schedule json file to parse")
    ] = Path("schedule.json"),
):
    """
    Search through the schedule with natural language using Infomaniak LLM API
    """
    if not utils.init():
        sys.exit(1)

    schedule_data = [
        x
        for x in _load_schedule_file(schedule_file)
        if normalize_age_group(x.get("agegroup")) is not None
    ]

    # Replace "agegroup" by a parenthesis-free "team" field.
    for event in schedule_data:
        event["team"] = event.pop("agegroup").replace("(", "").replace(")", "")

    when = datetime.datetime.now().strftime("%d-%m-%Y et il est %H:%M")
    system = "\n".join(
        [
            "Tu es une IA connaissant bien les données suivantes, qui décrivent les match et entraînements d'équipes de hockey sur glace.",
            f"aujourd'hui, nous sommes le {when}",
            "attention: ce qu'il y a entre parenthèse après la catégorie est une catégorie à part entière, example, u13 a = U13 (A) et ça correspond aux agegroup",
            "assure-toi de ne pas confondre les catégories d'age dans tes réponses. ne donne pas une réponse pour la mauvaise équipe",
            "ne confond pas top, elite, a, prép",
            "```json",
            json.dumps(schedule_data),
            "```",
        ],
    )

    history: List[Tuple[str, str]] = []
    while True:
        try:
            question = input("> ")
        except EOFError:
            break
        answer = utils.llm_inference(
            question, history, system=system, model="mixtral8x22b"
        )
        print("<", answer)
        history.append((question, answer))


mobile_headers = {
    "Accept": "application/json, text/plain, */*",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
    "Sec-Fetch-Site": "cross-site",
    "Accept-Language": "fr-FR,fr;q=0.9",
    "Sec-Fetch-Mode": "cors",
    "Origin": "null",
    "Sec-Fetch-Dest": "empty",
    "Content-Type": "application/x-www-form-urlencoded",
}


def mobile_login(config_section: str | None = None):
    """Return the mobile API credentials as ``{"id", "token", "id_club"}``.

    When both ``token`` and ``club_id`` are configured they are returned
    without any network access; otherwise a token is requested from the
    mobile login endpoint. A configured ``club_id`` takes precedence over
    the one reported by the server.
    """
    # Use provided config_section, or fall back to global one
    section = config_section if config_section is not None else global_config_section
    username, password, _, token, club_id = get_login(config_section=section)

    if token and club_id:
        return {"id": 0, "token": token, "id_club": club_id}

    print("Requesting token", file=sys.stderr)
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/login",
        headers=mobile_headers,
        # A mapping lets requests URL-encode the values: base64 output may
        # contain "+", "/" and "=", which are not safe in a raw form body.
        data={
            "login_email": base64.b64encode(username.encode()).decode(),
            "login_password": base64.b64encode(password.encode()).decode(),
            "language": "FR",
            "v": "2",
        },
    ) as r:
        r.raise_for_status()
        userinfo = r.json()["userinfo"]

    return {
        "id": 0,
        "token": userinfo["token"],
        # Use configured club_id if available
        "id_club": club_id or userinfo["id_club"],
    }


userdata = {
    "id": 0,
    "id_club": 0,
    "token": "",
}


def refresh_data():
    """Fetch the mobile "refreshdata" payload for the current ``userdata``.

    Always returns a dict: when the response cannot be parsed into a JSON
    object, a diagnostic is printed to stderr and ``{}`` is returned.
    """
    with requests.post(
        "https://app.myice.hockey/api/mobilerest/refreshdata",
        headers=mobile_headers,
        data={
            "token": userdata["token"],
            "id_club": userdata["id_club"],
            "language": "FR",
        },
    ) as r:
        r.raise_for_status()
        try:
            payload = r.json()
        except ValueError:
            # ValueError covers the decode errors of both json and simplejson.
            # If direct parsing fails, try with sanitization; the sanitizer
            # falls back to "[]", which is rejected just below.
            payload = json.loads(sanitize_json_response(r.text))

        if not isinstance(payload, dict):
            print(
                f"Failed to parse response as a JSON object. Raw response: {r.text[:500]}...",
                file=sys.stderr,
            )
            # Return an empty dict to avoid breaking the callers
            return {}
        return payload


@app.command("mobile-login")
def do_mobile_login():
    """Print the mobile API credentials (id, token, id_club) as JSON."""
    global userdata
    userdata = mobile_login(config_section=global_config_section)
    print(json.dumps(userdata, indent=2))


@app.command("mobile")
def mobile():
    """Print the club games returned by the mobile API as JSON."""
    global userdata
    userdata = mobile_login(config_section=global_config_section)

    # "club_games" may be missing (or null) in the payload.
    games = list(refresh_data().get("club_games") or [])
    # Use built-in print to avoid rich formatting issues
    builtins.print(json.dumps(games, indent=2, ensure_ascii=False))


@app.command("mobile-game")
def mobile_game(
    game_id: Annotated[int, typer.Argument(help="game id")],
    raw: Annotated[bool, typer.Option(help="display raw output")] = False,
):
    """Show a game and its available players using the mobile API."""
    global userdata
    userdata = mobile_login(config_section=global_config_section)

    with requests.post(
        "https://app.myice.hockey/api/mobilerest/getevent",
        headers=mobile_headers,
        data={
            "token": userdata["token"],
            "id_event": game_id,
            "type": "games",
            "id_player": userdata["id"],
            "id_club": userdata["id_club"],
            "language": "FR",
        },
    ) as r:
        r.raise_for_status()
        data = json.loads(sanitize_json_response(r.text))["eventData"]

    players = data["convocation"]["available"]
    if raw:
        print(data)
    print(
        f"Game {game_id}: {data['title']} ({data['type']}, {data['is_away']}) at {data['time_start']}"
    )
    print(f"{len(players)} players")

    # Sort by position (players without one are keyed as "none"), then by
    # date of birth.
    for player in sorted(
        players,
        key=lambda x: (
            x["position"] if x["position"] else "none",
            x["dob"],
        ),
    ):
        print(
            f"[{player['position']}] {player['fname']} {player['lname']} ({player['dob']})"
        )


if __name__ == "__main__":
    app()