[CONSEJO — Doc 09] Contrarian: El 80% de los SaaS de inventario que fallan no fallan por tener el método de pronóstico incorrecto — fallan por no manejar correctamente el cold start, la demanda intermitente y los outliers de datos. El motor perfecto sobre datos sucios produce basura perfectamente calculada. Priorizar la limpieza de datos y la robustez ante datos imperfectos sobre la sofisticación del método. First Principles: El dueño de una ferretería no necesita saber si usaste Croston o ADIDA. Necesita saber: "¿qué compro mañana y cuánto?" El output del motor es siempre una orden de compra justificada en español, no un pronóstico estadístico. Expansionist: El motor debe estar diseñado para 14 métodos desde el inicio pero solo necesita implementar los 12 más importantes en MVP. Prophet y LightGBM son aceleradores para V1.1 cuando haya historial suficiente acumulado. Outsider: Los sistemas de inventario industriales (SAP, Oracle) usan métodos simples (SES, Croston clásico) en 80% de sus SKUs. La sofisticación estadística importa en el 20% restante (A-items de demanda errática/lumpy). No sobre-ingeniear para el 20% a costa de la robustez del 80%. Executor: El pipeline debe ser determinista dado el mismo input. Si el cliente ejecuta el motor dos veces el mismo día con el mismo inventario, debe obtener exactamente el misma OC. La reproducibilidad es un atributo de calidad, no un nice-to-have. Security Auditor: Los resultados del motor (forecast, policy parameters) son propiedad del cliente. El motor no puede usar datos de un cliente para mejorar modelos de otro cliente sin consentimiento explícito (riesgo Ley 8968 y competencia desleal). El motor es instance-per-tenant: sin transfer learning cross-tenant. Chairman: El motor estadístico es la razón de existir de StockMind. Todo lo demás (UI, API, integraciones) sirve para que el motor entregue su valor. La inversión en documentar, testear y validar el motor es la más rentable que puede hacer el equipo.
1. Filosofía del motor
El motor estadístico de StockMind es procedural, determinista y explicable:
- Procedural: un pipeline de pasos secuenciales y definidos, no una red neuronal opaca.
- Determinista: dado el mismo historial de movimientos, produce siempre el mismo pronóstico y la misma política de inventario. Sin aleatoriedad en producción (seeds fijos donde aplique).
- Explicable: cada decisión del motor (selección de método, parámetros de política, generación de OC) tiene una explicación en español generada por Jinja2 templates, sin LLMs en el path crítico.
- Instance-per-tenant: el motor corre aislado por tenant. Sin cross-tenant learning.
Librería central: statsforecast (Nixtla) — implementaciones altamente optimizadas en Python/Numba de los principales métodos de pronóstico de series de tiempo.
2. Pipeline de 11 pasos
INPUT: sm_stock_movements (historial) + sm_skus (parámetros) + sm_tenants (settings)
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 1 — INGESTION │
│ Lee historial de movimientos (tipo='sale') del último período │
│ disponible (default 365 días, mín 30 días para cold start) │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 2 — CLEANING │
│ Detección y tratamiento de outliers (IQR×3), fechas faltantes │
│ (imputación por cero para intermittent, interpolación lineal para │
│ smooth), truncación de series con gaps >60 días │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 3 — CLASSIFICATION │
│ ABC × XYZ × Syntetos-Boylan → 36 buckets │
│ Calcula: revenue_90d, CV_xyz, ADI, CV² │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 4 — METHOD SELECTION │
│ Asigna conjunto candidato de métodos según clasificación │
│ (ver tabla de métodos por bucket en §4) │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 5 — CV BACKTEST (expanding window) │
│ Para cada método candidato: expanding window CV con mínimo 3 folds │
│ Métrica: MASE (Mean Absolute Scaled Error) para comparación cross- │
│ series; MAE absoluto para reporting al usuario │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 6 — WINNER SELECTION + ENSEMBLE │
│ El método con menor MASE gana. Si diferencia entre top-2 <5%: │
│ ensemble por inverse-error weighting (Diebold-Mariano test) │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 7 — FORECAST GENERATION │
│ Genera pronóstico h=30/60/90 días según plan del tenant │
│ Genera intervalos de confianza (bootstrap o analítico según método)│
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 8 — POLICY CALCULATION (s, S, Q) │
│ SS = z × σ_LT → s = μ_D × L + SS → EOQ = √(2×D_a×A/h) │
│ Q* = max(EOQ, MOQ); S = s + EOQ │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 9 — ORDER SUGGESTIONS │
│ Para cada SKU donde stock_current < reorder_point: │
│ Genera registro en sm_order_suggestions con cantidad Q* │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 10 — GUARDRAILS (15 reglas de validación) │
│ Ver §6 — bloquea o marca OCs que no pasan validaciones │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────────────────┐
│ PASO 11 — PERSIST + NOTIFY │
│ Escribe sm_forecast_results, actualiza sm_skus (status, params) │
│ Genera sm_purchase_orders para OCs que pasaron guardrails │
│ Publica notificaciones de alerta en Redis │
└─────────────────────────────────────────────────────────────────────┘
OUTPUT: sm_forecast_results + sm_purchase_orders + sm_skus actualizado
3. Clasificación de SKUs: ABC × XYZ × Syntetos-Boylan
3.1 Clasificación ABC (por revenue)
def classify_abc(skus_with_revenue: list[SkuRevenue]) -> dict[str, str]:
"""
Clasifica SKUs por revenue de 90 días usando umbral de Pareto clásico.
A: top 80% del revenue acumulado
B: siguiente 15% (hasta 95%)
C: resto (5% inferior)
"""
total = sum(s.revenue_90d for s in skus_with_revenue)
sorted_skus = sorted(skus_with_revenue, key=lambda x: x.revenue_90d, reverse=True)
cumulative = 0
result = {}
for sku in sorted_skus:
cumulative += sku.revenue_90d
ratio = cumulative / total if total > 0 else 0
if ratio <= 0.80:
result[sku.id] = 'A'
elif ratio <= 0.95:
result[sku.id] = 'B'
else:
result[sku.id] = 'C'
return result
3.2 Clasificación XYZ (por variabilidad de demanda)
def classify_xyz(demand_series: list[float]) -> tuple[str, float]:
"""
X: CV ≤ 0.50 (demanda estable)
Y: 0.50 < CV ≤ 1.00 (demanda variable)
Z: CV > 1.00 (demanda muy variable / errática)
CV_xyz se calcula sobre períodos de tiempo iguales (semanas o meses)
según el horizonte de datos disponible.
"""
if len(demand_series) < 3:
return 'Z', float('inf') # Cold start → más conservador
mean = statistics.mean(demand_series)
if mean == 0:
return 'Z', float('inf')
std = statistics.stdev(demand_series)
cv = std / mean
if cv <= 0.50:
return 'X', cv
elif cv <= 1.00:
return 'Y', cv
else:
return 'Z', cv
3.3 Clasificación Syntetos-Boylan (patrón de demanda)
def classify_syntetos_boylan(demand_series: list[float]) -> tuple[str, float, float]:
"""
Clasifica el patrón de demanda según el cuadrante Syntetos-Boylan:
- ADI (Average Demand Interval): tiempo promedio entre períodos con demanda >0
- CV² (coeficiente de variación al cuadrado de los períodos con demanda >0)
Cuadrantes:
- Smooth: ADI < 1.32 y CV² < 0.49
- Erratic: ADI < 1.32 y CV² ≥ 0.49
- Intermittent: ADI ≥ 1.32 y CV² < 0.49
- Lumpy: ADI ≥ 1.32 y CV² ≥ 0.49
Umbrales de referencia: Syntetos & Boylan (2005), Lengu et al. (2014)
"""
non_zero = [d for d in demand_series if d > 0]
if len(non_zero) < 2:
return 'lumpy', float('inf'), float('inf') # < 2 períodos con demanda → lumpy
# ADI: número total de períodos / períodos con demanda
adi = len(demand_series) / len(non_zero)
# CV² sobre períodos con demanda (no sobre toda la serie)
mean_nonzero = statistics.mean(non_zero)
if mean_nonzero == 0:
return 'lumpy', adi, float('inf')
std_nonzero = statistics.stdev(non_zero) if len(non_zero) > 1 else 0
cv_squared = (std_nonzero / mean_nonzero) ** 2
if adi < 1.32 and cv_squared < 0.49:
pattern = 'smooth'
elif adi < 1.32 and cv_squared >= 0.49:
pattern = 'erratic'
elif adi >= 1.32 and cv_squared < 0.49:
pattern = 'intermittent'
else:
pattern = 'lumpy'
return pattern, adi, cv_squared
4. Los 14 métodos de pronóstico
4.1 Tabla de métodos y condiciones de uso
| # | Método | Librería | Patrón de demanda | Condición de aplicación |
|---|---|---|---|---|
| 1 | SES (Simple Exp. Smoothing) | statsforecast | Smooth | ADI<1.32, CV²<0.49, >30 obs |
| 2 | HW-Additive (Holt-Winters) | statsforecast | Smooth estacional | ≥2 ciclos estacionales completos |
| 3 | HW-Multiplicative | statsforecast | Smooth estacional multiplicativo | ≥2 ciclos, sin ceros en serie |
| 4 | THETA | statsforecast | Smooth/Erratic | Competitivo en M3/M4, fallback robusto |
| 5 | AutoARIMA | statsforecast | Smooth/Erratic | >50 observaciones |
| 6 | AutoETS | statsforecast | Smooth/Erratic | >30 observaciones |
| 7 | Croston Classic | statsforecast | Intermittent | ADI≥1.32, CV²<0.49 |
| 8 | Croston Optimized | statsforecast | Intermittent | ADI≥1.32, CV²<0.49 |
| 9 | CrostonTSB (Teunter-Syntetos-Babai) | statsforecast | Intermittent | ADI≥1.32, más robusto que Croston Classic |
| 10 | ADIDA (Aggregate-Disaggregate Intermittent DA) | statsforecast | Intermittent/Lumpy | Aggregation-disaggregation approach |
| 11 | IMAPA | statsforecast | Lumpy | ADI≥1.32, CV²≥0.49, múltiples frecuencias |
| 12 | Naïve | statsforecast | Fallback universal | Siempre en el conjunto candidato como baseline |
| 13 | Prophet (Facebook) | prophet | Smooth con estacionalidad compleja | ≥2 años de historia, V1.1+ |
| 14 | LightGBM | lightgbm | Erratic/Lumpy con features externas | ≥500 obs, features disponibles, V1.1+ |
MVP (métodos 1-12): Prophet (13) y LightGBM (14) se implementan en V1.1 cuando el volumen de datos acumulados justifica su uso.
4.2 Selección de candidatos por clasificación
| Clasificación ABC+SB | Métodos candidatos | Notas |
|---|---|---|
| A + Smooth | SES, HW-Add, HW-Mult, THETA, AutoETS, Naïve | Priorizar precisión |
| A + Erratic | AutoARIMA, THETA, AutoETS, SES, Naïve | ARIMA por complejidad |
| A + Intermittent | CrostonTSB, Croston Opt, ADIDA, Naïve | TSB > Classic (literatura) |
| A + Lumpy | CrostonTSB, IMAPA, ADIDA, Naïve | IMAPA para alto CV² |
| B/C + Smooth | SES, THETA, Naïve | Menos candidatos → más rápido |
| B/C + Intermittent | CrostonTSB, Naïve | Simplicidad para bajo-valor |
| Cold start (<30 días) | Naïve + 1 método según patrón estimado | Ver §5 cold start |
5. Cold start protocol
El cold start ocurre cuando un SKU tiene <30 días de historial de movimientos de salida. El motor no puede entrenar un método estadístico robusto con menos de ese umbral.
5.1 Reglas de cold start
COLD_START_THRESHOLDS = {
'smooth': 30, # días mínimos de historial
'erratic': 45,
'intermittent': 60, # más historial para estimar ADI correctamente
'lumpy': 90,
'unknown': 90, # sin clasificación → conservador
}
def handle_cold_start(sku: SkuData) -> ForecastResult:
"""
Para SKUs en cold start:
1. Aplica Naïve (demanda promedio histórica disponible)
2. Incrementa safety stock en 50% (mayor incertidumbre)
3. Marca forecast con drift_status='cold_start' (no 'ok')
4. Genera explicación específica de cold start al usuario
5. Recomienda ingresar historial manual si existe
"""
days_available = len(sku.demand_series)
avg_demand = mean(sku.demand_series) if sku.demand_series else 0
cold_start_ss_multiplier = 1.5 # G14 guardrail
return ForecastResult(
method='Naïve',
forecast=[avg_demand] * forecast_horizon,
safety_stock_multiplier=cold_start_ss_multiplier,
drift_status='cold_start',
explanation_key='cold_start',
min_days_needed=COLD_START_THRESHOLDS.get(sku.demand_pattern, 90)
)
6. Política de inventario (s, S, Q)
6.1 Stock de seguridad (SS)
def calculate_safety_stock(
demand_std_lead_time: float,
service_level: float # 0.95, 0.975, etc. según plan del tenant
) -> float:
"""
SS = z × σ_LT
donde:
- z = factor de servicio (z-score normal)
- σ_LT = desviación estándar de la demanda durante el lead time
σ_LT = σ_D × √L (si lead time es constante)
σ_LT = √(L × σ_D² + D² × σ_L²) (si lead time es variable)
"""
from scipy.stats import norm
z = norm.ppf(service_level) # 1.645 para 95%, 1.960 para 97.5%
return z * demand_std_lead_time
6.2 Niveles de servicio por plan
| Plan | Service level | z-score |
|---|---|---|
| Starter | 95% | 1.645 |
| Growth | 95% | 1.645 |
| Pro | 97.5% | 1.960 |
| Enterprise | Configurable (90%–99%) | Variable |
El usuario no ve el z-score — ve un slider "Nivel de protección contra stockouts: bajo / medio / alto / muy alto" que mapea internamente a los niveles estadísticos.
6.3 Punto de reorden (s) y máximo (S)
def calculate_reorder_point(
mean_demand_per_day: float,
lead_time_days: float,
safety_stock: float
) -> float:
"""s = μ_D × L + SS"""
return mean_demand_per_day * lead_time_days + safety_stock
def calculate_max_stock(reorder_point: float, eoq: float) -> float:
"""S = s + EOQ"""
return reorder_point + eoq
6.4 Cantidad económica de pedido (EOQ)
def calculate_eoq(
annual_demand: float,
ordering_cost: float, # default: 20 USD si no configurado por tenant
holding_cost_rate: float, # default: 0.25 (25% del costo unitario anual)
unit_cost: float,
moq: float # minimum order quantity del proveedor
) -> float:
"""
EOQ = √(2 × D_a × A / h)
Q* = max(EOQ, MOQ)
Referencia: Harris (1913), Wilson (1934)
"""
if annual_demand <= 0 or unit_cost <= 0:
return moq
h = holding_cost_rate * unit_cost
eoq = math.sqrt(2 * annual_demand * ordering_cost / h)
return max(eoq, moq)
7. Backtest con expanding window
7.1 Metodología
def expanding_window_cv(
demand_series: list[float],
method: ForecastMethod,
horizon: int = 30,
min_train_periods: int = 30,
n_folds: int = 3
) -> float:
"""
Cross-validation con expanding window:
Fold 1: train[0:min_train_periods], test[min_train_periods:min_train+horizon]
Fold 2: train[0:min_train_periods+step], test[...]
Fold 3: train[0:...], test[...]
Retorna MASE promedio sobre todos los folds.
MASE = MAE_forecast / MAE_naive
Un MASE <1 significa que el método supera al Naïve.
Referencia: Hyndman & Koehler (2006)
"""
n = len(demand_series)
if n < min_train_periods + horizon:
return float('inf') # No hay datos suficientes → método no aplicable
step = (n - min_train_periods - horizon) // n_folds
maes = []
for fold in range(n_folds):
train_end = min_train_periods + fold * step
test_start = train_end
test_end = test_start + horizon
if test_end > n:
break
train = demand_series[:train_end]
test = demand_series[test_start:test_end]
forecast = method.fit(train).predict(horizon)
mae = mean_absolute_error(test, forecast[:len(test)])
# Calcular MAE Naïve (benchmark)
naive_forecast = [train[-1]] * len(test)
mae_naive = mean_absolute_error(test, naive_forecast)
mase = mae / mae_naive if mae_naive > 0 else float('inf')
maes.append(mase)
return statistics.mean(maes) if maes else float('inf')
7.2 Ensemble por inverse-error weighting
def ensemble_forecast(
method_results: list[MethodResult],
threshold: float = 0.05 # 5% umbral de diferencia
) -> ForecastResult:
"""
Si el mejor método supera al segundo por >5% en MASE → usar solo el mejor.
Si la diferencia es ≤5% → ensemble con inverse-error weighting.
Peso del método i = (1/MASE_i) / Σ(1/MASE_j)
Referencia: Montero-Manso et al. (2020), M4 competition winner ensemble
"""
sorted_methods = sorted(method_results, key=lambda m: m.mase)
best = sorted_methods[0]
second = sorted_methods[1] if len(sorted_methods) > 1 else None
if second is None or (second.mase - best.mase) / best.mase > threshold:
return best.forecast_result
# Ensemble de los top-N métodos dentro del 5%
top_methods = [m for m in sorted_methods if (m.mase - best.mase) / best.mase <= threshold]
weights = [1.0 / m.mase for m in top_methods]
total_weight = sum(weights)
normalized_weights = [w / total_weight for w in weights]
ensemble_values = []
for t in range(len(best.forecast_result.forecast)):
weighted_val = sum(
w * m.forecast_result.forecast[t]
for w, m in zip(normalized_weights, top_methods)
)
ensemble_values.append(weighted_val)
return ForecastResult(
method=f"Ensemble({'+'.join(m.name for m in top_methods)})",
forecast=ensemble_values,
...
)
8. Drift detection
El motor detecta cuando la demanda real diverge significativamente del pronóstico del período anterior, lo que indica un cambio estructural en el patrón de demanda.
def calculate_drift(
actual_demand_7d: list[float],
forecasted_demand_7d: list[float],
backtest_mae: float
) -> tuple[float, str]:
"""
drift_ratio = MAE_actual_7d / MAE_backtest
ok: drift_ratio ≤ 1.5 (sostenido 3 semanas)
alert: drift_ratio > 1.5 (sostenido 3 semanas) → notificación al usuario
critical: drift_ratio > 2.5 (sostenido 1 semana) → re-run del motor + notificación
Un drift_ratio = 1.0 significa que el motor tiene el mismo error que en backtest.
Un drift_ratio = 2.0 significa que el error actual es el doble del esperado.
"""
actual_mae = mean_absolute_error(actual_demand_7d, forecasted_demand_7d)
drift_ratio = actual_mae / backtest_mae if backtest_mae > 0 else float('inf')
if drift_ratio > 2.5:
status = 'critical'
elif drift_ratio > 1.5:
status = 'alert'
else:
status = 'ok'
return drift_ratio, status
Cuando drift_status = 'critical', el motor lanza automáticamente una nueva ejecución del pipeline para ese SKU antes del próximo ciclo programado.
9. Los 15 Guardrails
Los guardrails son reglas de validación que se aplican después del forecast y la política, antes de generar las OCs. Protegen al usuario de OCs erróneas por datos sucios o condiciones extremas.
| # | Guardrail | Condición | Acción |
|---|---|---|---|
| G1 | OC masiva — requiere revisión | cantidad_oc > 3× media histórica de OCs | Status 'requires_review', no bloquea |
| G2 | OC masiva — bloqueo | cantidad_oc > 5× media histórica de OCs | Status 'blocked', notificación urgente |
| G3 | Stock negativo imposible | stock_current < 0 después de movimiento | Log error + flag de auditoría |
| G4 | Costo total OC excede límite de crédito | total_oc > límite_crédito_proveedor configurado | Warning, no bloquea |
| G5 | OC por debajo del MOQ | cantidad_oc < moq del proveedor | Ajuste automático a MOQ con explicación |
| G6 | Lead time desconocido | lead_time_days es NULL | Usa lead_time default del tenant + warning |
| G7 | Historial insuficiente (cold start) | días_historia < umbral por patrón | Aplica SS multiplier 1.5× + marca cold_start |
| G8 | Outlier en demanda reciente | demanda_últimos_7d > 5× media histórica | Flag de anomalía, no excluye de forecast |
| G9 | Proveedor sin contacto registrado | supplier.contact_email IS NULL | Warning: "Proveedor sin email de contacto" |
| G10 | SKU sin precio de costo | unit_cost IS NULL | EOQ usa costo estimado por categoría, flag |
| G11 | OC duplicada reciente | OC aprobada para mismo SKU en últimos L días | Sugiere consolidar, no duplica |
| G12 | Bug EOQ — OC infinita | eoq es NaN o Inf | Usa MOQ como cantidad fallback, log error |
| G13 | Bug EOQ — OC cero o negativa | eoq ≤ 0 | Usa MOQ como cantidad fallback, log error |
| G14 | Cold start SS insuficiente | cold_start=True y SS_multiplier < 1.5 | Fuerza SS_multiplier = 1.5 |
| G15 | Forecast negativo | cualquier valor del forecast < 0 | Clamp a 0, log advertencia |
Los guardrails G12 y G13 son los más críticos (bugs documentados en wiki/10-hallucinations-detectadas.md como riesgo R12). Deben tener tests unitarios con 100% de cobertura.
10. Generación de explicaciones en español (Jinja2)
Las explicaciones no usan LLMs — son templates Jinja2 con lógica condicional. Esto garantiza: - Determinismo (la misma clasificación siempre produce la misma explicación) - Sin costo de inferencia en el path crítico - Sin alucinaciones en las explicaciones técnicas
# Ejemplo de template Jinja2 para patrón intermittent
TEMPLATE_INTERMITTENT = """
El motor seleccionó **{{ method }}** para este SKU porque su demanda es
**intermitente** — es decir, tiene períodos regulares sin ventas
(intervalo promedio entre pedidos: {{ adi|round(1) }} períodos).
{{ method }} está diseñado específicamente para este tipo de demanda y
supera al método anterior en {{ improvement_pct|round(0) }}% de precisión.
**Política de inventario resultante:**
- Stock de seguridad: {{ safety_stock|int }} {{ unit }}
(protege contra variabilidad durante {{ lead_time_days }} días de lead time)
- Punto de reorden: {{ reorder_point|int }} {{ unit }}
(cuando el stock llegue a este nivel, es momento de ordenar)
- Cantidad de pedido sugerida: {{ eoq|int }} {{ unit }}
(cantidad óptima según balance entre costo de ordenar y costo de mantener stock)
"""
Templates disponibles para cada patrón: smooth, erratic, intermittent, lumpy, cold_start, ensemble, drift_alert, drift_critical.
11. Escalabilidad y rendimiento
11.1 Capacidad en Hostinger KVM2
| Escenario | SKUs totales | Tiempo de run | Acción si supera |
|---|---|---|---|
| MVP (1–10 tenants) | <10,000 | 10–20 min | Sin acción |
| Early growth (10–50 tenants) | <50,000 | 30–60 min | Sin acción |
| Scale (50–200 tenants) | <100,000 | 90–180 min | Alerta si >90 min, migrar worker |
| Threshold KVM2 | ~100,000 | ~4–6h | Migrar a worker KVM4 |
11.2 Paralelismo con Celery
# Para tenants grandes (>5,000 SKUs), dividir en chunks
# y procesar en paralelo con Celery group
from celery import group
def run_forecast_parallel(tenant_id: UUID, sku_ids: list[UUID], chunk_size: int = 500):
chunks = [sku_ids[i:i+chunk_size] for i in range(0, len(sku_ids), chunk_size)]
job = group(
forecast_chunk.s(tenant_id, chunk) for chunk in chunks
)
result = job.apply_async()
return result
11.3 Caching de resultados del motor
El forecast no cambia entre runs del mismo día (salvo re-run por drift crítico). Los resultados se cachean en Redis con TTL de 23h para el endpoint GET /forecast/skus/{sku_id}, reduciendo la carga de PostgreSQL en los hits del frontend.
12. Tests del motor
12.1 Fixtures de referencia
Todas las series temporales de test deben tener resultado calculado a mano (o con R) para validar que statsforecast produce el resultado correcto:
# tests/test_motor/fixtures.py
SMOOTH_SERIES = [10, 12, 11, 13, 10, 12, 14, 11, 13, 12] # CV ~0.1 → X, smooth
INTERMITTENT_SERIES = [0, 0, 5, 0, 0, 0, 8, 0, 0, 3, 0, 0] # ADI=4 → intermittent
LUMPY_SERIES = [0, 0, 25, 0, 0, 0, 3, 0, 0, 0, 40, 0] # ADI>1.32, CV²>0.49 → lumpy
COLD_START_SERIES = [10, 12, 8] # < 30 días → cold start
EXPECTED_ABC = {
'sku_high_revenue': 'A',
'sku_mid_revenue': 'B',
'sku_low_revenue': 'C',
}
EXPECTED_PATTERN = {
'smooth': ('smooth', 'X'),
'intermittent': ('intermittent', 'Z'),
}
12.2 Tests de invariantes (Hypothesis)
from hypothesis import given, strategies as st
@given(
demand=st.lists(st.floats(min_value=0, max_value=1000), min_size=30),
unit_cost=st.floats(min_value=0.01, max_value=10000),
lead_time=st.integers(min_value=1, max_value=180)
)
def test_safety_stock_never_negative(demand, unit_cost, lead_time):
"""G15: El stock de seguridad nunca puede ser negativo."""
result = calculate_safety_stock(demand, unit_cost, lead_time, service_level=0.95)
assert result >= 0
@given(
demand=st.lists(st.floats(min_value=0, max_value=1000), min_size=1),
)
def test_eoq_never_nan_or_infinite(demand):
"""G12+G13: El EOQ nunca puede ser NaN o infinito."""
result = calculate_eoq(sum(demand), ordering_cost=20, holding_cost_rate=0.25,
unit_cost=10, moq=1)
assert not math.isnan(result)
assert not math.isinf(result)
assert result > 0
Ver también: Doc 07 (SRD Backend — cómo el worker Celery ejecuta este motor) · Doc 08 (Modelo de Datos — tablas donde persiste los resultados) · Doc 06 (SRD Frontend — cómo presenta los resultados al usuario) · wiki/05-motor-estadistico.md (especificación completa con referencias académicas)