Files

395 lines
12 KiB
Python
Raw Permalink Normal View History

"""Core encounter processing: filter, parse, aggregate, and group encounters."""
from __future__ import annotations
import re
from typing import Any
from .mappings import LocationMapper, PokemonMapper, map_encounter_method
from .models import Encounter, Route
# ---------------------------------------------------------------------------
# Rate parsing
# ---------------------------------------------------------------------------
# Word-based rates → numeric value
_WORD_RATES: dict[str, int] = {
"one": 100,
"two": 50,
"three": 33,
"four": 25,
"five": 20,
"six": 17,
"seven": 14,
"eight": 13,
"choose one": 100,
"one of three": 33,
"only one": 100,
"unlimited": 100,
"respawns": 100,
"common": 60,
"average": 30,
"rare": 10,
"varies": 50,
}
_PERCENT_RE = re.compile(r"~?(\d+(?:\.\d+)?)%?")
def parse_rate(value: str | None) -> int | None:
"""Parse a rate string into an integer percentage (0-100).
Handles formats: "50%", "~10%", "one", "common", "100", "??%", etc.
Returns None if unparseable.
"""
if not value:
return None
value = value.strip()
# Word-based
lower = value.lower()
if lower in _WORD_RATES:
return _WORD_RATES[lower]
# Unknown
if value == "??%":
return None
# Numeric percentage: "50%", "~10%", "10.14%", or bare "100"
m = _PERCENT_RE.match(value)
if m:
return max(1, round(float(m.group(1))))
return None
def extract_encounter_data(
    record: dict[str, Any],
    generation: int,
) -> tuple[int, dict[str, int] | None]:
    """Derive the overall rate and any per-condition rates from a PokeDB record.

    The record layout is detected from which fields are present; the
    ``generation`` argument is accepted but not consulted.  Layouts are tried
    in this order and the first one that yields data wins:

    1. ``rate_overall`` -- flat rate, no conditions.
    2. ``rate_morning`` / ``rate_day`` / ``rate_night`` -- time-of-day rates.
    3. ``rate_spring`` ... ``rate_winter`` -- seasonal rates.
    4. ``weather_<name>_rate`` -- weather rates keyed by ``<name>``.
    5. any truthy ``during_*`` flag -- presence only, reported as 100.
    6. ``probability_overall``, then the per-time ``probability_*`` fields,
       parsed as-is (no normalisation is performed).
    7. nothing matched -- reported as 100.

    Returns ``(rate, conditions)``.  For layouts 2-4 ``conditions`` maps the
    condition name to its rate and ``rate`` is the largest of them; for every
    other layout ``conditions`` is None.
    """
    time_of_day = (
        ("morning", "rate_morning"),
        ("day", "rate_day"),
        ("night", "rate_night"),
    )
    seasons = (
        ("spring", "rate_spring"),
        ("summer", "rate_summer"),
        ("autumn", "rate_autumn"),
        ("winter", "rate_winter"),
    )

    # Flat overall rate.
    flat = parse_rate(record.get("rate_overall"))
    if flat is not None:
        return flat, None

    # Time-of-day rates first, then seasonal rates; each group is only
    # parsed if the previous one produced nothing.
    for group in (time_of_day, seasons):
        found = {
            label: parsed
            for label, field in group
            if (parsed := parse_rate(record.get(field))) is not None
        }
        if found:
            return max(found.values()), found

    # Weather rates: "weather_clear_rate" -> "clear".
    head, tail = "weather_", "_rate"
    by_weather: dict[str, int] = {}
    for field, raw in record.items():
        if not (field.startswith(head) and field.endswith(tail) and raw):
            continue
        parsed = parse_rate(raw)
        if parsed is not None:
            by_weather[field[len(head):-len(tail)]] = parsed
    if by_weather:
        return max(by_weather.values()), by_weather

    # Boolean availability flags: being listed at all counts as 100.
    presence_flags = (
        "during_any_time",
        "during_morning",
        "during_day",
        "during_evening",
        "during_night",
    )
    if any(record.get(flag) for flag in presence_flags):
        return 100, None

    # Probability fields: the overall value wins when it parses.
    overall_probability = record.get("probability_overall")
    if overall_probability:
        parsed = parse_rate(overall_probability)
        if parsed is not None:
            return parsed, None

    # Otherwise take the largest time-based probability.
    timed_fields = (
        "probability_morning",
        "probability_day",
        "probability_evening",
        "probability_night",
    )
    timed = [parse_rate(record.get(field)) for field in timed_fields]
    timed = [value for value in timed if value is not None]
    if timed:
        return max(timed), None

    # Fallback: encounters that carry no rate information at all.
    return 100, None
# ---------------------------------------------------------------------------
# Level parsing
# ---------------------------------------------------------------------------
def parse_levels(levels_str: str | None) -> tuple[int, int]:
"""Parse a level string into (min_level, max_level).
"2 - 4" (2, 4)
"67" (67, 67)
"44 - 51" (44, 51)
Returns (1, 1) if unparseable.
"""
if not levels_str:
return (1, 1)
levels_str = levels_str.strip()
# Range: "2 - 4" or "2-4"
m = re.match(r"(\d+)\s*-\s*(\d+)", levels_str)
if m:
return (int(m.group(1)), int(m.group(2)))
# Single: "67"
m = re.match(r"(\d+)", levels_str)
if m:
level = int(m.group(1))
return (level, level)
return (1, 1)
# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------
def filter_encounters_for_game(
    encounters: list[dict[str, Any]],
    game_slug: str,
) -> list[dict[str, Any]]:
    """Return the encounters whose ``version_identifiers`` contain *game_slug*.

    Records with a missing, None or empty ``version_identifiers`` value never
    match.  The input order is preserved and the records themselves are not
    copied.
    """

    def belongs_to_game(entry: dict[str, Any]) -> bool:
        versions = entry.get("version_identifiers")
        if not versions:
            return False
        return game_slug in versions

    return [entry for entry in encounters if belongs_to_game(entry)]
def process_encounters(
    raw_encounters: list[dict[str, Any]],
    generation: int,
    pokemon_mapper: PokemonMapper,
    location_mapper: LocationMapper,
) -> dict[str, list[Encounter]]:
    """Convert raw PokeDB records into Encounter objects grouped by area.

    A record is skipped when its encounter method is missing or unmapped,
    when ``pokemon_mapper.lookup`` returns None for its form identifier, or
    when it has no ``location_area_identifier``.  ``location_mapper`` is
    accepted but not used by this function.

    Returns ``{location_area_identifier: [Encounter, ...]}`` with encounters
    in input order.
    """
    grouped: dict[str, list[Encounter]] = {}

    for row in raw_encounters:
        # Encounter method: drop rows without one or with an unmapped one.
        raw_method = row.get("encounter_method_identifier", "")
        if not raw_method:
            continue
        method = map_encounter_method(raw_method)
        if method is None:
            continue

        # Pokemon: drop rows whose form cannot be resolved.
        species = pokemon_mapper.lookup(row.get("pokemon_form_identifier"))
        if species is None:
            continue
        pokeapi_id, pokemon_name = species

        # Level range, then rate and per-condition rates.
        low, high = parse_levels(row.get("levels"))
        rate, conditions = extract_encounter_data(row, generation)

        # Location area: required for grouping.
        area_key = row.get("location_area_identifier", "")
        if not area_key:
            continue

        entry = Encounter(
            pokeapi_id=pokeapi_id,
            pokemon_name=pokemon_name,
            method=method,
            encounter_rate=rate,
            min_level=low,
            max_level=high,
            conditions=conditions,
        )
        if area_key in grouped:
            grouped[area_key].append(entry)
        else:
            grouped[area_key] = [entry]

    return grouped
def _merge_conditions(
a: dict[str, int] | None,
b: dict[str, int] | None,
) -> dict[str, int] | None:
"""Merge two condition dicts by summing rates per key."""
if a is None and b is None:
return None
merged = dict(a or {})
for k, v in (b or {}).items():
merged[k] = merged.get(k, 0) + v
return merged
def _cap_conditions(conditions: dict[str, int]) -> dict[str, int]:
    """Return a copy of *conditions* with every rate limited to at most 100."""
    capped: dict[str, int] = {}
    for name, rate in conditions.items():
        capped[name] = min(rate, 100)
    return capped
def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
    """Collapse encounters that share the same ``(pokeapi_id, method)``.

    For each group the rates are summed, the level range is widened to cover
    every member, and per-condition rates are merged key by key.  The input
    objects are not modified: each group is accumulated in a fresh Encounter.

    After merging, the overall rate is capped at 100.  When a group has
    condition rates, those are capped at 100 each and the overall rate is
    replaced by the largest of them.

    The result is sorted by rate (highest first), then by Pokemon name.
    """
    # Dicts keep insertion order, so groups stay in first-seen order.
    merged: dict[tuple[int, str], Encounter] = {}

    for item in encounters:
        slot = (item.pokeapi_id, item.method)
        current = merged.get(slot)
        if current is None:
            # First sighting: start the group from a private copy.
            merged[slot] = Encounter(
                pokeapi_id=item.pokeapi_id,
                pokemon_name=item.pokemon_name,
                method=item.method,
                encounter_rate=item.encounter_rate,
                min_level=item.min_level,
                max_level=item.max_level,
                conditions=dict(item.conditions) if item.conditions else None,
            )
            continue

        current.encounter_rate += item.encounter_rate
        current.min_level = min(current.min_level, item.min_level)
        current.max_level = max(current.max_level, item.max_level)
        current.conditions = _merge_conditions(current.conditions, item.conditions)

    for entry in merged.values():
        entry.encounter_rate = min(entry.encounter_rate, 100)
        if entry.conditions:
            entry.conditions = _cap_conditions(entry.conditions)
            # With conditions present, the headline rate is the best case.
            entry.encounter_rate = max(entry.conditions.values())

    # Rate descending, then name ascending.
    return sorted(
        merged.values(),
        key=lambda entry: (-entry.encounter_rate, entry.pokemon_name),
    )
def build_routes(
    encounters_by_area: dict[str, list[Encounter]],
    location_mapper: LocationMapper,
) -> list[Route]:
    """Turn per-area encounters into routes, nesting areas under locations.

    Multiple areas under the same location -> parent route with children.
    Single area -> flat route.

    In the multi-area case, areas whose name is empty or equal to the
    location name contribute their encounters to the parent route instead of
    becoming children.  Areas with no encounters after aggregation produce
    no route.  Every route is created with ``order=0``.
    """
    # location id -> [(area_id, area_display_name, encounters), ...]
    grouped: dict[str, list[tuple[str, str, list[Encounter]]]] = {}
    for area_id, area_encounters in encounters_by_area.items():
        # An area the mapper cannot place is treated as its own location.
        parent_id = location_mapper.get_location_identifier(area_id) or area_id
        display = location_mapper.get_area_name(area_id)
        grouped.setdefault(parent_id, []).append((area_id, display, area_encounters))

    routes: list[Route] = []
    for members in grouped.values():
        # The location name is looked up through the group's first area.
        location_name = location_mapper.get_location_name(members[0][0])

        if len(members) == 1:
            # Single area: flat route, named after the area when distinct.
            _, display, area_encounters = members[0]
            merged = aggregate_encounters(area_encounters)
            if merged:
                if display and display != location_name:
                    title = display
                else:
                    title = location_name
                routes.append(Route(name=title, order=0, encounters=merged))
            continue

        # Several areas: distinctly named ones become child routes, the
        # rest are pooled into the parent.
        sub_routes: list[Route] = []
        shared: list[Encounter] = []
        for _, display, area_encounters in members:
            merged = aggregate_encounters(area_encounters)
            if not merged:
                continue
            if display and display != location_name:
                sub_routes.append(Route(name=display, order=0, encounters=merged))
            else:
                shared.extend(merged)

        if sub_routes:
            # Parent with children; the parent may hold encounters too.
            routes.append(Route(
                name=location_name,
                order=0,
                encounters=aggregate_encounters(shared) if shared else [],
                children=sub_routes,
            ))
        elif shared:
            # No area had a distinct name: flatten into one route.
            routes.append(Route(
                name=location_name,
                order=0,
                encounters=aggregate_encounters(shared),
            ))

    return routes
def filter_den_routes(routes: list[Route]) -> list[Route]:
    """Strip Max Raid den children from every route, in place.

    A child counts as a den when its name contains "(Den ".  Only children
    are filtered -- top-level routes are always kept, even if their own name
    matches.  The same list object that was passed in is returned.
    """
    for parent in routes:
        existing = parent.children
        if not existing:
            continue
        parent.children = [child for child in existing if "(Den " not in child.name]
    return routes