"""Core encounter processing: filter, parse, aggregate, and group encounters."""

from __future__ import annotations

import re
from typing import Any

from .mappings import LocationMapper, PokemonMapper, map_encounter_method
from .models import Encounter, Route

# ---------------------------------------------------------------------------
# Rate parsing
# ---------------------------------------------------------------------------

# Word-based rates → numeric value
_WORD_RATES: dict[str, int] = {
    "one": 100,
    "two": 50,
    "three": 33,
    "four": 25,
    "five": 20,
    "six": 17,
    "seven": 14,
    "eight": 13,
    "choose one": 100,
    "one of three": 33,
    "only one": 100,
    "unlimited": 100,
    "respawns": 100,
    "common": 60,
    "average": 30,
    "rare": 10,
    "varies": 50,
}

_PERCENT_RE = re.compile(r"~?(\d+(?:\.\d+)?)%?")

# Rate fields that are checked in order of precedence; within one group the
# largest parseable value wins.
_RATE_FIELD_GROUPS: tuple[tuple[str, ...], ...] = (
    # Gen 1/3/6: a single overall rate
    ("rate_overall",),
    # Gen 2/4: time-of-day rates
    ("rate_morning", "rate_day", "rate_night"),
    # Gen 5: seasonal rates
    ("rate_spring", "rate_summer", "rate_autumn", "rate_winter"),
)

# Gen 8 Legends Arceus: boolean presence flags
_PRESENCE_FLAGS: tuple[str, ...] = (
    "during_any_time",
    "during_morning",
    "during_day",
    "during_evening",
    "during_night",
)

# Gen 9 Sc/Vi: time-based probability variants
_PROBABILITY_TIME_FIELDS: tuple[str, ...] = (
    "probability_morning",
    "probability_day",
    "probability_evening",
    "probability_night",
)


def parse_rate(value: str | int | float | None) -> int | None:
    """Parse a rate value into a whole number.

    Handles formats: "50%", "~10%", "10.14%", bare "100", word rates such as
    "one" or "common" (case-insensitive), and plain numbers (``50``, ``12.5``).

    Numeric results are rounded and floored at 1, so a parsed "0%" becomes 1.
    There is no upper bound: a bare "300" is returned as 300.

    Returns None for falsy input (None, "", 0), booleans, unsupported types,
    "??%", and anything else that cannot be parsed.
    """
    if not value or isinstance(value, bool):
        return None
    if not isinstance(value, (str, int, float)):
        return None

    # Numbers are routed through the same text parser as strings so that
    # rounding and the floor of 1 apply uniformly.
    text = str(value).strip()

    word_rate = _WORD_RATES.get(text.lower())
    if word_rate is not None:
        return word_rate

    # match() anchors only at the start, so trailing text after the number is
    # ignored. The unknown marker "??%" needs no special case: the pattern
    # requires a digit (optionally after "~"), so it falls through to None.
    m = _PERCENT_RE.match(text)
    if m is None:
        return None
    return max(1, round(float(m.group(1))))


def _max_rate(record: dict[str, Any], keys: tuple[str, ...]) -> int | None:
    """Return the largest parseable rate among *keys* of *record*, or None."""
    parsed = (parse_rate(record.get(key)) for key in keys)
    return max((rate for rate in parsed if rate is not None), default=None)


def extract_encounter_rate(record: dict[str, Any], generation: int) -> int:
    """Extract a single encounter_rate from a PokeDB encounter record.

    Flattens generation-specific rate variants into a single value. The first
    of the following that yields a value wins:

    1. ``rate_overall``
    2. max of the time-of-day rates
    3. max of the seasonal rates
    4. max of any ``weather_*_rate`` field
    5. 100 if any ``during_*`` presence flag is truthy
    6. ``probability_overall``
    7. max of the time-based ``probability_*`` fields
    8. 100 as the fallback

    ``generation`` is accepted for interface compatibility but is not
    consulted; the variant is detected from which fields are present.
    """
    for keys in _RATE_FIELD_GROUPS:
        rate = _max_rate(record, keys)
        if rate is not None:
            return rate

    # Gen 8 Sw/Sh: weather rates — take the max
    weather_rates = [
        parse_rate(val)
        for key, val in record.items()
        if key.startswith("weather_") and key.endswith("_rate")
    ]
    best_weather = max((r for r in weather_rates if r is not None), default=None)
    if best_weather is not None:
        return best_weather

    # Gen 8 Legends Arceus: boolean conditions → presence-based
    if any(record.get(flag) for flag in _PRESENCE_FLAGS):
        return 100  # Present under conditions

    # Gen 9 Sc/Vi: these are spawn weights (e.g. "20", "300"), not
    # percentages, and are returned as the raw weight.
    # NOTE(review): nothing visible in this module normalizes these weights;
    # aggregate_encounters only caps the summed value at 100.
    overall_weight = _max_rate(record, ("probability_overall",))
    if overall_weight is not None:
        return overall_weight

    timed_weight = _max_rate(record, _PROBABILITY_TIME_FIELDS)
    if timed_weight is not None:
        return timed_weight

    # Fallback: gift/trade/static encounters with no rate
    return 100


# ---------------------------------------------------------------------------
# Level parsing
# ---------------------------------------------------------------------------

# Range such as "2 - 4", "2-4", or the same with an en/em dash separator.
_LEVEL_RANGE_RE = re.compile(r"(\d+)\s*[-\u2013\u2014]\s*(\d+)")
_LEVEL_SINGLE_RE = re.compile(r"(\d+)")


def parse_levels(levels_str: str | int | None) -> tuple[int, int]:
    """Parse a level value into (min_level, max_level).

    "2 - 4"   → (2, 4)
    "67"      → (67, 67)
    "44 - 51" → (44, 51)
    "51 - 44" → (44, 51)   (reversed ranges are normalized)
    67        → (67, 67)   (plain integers are accepted)

    Returns (1, 1) if the value is empty or unparseable.
    """
    if not levels_str or isinstance(levels_str, bool):
        return (1, 1)

    text = str(levels_str).strip()

    m = _LEVEL_RANGE_RE.match(text)
    if m:
        first, second = int(m.group(1)), int(m.group(2))
        # Guarantee min <= max even if the source lists the range backwards.
        return (min(first, second), max(first, second))

    m = _LEVEL_SINGLE_RE.match(text)
    if m:
        level = int(m.group(1))
        return (level, level)

    return (1, 1)


# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------


def filter_encounters_for_game(
    encounters: list[dict[str, Any]],
    game_slug: str,
) -> list[dict[str, Any]]:
    """Filter PokeDB encounters to only those for a specific game version.

    Records whose ``version_identifiers`` is missing or empty are dropped.
    """
    # NOTE(review): assumes version_identifiers is a list of slugs; if it were
    # a plain string, `in` would do substring matching — confirm the schema.
    return [
        e for e in encounters
        if game_slug in (e.get("version_identifiers") or [])
    ]


def process_encounters(
    raw_encounters: list[dict[str, Any]],
    generation: int,
    pokemon_mapper: PokemonMapper,
    location_mapper: LocationMapper,
) -> dict[str, list[Encounter]]:
    """Process raw PokeDB encounters into grouped-by-location-area Encounter objects.

    A record is skipped when its encounter method is missing or unmapped, when
    ``pokemon_mapper.lookup`` returns None for its form, or when it has no
    ``location_area_identifier``.

    ``location_mapper`` is accepted for interface compatibility but is not
    used here.

    Returns {location_area_identifier: [Encounter, ...]}.
    """
    by_area: dict[str, list[Encounter]] = {}

    for record in raw_encounters:
        # Map encounter method
        method_id = record.get("encounter_method_identifier", "")
        method = map_encounter_method(method_id) if method_id else None
        if method is None:
            continue

        # Map pokemon
        pokemon_info = pokemon_mapper.lookup(record.get("pokemon_form_identifier"))
        if pokemon_info is None:
            continue
        pokeapi_id, pokemon_name = pokemon_info

        # Location area — checked before parsing so records that will be
        # dropped anyway don't pay for level/rate parsing.
        area_id = record.get("location_area_identifier", "")
        if not area_id:
            continue

        min_level, max_level = parse_levels(record.get("levels"))

        by_area.setdefault(area_id, []).append(
            Encounter(
                pokeapi_id=pokeapi_id,
                pokemon_name=pokemon_name,
                method=method,
                encounter_rate=extract_encounter_rate(record, generation),
                min_level=min_level,
                max_level=max_level,
            )
        )

    return by_area


def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
    """Aggregate encounters by (pokeapi_id, method), merging level ranges and summing rates.

    Summed rates are capped at 100. The input objects are not mutated; the
    result holds fresh Encounter instances sorted by rate descending, then
    pokemon name ascending.

    Replicates the Go tool's aggregation logic.
    """
    # dicts preserve insertion order, so first-seen order is retained without
    # a separate bookkeeping list (it serves as the stable-sort tie order).
    merged: dict[tuple[int, str], Encounter] = {}

    for enc in encounters:
        key = (enc.pokeapi_id, enc.method)
        existing = merged.get(key)
        if existing is None:
            # Copy so we don't mutate the original
            merged[key] = Encounter(
                pokeapi_id=enc.pokeapi_id,
                pokemon_name=enc.pokemon_name,
                method=enc.method,
                encounter_rate=enc.encounter_rate,
                min_level=enc.min_level,
                max_level=enc.max_level,
            )
        else:
            existing.encounter_rate += enc.encounter_rate
            existing.min_level = min(existing.min_level, enc.min_level)
            existing.max_level = max(existing.max_level, enc.max_level)

    result = list(merged.values())
    for entry in result:
        entry.encounter_rate = min(entry.encounter_rate, 100)

    # Sort by rate descending, then name ascending
    result.sort(key=lambda e: (-e.encounter_rate, e.pokemon_name))
    return result


def build_routes(
    encounters_by_area: dict[str, list[Encounter]],
    location_mapper: LocationMapper,
) -> list[Route]:
    """Group encounters by location, building parent/child route hierarchy.

    Multiple areas under the same location → parent route with children.
    Single area → flat route.

    An area counts as "distinctly named" when its area name is non-empty and
    differs from the location name. Every route is created with ``order=0``.
    """
    # Group areas by their parent location identifier:
    # loc_id → [(area_id, area_display_name, encounters), ...]
    loc_groups: dict[str, list[tuple[str, str, list[Encounter]]]] = {}
    for area_id, encounters in encounters_by_area.items():
        # Fall back to the area id when the mapper knows no parent location.
        loc_id = location_mapper.get_location_identifier(area_id) or area_id
        area_name = location_mapper.get_area_name(area_id)
        loc_groups.setdefault(loc_id, []).append((area_id, area_name, encounters))

    routes: list[Route] = []

    for areas in loc_groups.values():
        # The location name is resolved from the group's first area.
        loc_name = location_mapper.get_location_name(areas[0][0])

        if len(areas) == 1:
            # Single area — flat route
            _, area_name, encounters = areas[0]
            aggregated = aggregate_encounters(encounters)
            if aggregated:
                # If the area has a distinct name different from the location, use it
                distinct = area_name and area_name != loc_name
                routes.append(Route(
                    name=area_name if distinct else loc_name,
                    order=0,
                    encounters=aggregated,
                ))
            continue

        # Multiple areas — distinctly named areas become children; the rest
        # are merged into the parent.
        children: list[Route] = []
        parent_encounters: list[Encounter] = []

        for _, area_name, encounters in areas:
            aggregated = aggregate_encounters(encounters)
            if not aggregated:
                continue
            if area_name and area_name != loc_name:
                children.append(Route(name=area_name, order=0, encounters=aggregated))
            else:
                # No distinct area name — merge into parent
                parent_encounters.extend(aggregated)

        if children:
            # Parent with children (parent may also have its own encounters);
            # aggregating an empty list yields an empty list.
            routes.append(Route(
                name=loc_name,
                order=0,
                encounters=aggregate_encounters(parent_encounters),
                children=children,
            ))
        elif parent_encounters:
            # All areas had same name — flatten into single route
            routes.append(Route(
                name=loc_name,
                order=0,
                encounters=aggregate_encounters(parent_encounters),
            ))

    return routes


def filter_den_routes(routes: list[Route]) -> list[Route]:
    """Remove Max Raid den child routes from the route list.

    Dens are identified by "(Den " in the child route name.
    Only children are filtered — parent routes are kept.

    The Route objects are modified in place (their ``children`` lists are
    replaced) and the same ``routes`` list is returned.
    """
    for route in routes:
        if route.children:
            route.children = [c for c in route.children if "(Den " not in c.name]
    return routes