Den child routes (~561 per game) bloated the route list without being useful for Nuzlocke tracking. Adds filter_den_routes() to strip children matching "(Den " from the route hierarchy, reducing Sw/Sh from ~1,007 to 446 routes each. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
356 lines
11 KiB
Python
356 lines
11 KiB
Python
"""Core encounter processing: filter, parse, aggregate, and group encounters."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
from .mappings import LocationMapper, PokemonMapper, map_encounter_method
|
|
from .models import Encounter, Route
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rate parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Textual rate descriptions and the integer rate each one stands for.
# Keys are compared against the lower-cased, stripped input.
_WORD_RATES: dict[str, int] = {
    # "N-way" counts: roughly 100 / N
    "one": 100,
    "two": 50,
    "three": 33,
    "four": 25,
    "five": 20,
    "six": 17,
    "seven": 14,
    "eight": 13,
    # Fixed / one-off availability phrases
    "only one": 100,
    "choose one": 100,
    "one of three": 33,
    "respawns": 100,
    "unlimited": 100,
    # Qualitative frequency labels
    "common": 60,
    "average": 30,
    "rare": 10,
    "varies": 50,
}

# Optional "~" prefix, an integer or decimal number, optional "%" suffix.
_PERCENT_RE = re.compile(r"~?(\d+(?:\.\d+)?)%?")


def parse_rate(value: str | None) -> int | None:
    """Convert a rate string into an integer rate.

    Recognised inputs:

    * a word or phrase from ``_WORD_RATES`` (case-insensitive), e.g. "one",
      "common";
    * a number with optional "~" prefix and "%" suffix, e.g. "50%", "~10%",
      "10.14%", "100".

    Numeric values are rounded to the nearest integer and never drop below 1.
    They are not clamped at 100, so a bare "300" yields 300. The number is
    matched as a prefix, so any text following it is ignored.

    Returns None for empty/None input, the unknown marker "??%", and anything
    else that does not start with a number.
    """
    if not value:
        return None

    text = value.strip()

    # Word-based rates take priority over numeric parsing.
    word_rate = _WORD_RATES.get(text.lower())
    if word_rate is not None:
        return word_rate

    # Explicit "unknown" marker.
    if text == "??%":
        return None

    numeric = _PERCENT_RE.match(text)
    if numeric is None:
        return None

    # Floor at 1 so that a tiny but non-missing rate never becomes 0.
    return max(1, round(float(numeric.group(1))))
|
|
|
|
|
|
def extract_encounter_rate(record: dict[str, Any], generation: int) -> int:
    """Collapse the rate fields of one PokeDB encounter record into one int.

    The record is probed for each known rate layout in a fixed order and the
    first layout that yields a parseable value wins:

    1. ``rate_overall``
    2. time-of-day rates (morning / day / night) — maximum
    3. seasonal rates (spring / summer / autumn / winter) — maximum
    4. any ``weather_*_rate`` key — maximum
    5. any truthy ``during_*`` flag — 100
    6. ``probability_overall``
    7. time-of-day probabilities (morning / day / evening / night) — maximum
    8. otherwise 100

    Args:
        record: Raw encounter record.
        generation: Not consulted by this function; the layout is detected
            from which keys carry parseable values.

    Returns:
        The extracted rate. Values come from ``parse_rate`` and are therefore
        not clamped to 100 here.
    """

    def best_of(*fields: str) -> int | None:
        """Return the largest parseable rate among *fields*, else None."""
        parsed = [parse_rate(record.get(name)) for name in fields]
        usable = [rate for rate in parsed if rate is not None]
        return max(usable) if usable else None

    # Gen 1/3/6: a single overall rate
    overall = parse_rate(record.get("rate_overall"))
    if overall is not None:
        return overall

    # Gen 2/4: time-of-day rates
    by_time = best_of("rate_morning", "rate_day", "rate_night")
    if by_time is not None:
        return by_time

    # Gen 5: seasonal rates
    by_season = best_of("rate_spring", "rate_summer", "rate_autumn", "rate_winter")
    if by_season is not None:
        return by_season

    # Gen 8 Sw/Sh: one rate per weather condition, under weather_<name>_rate
    weather_parsed = [
        parse_rate(val)
        for key, val in record.items()
        if key.startswith("weather_") and key.endswith("_rate") and val
    ]
    by_weather = [rate for rate in weather_parsed if rate is not None]
    if by_weather:
        return max(by_weather)

    # Gen 8 Legends Arceus: boolean availability flags, no numeric rate
    availability_flags = (
        "during_any_time",
        "during_morning",
        "during_day",
        "during_evening",
        "during_night",
    )
    if any(record.get(flag) for flag in availability_flags):
        return 100  # Present under conditions

    # Gen 9 Sc/Vi: spawn weights (e.g. "20", "300") rather than percentages.
    # The parsed weight is returned as-is; nothing in this function
    # normalises it.
    overall_weight = record.get("probability_overall")
    if overall_weight:
        weight = parse_rate(overall_weight)
        if weight is not None:
            return weight

    # Time-based probability variants
    by_time_weight = best_of(
        "probability_morning",
        "probability_day",
        "probability_evening",
        "probability_night",
    )
    if by_time_weight is not None:
        return by_time_weight

    # Fallback: gift/trade/static encounters with no rate
    return 100
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Level parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_levels(levels_str: str | None) -> tuple[int, int]:
    """Parse a level string into ``(min_level, max_level)``.

    Examples:
        "2 - 4"   → (2, 4)
        "2-4"     → (2, 4)
        "2–4"     → (2, 4)   (en dash / em dash are accepted as separators)
        "51 - 44" → (44, 51) (bounds are returned in ascending order)
        "67"      → (67, 67)

    The string must start with a number after surrounding whitespace is
    stripped; any text after the recognised number or range is ignored.

    Returns (1, 1) for None, empty, or otherwise unparseable input.
    """
    if not levels_str:
        return (1, 1)

    # One pass: a leading number, optionally followed by "<dash> number".
    # \u2013 / \u2014 are the en and em dash, which would otherwise cause the
    # upper bound to be dropped silently.
    found = re.match(r"(\d+)(?:\s*[-\u2013\u2014]\s*(\d+))?", levels_str.strip())
    if found is None:
        return (1, 1)

    first = int(found.group(1))
    second = int(found.group(2)) if found.group(2) is not None else first

    # Normalise so callers can rely on min_level <= max_level.
    return (min(first, second), max(first, second))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core processing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def filter_encounters_for_game(
    encounters: list[dict[str, Any]],
    game_slug: str,
) -> list[dict[str, Any]]:
    """Return the encounter records that belong to one game version.

    A record is kept when ``game_slug`` is contained in its
    ``version_identifiers`` value. Records where that key is missing or falsy
    are dropped. Input order is preserved and the records themselves are not
    copied.

    NOTE(review): containment uses ``in``; this presumably expects
    ``version_identifiers`` to be a list of slugs — confirm against the data.
    """
    matching: list[dict[str, Any]] = []
    for record in encounters:
        versions = record.get("version_identifiers") or []
        if game_slug in versions:
            matching.append(record)
    return matching
|
|
|
|
|
|
def process_encounters(
    raw_encounters: list[dict[str, Any]],
    generation: int,
    pokemon_mapper: PokemonMapper,
    location_mapper: LocationMapper,
) -> dict[str, list[Encounter]]:
    """Turn raw PokeDB encounter records into Encounter objects keyed by area.

    A record is skipped when any of the following holds:

    * its encounter method identifier is missing or maps to None;
    * ``pokemon_mapper.lookup`` returns None for its form identifier;
    * it has no ``location_area_identifier``.

    Args:
        raw_encounters: Encounter records to convert.
        generation: Forwarded to ``extract_encounter_rate``.
        pokemon_mapper: Resolves a form identifier to ``(pokeapi_id, name)``.
        location_mapper: Accepted for interface compatibility; not used by
            this function.

    Returns:
        ``{location_area_identifier: [Encounter, ...]}``, with encounters in
        the order their records appeared.
    """
    grouped: dict[str, list[Encounter]] = {}

    for rec in raw_encounters:
        # Encounter method — unmapped methods are dropped.
        method_key = rec.get("encounter_method_identifier", "")
        if not method_key:
            continue
        method = map_encounter_method(method_key)
        if method is None:
            continue

        # Pokemon form → (pokeapi id, display name)
        species = pokemon_mapper.lookup(rec.get("pokemon_form_identifier"))
        if species is None:
            continue
        pokeapi_id, pokemon_name = species

        min_level, max_level = parse_levels(rec.get("levels"))
        encounter_rate = extract_encounter_rate(rec, generation)

        # Records without an area cannot be grouped.
        area_id = rec.get("location_area_identifier", "")
        if not area_id:
            continue

        encounter = Encounter(
            pokeapi_id=pokeapi_id,
            pokemon_name=pokemon_name,
            method=method,
            encounter_rate=encounter_rate,
            min_level=min_level,
            max_level=max_level,
        )

        if area_id in grouped:
            grouped[area_id].append(encounter)
        else:
            grouped[area_id] = [encounter]

    return grouped
|
|
|
|
|
|
def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
    """Merge encounters that share the same ``(pokeapi_id, method)``.

    For each group the rates are summed (then capped at 100) and the level
    range is widened to cover every member. The name is taken from the first
    encounter seen for the group. New Encounter objects are created, so the
    inputs are never modified.

    Returns:
        One Encounter per group, sorted by rate (highest first) and then by
        pokemon name. Remaining ties keep first-seen order.

    Replicates the Go tool's aggregation logic.
    """
    # Dicts preserve insertion order, so this also records first-seen order.
    merged: dict[tuple[int, str], Encounter] = {}

    for enc in encounters:
        key = (enc.pokeapi_id, enc.method)
        current = merged.get(key)

        if current is None:
            # First occurrence: store a private copy to accumulate into.
            merged[key] = Encounter(
                pokeapi_id=enc.pokeapi_id,
                pokemon_name=enc.pokemon_name,
                method=enc.method,
                encounter_rate=enc.encounter_rate,
                min_level=enc.min_level,
                max_level=enc.max_level,
            )
            continue

        current.encounter_rate += enc.encounter_rate
        current.min_level = min(current.min_level, enc.min_level)
        current.max_level = max(current.max_level, enc.max_level)

    combined = list(merged.values())

    # Cap only after all contributions are summed.
    for item in combined:
        item.encounter_rate = min(item.encounter_rate, 100)

    # Rate descending, then name ascending.
    combined.sort(key=lambda item: (-item.encounter_rate, item.pokemon_name))
    return combined
|
|
|
|
|
|
def build_routes(
    encounters_by_area: dict[str, list[Encounter]],
    location_mapper: LocationMapper,
) -> list[Route]:
    """Build the route list, nesting areas under their parent location.

    Areas are grouped by ``location_mapper.get_location_identifier``; an area
    with no location identifier forms its own group. For each group:

    * one area → a flat route, named after the area when it has a name that
      differs from the location name, otherwise after the location;
    * several areas → areas with a distinct name become child routes, and the
      encounters of the remaining areas are merged onto the parent. If no
      area has a distinct name, a single flat route is produced instead.

    Areas whose aggregated encounter list is empty contribute nothing. Every
    route is created with ``order=0``.
    """
    # location id → [(area_id, area_display_name, encounters), ...]
    areas_by_location: dict[str, list[tuple[str, str, list[Encounter]]]] = {}

    for area_id, area_encounters in encounters_by_area.items():
        # Fall back to the area id when the mapper knows no parent location.
        location_id = location_mapper.get_location_identifier(area_id) or area_id
        entry = (area_id, location_mapper.get_area_name(area_id), area_encounters)
        areas_by_location.setdefault(location_id, []).append(entry)

    built: list[Route] = []

    for grouped_areas in areas_by_location.values():
        # The location name is resolved through the group's first area.
        first_area_id = grouped_areas[0][0]
        location_name = location_mapper.get_location_name(first_area_id)

        if len(grouped_areas) == 1:
            # Single area — flat route
            _, area_name, area_encounters = grouped_areas[0]
            merged = aggregate_encounters(area_encounters)
            if not merged:
                continue
            has_own_name = bool(area_name) and area_name != location_name
            built.append(
                Route(
                    name=area_name if has_own_name else location_name,
                    order=0,
                    encounters=merged,
                )
            )
            continue

        # Multiple areas: split into named children and a parent-level pool.
        named_children: list[Route] = []
        unnamed_pool: list[Encounter] = []

        for _, area_name, area_encounters in grouped_areas:
            merged = aggregate_encounters(area_encounters)
            if not merged:
                continue
            if area_name and area_name != location_name:
                named_children.append(
                    Route(name=area_name, order=0, encounters=merged)
                )
            else:
                # No distinct area name — belongs to the parent itself.
                unnamed_pool.extend(merged)

        if named_children:
            # Parent with children; the parent may carry encounters of its own.
            own_encounters = (
                aggregate_encounters(unnamed_pool) if unnamed_pool else []
            )
            built.append(
                Route(
                    name=location_name,
                    order=0,
                    encounters=own_encounters,
                    children=named_children,
                )
            )
        elif unnamed_pool:
            # Every area shared the location's name — collapse to one route.
            built.append(
                Route(
                    name=location_name,
                    order=0,
                    encounters=aggregate_encounters(unnamed_pool),
                )
            )

    return built
|
|
|
|
|
|
def filter_den_routes(routes: list[Route]) -> list[Route]:
    """Drop Max Raid den children from every route, in place.

    A child counts as a den when its name contains "(Den ". Only the direct
    children of the given routes are examined; the top-level routes
    themselves are never removed.

    Each affected route gets a new ``children`` list assigned. The same
    ``routes`` list object that was passed in is returned.
    """
    for parent in routes:
        if not parent.children:
            continue

        kept = []
        for child in parent.children:
            if "(Den " in child.name:
                continue
            kept.append(child)
        parent.children = kept

    return routes
|