Files
nuzlocke-tracker/tools/import-pokedb/import_pokedb/processing.py

339 lines
10 KiB
Python
Raw Normal View History

"""Core encounter processing: filter, parse, aggregate, and group encounters."""
from __future__ import annotations
import re
from typing import Any
from .mappings import LocationMapper, PokemonMapper, map_encounter_method
from .models import Encounter, Route
# ---------------------------------------------------------------------------
# Rate parsing
# ---------------------------------------------------------------------------
# Word-based rates → numeric value
_WORD_RATES: dict[str, int] = {
    # Count words: the values are approximately 100 / N.
    "one": 100,
    "two": 50,
    "three": 33,
    "four": 25,
    "five": 20,
    "six": 17,
    "seven": 14,
    "eight": 13,
    # Phrases describing a fixed choice or availability.
    "choose one": 100,
    "one of three": 33,
    "only one": 100,
    "unlimited": 100,
    "respawns": 100,
    # Qualitative frequency labels.
    "common": 60,
    "average": 30,
    "rare": 10,
    "varies": 50,
}

# Optional leading "~", an integer or decimal number, optional trailing "%".
# Used with ``match``, so it only has to match a prefix of the input.
_PERCENT_RE = re.compile(r"~?(\d+(?:\.\d+)?)%?")


def parse_rate(value: str | None) -> int | None:
    """Convert a rate string into an integer rate.

    Recognised inputs:

    * a key of ``_WORD_RATES`` (compared case-insensitively), e.g.
      ``"one"`` or ``"Common"``;
    * a number with an optional leading ``~`` and optional trailing ``%``,
      e.g. ``"50%"``, ``"~10%"``, ``"10.14%"`` or a bare ``"100"``.

    Numeric values are rounded to the nearest integer and floored at 1, so
    ``"0%"`` yields 1. They are *not* capped: ``"300"`` yields 300.
    Only the start of the string has to look numeric; trailing text after
    the number is ignored.

    Returns:
        The parsed rate, or ``None`` for empty input, the unknown marker
        ``"??%"``, or anything else that cannot be parsed.
    """
    if not value:
        return None

    text = value.strip()

    # Word-based rates take precedence over numeric parsing.
    word_rate = _WORD_RATES.get(text.lower())
    if word_rate is not None:
        return word_rate

    # Explicit "unknown" marker.
    if text == "??%":
        return None

    matched = _PERCENT_RE.match(text)
    if matched is None:
        return None

    # Never report 0 for an encounter that is listed at all.
    return max(1, round(float(matched.group(1))))
def extract_encounter_rate(record: dict[str, Any], generation: int) -> int:
    """Collapse the rate fields of one PokeDB encounter record into one value.

    The record may carry its rate in several alternative field families.
    They are tried in a fixed order and the first family that yields a
    parseable value wins:

    1. ``rate_overall``
    2. time-of-day rates (``rate_morning`` / ``rate_day`` / ``rate_night``)
    3. seasonal rates (``rate_spring`` ... ``rate_winter``)
    4. any ``weather_*_rate`` field
    5. truthy ``during_*`` presence flags, which yield 100
    6. ``probability_overall``
    7. time-of-day probabilities (``probability_morning`` ...)

    Within a family with several fields the maximum is returned. If nothing
    matches, the result is 100.

    Args:
        record: Raw PokeDB encounter record.
        generation: Accepted for interface compatibility; the body does not
            read it, the family is detected from the fields present.

    Returns:
        The extracted rate. For the ``probability_*`` families this is the
        raw parsed value, which is not limited to 0-100.
    """

    def highest(keys: tuple[str, ...]) -> int | None:
        """Parse every field in *keys* and return the largest parsed rate."""
        parsed = [parse_rate(record.get(key)) for key in keys]
        known = [rate for rate in parsed if rate is not None]
        return max(known) if known else None

    # Gen 1/3/6: a single overall rate.
    overall = parse_rate(record.get("rate_overall"))
    if overall is not None:
        return overall

    # Gen 2/4 time-of-day rates, then Gen 5 seasonal rates: take the max.
    for family in (
        ("rate_morning", "rate_day", "rate_night"),
        ("rate_spring", "rate_summer", "rate_autumn", "rate_winter"),
    ):
        best = highest(family)
        if best is not None:
            return best

    # Gen 8 Sw/Sh: one rate per weather condition; take the max.
    weather_values = (
        val
        for key, val in record.items()
        if key.startswith("weather_") and key.endswith("_rate") and val
    )
    weather_rates = [
        rate for rate in map(parse_rate, weather_values) if rate is not None
    ]
    if weather_rates:
        return max(weather_rates)

    # Gen 8 Legends Arceus: boolean conditions mean "present", not a rate.
    presence_flags = (
        "during_any_time",
        "during_morning",
        "during_day",
        "during_evening",
        "during_night",
    )
    if any(record.get(flag) for flag in presence_flags):
        return 100

    # Gen 9 Sc/Vi: spawn weights (e.g. "20", "300"), not percentages.
    # The raw weight is returned as-is.
    # NOTE(review): the original comment says these are normalised later
    # during aggregation; confirm that step exists downstream.
    raw_weight = record.get("probability_overall")
    weight = parse_rate(raw_weight) if raw_weight else None
    if weight is not None:
        return weight

    best = highest(
        (
            "probability_morning",
            "probability_day",
            "probability_evening",
            "probability_night",
        )
    )
    if best is not None:
        return best

    # Fallback: gift/trade/static encounters with no rate.
    return 100
# ---------------------------------------------------------------------------
# Level parsing
# ---------------------------------------------------------------------------
def parse_levels(levels_str: str | None) -> tuple[int, int]:
    """Parse a level string into ``(min_level, max_level)``.

    Examples:
        ``"2 - 4"``   -> ``(2, 4)``
        ``"67"``      -> ``(67, 67)``
        ``"44 - 51"`` -> ``(44, 51)``

    The range separator may be a hyphen, an en dash or an em dash, with or
    without surrounding whitespace. A reversed range such as ``"4 - 2"`` is
    normalised so the first element is never larger than the second.

    The string must start with a number; anything after the parsed level or
    range is ignored.

    Returns:
        ``(min_level, max_level)``, or ``(1, 1)`` when the input is empty or
        does not start with a number.
    """
    if not levels_str:
        return (1, 1)

    # One number, optionally followed by a dash-like separator and a second
    # number. ``match`` anchors at the start of the stripped string.
    m = re.match(r"(\d+)(?:\s*[-\u2013\u2014]\s*(\d+))?", levels_str.strip())
    if m is None:
        return (1, 1)

    first = int(m.group(1))
    # A single level ("67") is a degenerate range.
    second = int(m.group(2)) if m.group(2) is not None else first

    # Guard against reversed input so callers can rely on min <= max.
    return (min(first, second), max(first, second))
# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------
def filter_encounters_for_game(
    encounters: list[dict[str, Any]],
    game_slug: str,
) -> list[dict[str, Any]]:
    """Return the encounters whose ``version_identifiers`` contain *game_slug*.

    Records with a missing, ``None`` or empty ``version_identifiers`` value
    never match. The returned list holds the same dict objects as the
    input, in their original order.
    """
    matching: list[dict[str, Any]] = []
    for encounter in encounters:
        versions = encounter.get("version_identifiers")
        # A falsy value (missing key, None, empty list) cannot match.
        if versions and game_slug in versions:
            matching.append(encounter)
    return matching
def process_encounters(
    raw_encounters: list[dict[str, Any]],
    generation: int,
    pokemon_mapper: PokemonMapper,
    location_mapper: LocationMapper,
) -> dict[str, list[Encounter]]:
    """Turn raw PokeDB records into ``Encounter`` objects grouped by area.

    A record is silently skipped when:

    * it has no ``encounter_method_identifier`` or ``map_encounter_method``
      returns ``None`` for it;
    * ``pokemon_mapper.lookup`` returns ``None`` for its
      ``pokemon_form_identifier``;
    * it has no ``location_area_identifier``.

    Args:
        raw_encounters: Raw PokeDB encounter records.
        generation: Passed through to ``extract_encounter_rate``.
        pokemon_mapper: Resolves a form identifier to ``(pokeapi_id, name)``.
        location_mapper: Not used by this function; kept in the signature
            for callers.

    Returns:
        ``{location_area_identifier: [Encounter, ...]}`` with encounters in
        input order.
    """
    grouped: dict[str, list[Encounter]] = {}

    for raw in raw_encounters:
        # Encounter method: records without a mapped method are dropped.
        method_key = raw.get("encounter_method_identifier", "")
        if not method_key:
            continue
        mapped_method = map_encounter_method(method_key)
        if mapped_method is None:
            continue

        # Pokemon: records the mapper does not know are dropped.
        species = pokemon_mapper.lookup(raw.get("pokemon_form_identifier"))
        if species is None:
            continue
        species_id, species_name = species

        low, high = parse_levels(raw.get("levels"))
        rate = extract_encounter_rate(raw, generation)

        # Location area: nothing to group under without one.
        area_key = raw.get("location_area_identifier", "")
        if not area_key:
            continue

        encounter = Encounter(
            pokeapi_id=species_id,
            pokemon_name=species_name,
            method=mapped_method,
            encounter_rate=rate,
            min_level=low,
            max_level=high,
        )
        if area_key in grouped:
            grouped[area_key].append(encounter)
        else:
            grouped[area_key] = [encounter]

    return grouped
def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
    """Merge encounters that share the same ``(pokeapi_id, method)``.

    For each group the rates are summed (then capped at 100) and the level
    range is widened to cover every member. The input objects are not
    mutated; each group is represented by a fresh ``Encounter``.

    Returns:
        One ``Encounter`` per group, sorted by rate descending and then by
        ``pokemon_name`` ascending. Ties keep first-seen order.
    """
    # Dicts preserve insertion order, so iterating ``merged`` yields the
    # groups in the order they were first seen.
    merged: dict[tuple[int, str], Encounter] = {}

    for enc in encounters:
        key = (enc.pokeapi_id, enc.method)
        current = merged.get(key)
        if current is None:
            # Copy so the caller's object is never modified.
            merged[key] = Encounter(
                pokeapi_id=enc.pokeapi_id,
                pokemon_name=enc.pokemon_name,
                method=enc.method,
                encounter_rate=enc.encounter_rate,
                min_level=enc.min_level,
                max_level=enc.max_level,
            )
            continue

        current.encounter_rate += enc.encounter_rate
        current.min_level = min(current.min_level, enc.min_level)
        current.max_level = max(current.max_level, enc.max_level)

    # Cap only after summing, so partial sums are not truncated early.
    for entry in merged.values():
        entry.encounter_rate = min(entry.encounter_rate, 100)

    return sorted(
        merged.values(),
        key=lambda entry: (-entry.encounter_rate, entry.pokemon_name),
    )
def build_routes(
    encounters_by_area: dict[str, list[Encounter]],
    location_mapper: LocationMapper,
) -> list[Route]:
    """Group per-area encounters into routes, nesting areas under locations.

    Areas are grouped by the location identifier the mapper reports for
    them (falling back to the area identifier itself). For each location:

    * more than one area with encounters -> a parent route named after the
      location, with no encounters of its own and one child per area;
    * exactly one area with encounters -> a single flat route;
    * no area with encounters -> nothing is emitted.

    An area's route is named after the area when the mapper gives it a name
    that differs from the location name, otherwise after the location.

    Args:
        encounters_by_area: ``{location_area_identifier: [Encounter, ...]}``.
        location_mapper: Provides location identifiers and display names.

    Returns:
        The routes in first-seen location order. Every route is created
        with ``order=0``.
    """
    # loc_id -> [(area_id, area_display_name, encounters), ...]
    areas_by_location: dict[str, list[tuple[str, str, list[Encounter]]]] = {}
    for area_id, encounters in encounters_by_area.items():
        # Areas the mapper cannot place form their own single-area group.
        loc_id = location_mapper.get_location_identifier(area_id) or area_id
        area_name = location_mapper.get_area_name(area_id)
        areas_by_location.setdefault(loc_id, []).append(
            (area_id, area_name, encounters)
        )

    routes: list[Route] = []
    for areas in areas_by_location.values():
        # The location name is looked up via the group's first area.
        loc_name = location_mapper.get_location_name(areas[0][0])

        children: list[Route] = []
        for _, area_name, encounters in areas:
            aggregated = aggregate_encounters(encounters)
            if not aggregated:
                continue
            # Prefer a distinct area name; otherwise reuse the location's.
            if area_name and area_name != loc_name:
                child_name = area_name
            else:
                child_name = loc_name
            children.append(
                Route(name=child_name, order=0, encounters=aggregated)
            )

        if len(children) > 1:
            # Several populated areas: parent route with children.
            routes.append(
                Route(
                    name=loc_name,
                    order=0,
                    encounters=[],
                    children=children,
                )
            )
        elif children:
            # Only one populated area: flatten instead of nesting.
            routes.append(children[0])

    return routes