Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import pandas as pd
- import statsmodels.api as sm
- from scipy.stats import t as tdist, f as fdist, spearmanr
- import matplotlib.pyplot as plt
- def print_table(df: pd.DataFrame, title: str, floatfmt="{:,.6f}".format):
- print("=" * (len(title) + 2))
- print(title)
- print("=" * (len(title) + 2))
- df_print = df.copy()
- for c in df_print.columns:
- if np.issubdtype(df_print[c].dtype, np.number):
- df_print[c] = df_print[c].map(lambda v: floatfmt(v))
- print(df_print.to_string(index=False))
- print()
- def _add_const(x):
- return sm.add_constant(np.asarray(x, float))
- def table1_base(x, y):
- x = np.asarray(x, float); y = np.asarray(y, float)
- df = pd.DataFrame({
- "i": np.arange(1, len(x)+1),
- "x": x, "y": y,
- "x^2": x**2, "x*y": x*y, "y^2": y**2
- })
- print_table(df, "Таблиця 1. Базові розрахунки")
- return df
- def fit_ols(x, y):
- X = _add_const(x)
- model = sm.OLS(y, X).fit()
- a0, a1 = model.params
- se0, se1 = model.bse
- t0, t1 = model.tvalues
- p0, p1 = model.pvalues
- r2 = model.rsquared
- rss = np.sum(model.resid**2)
- df = int(model.df_resid)
- df_out = pd.DataFrame([{
- "a0": a0, "a1": a1, "SE(a0)": se0, "SE(a1)": se1,
- "t(a0)": t0, "p(a0)": p0, "t(a1)": t1, "p(a1)": p1,
- "R2": r2, "RSS": rss, "df": df
- }])
- print_table(df_out, "OLS: параметри та статистики")
- yhat = model.fittedvalues
- u = model.resid
- df2 = pd.DataFrame({
- "i": np.arange(1, len(x)+1),
- "x": x, "y": y, "ŷ": yhat, "u": u, "u^2": u**2, "|u|": np.abs(u)
- })
- print_table(df2, "Таблиця 2. Залишки та прогнози (OLS)")
- return model, df2
- def var_bins(x, vals, nbins=3):
- order = np.argsort(x)
- x_sorted = np.asarray(x)[order]
- v_sorted = np.asarray(vals)[order]
- groups = np.array_split(np.arange(len(x)), nbins)
- rows = []
- for b, idx in enumerate(groups, start=1):
- xx = x_sorted[idx]
- vv = v_sorted[idx]
- rows.append({"bin": b, "x_min": xx.min(), "x_max": xx.max(),
- "n": len(idx), "Var(u)": vv.var(ddof=1)})
- return pd.DataFrame(rows)
- def show_bins_ols(x, u):
- dfbins = var_bins(x, u, 3)
- print_table(dfbins, "Таблиця 3. Розкид Var(u) у 3 біннах x (OLS)")
- return dfbins
- def spearman_test(x, z, alpha=0.05, label="|u|"):
- rho, p = spearmanr(x, z)
- n = len(x)
- if abs(rho) < 1.0:
- t_stat = rho * np.sqrt((n - 2) / (1 - rho**2))
- else:
- t_stat = np.sign(rho) * np.inf
- tcrit = tdist.ppf(1 - alpha/2, df=n-2)
- out = pd.DataFrame([{
- "rho": rho, "t_stat": t_stat, "t_crit(α/2)": tcrit,
- "p_value": p, "reject_H0": bool(p < alpha)
- }])
- print_table(
- pd.DataFrame({
- "i": np.arange(1, n+1),
- "x": x, f"|{label}|": z,
- "rank_x": pd.Series(x).rank(method="average"),
- f"rank_|{label}|": pd.Series(z).rank(method="average"),
- "(Δrank)^2": (pd.Series(x).rank(method="average")
- - pd.Series(z).rank(method="average"))**2
- }),
- f"Таблиця 4. Спірмен — ранги x та |{label}|"
- )
- print_table(out, "Підсумок тесту Спірмена")
- return rho, p
- def gq_groups(x, y, m=4, c=2):
- order = np.argsort(x)
- x_sorted, y_sorted = np.asarray(x)[order], np.asarray(y)[order]
- left_idx = np.arange(0, m)
- right_idx = np.arange(len(x_sorted)-m, len(x_sorted))
- mid_cut = slice(m, len(x_sorted)-m)
- if c > 0:
- right_idx = np.arange(len(x_sorted)-m, len(x_sorted))
- Xl = _add_const(x_sorted[left_idx]); yl = y_sorted[left_idx]
- Xr = _add_const(x_sorted[right_idx]); yr = y_sorted[right_idx]
- ols_l = sm.OLS(yl, Xl).fit()
- ols_r = sm.OLS(yr, Xr).fit()
- left_tbl = pd.DataFrame({
- "i": order[left_idx] + 1,
- "x": x_sorted[left_idx],
- "y": yl,
- "ŷ(group)": ols_l.fittedvalues,
- "u(group)": ols_l.resid,
- "u^2(group)": ols_l.resid**2
- })
- right_tbl = pd.DataFrame({
- "i": order[right_idx] + 1,
- "x": x_sorted[right_idx],
- "y": yr,
- "ŷ(group)": ols_r.fittedvalues,
- "u(group)": ols_r.resid,
- "u^2(group)": ols_r.resid**2
- })
- return left_tbl.sort_values("i"), right_tbl.sort_values("i"), ols_l, ols_r
- def gq_summary_compact(left_fit, right_fit, m, alpha=0.05):
- var_left = left_fit.resid.var(ddof=1)
- var_right = right_fit.resid.var(ddof=1)
- F = var_left / var_right
- df_m1 = m - 1
- Fcrit = fdist.ppf(1 - alpha, df_m1, df_m1)
- reject = bool(F > Fcrit)
- out = pd.DataFrame([{
- "m": m, "c": 2, "df_m1": df_m1,
- "RSS_left": np.sum(left_fit.resid**2),
- "RSS_right": np.sum(right_fit.resid**2),
- "F_stat": F, "Fcrit_df_m1(one-sided)": Fcrit,
- "reject_H0": reject
- }])
- return out
- def run_gq(x, y, m=4, c=2, alpha=0.05, prefix=""):
- left_tbl, right_tbl, ols_l, ols_r = gq_groups(x, y, m, c)
- print_table(left_tbl, f"Таблиця 5. Група LEFT для тесту ГК (за x↑)")
- print_table(right_tbl, f"Таблиця 6. Група RIGHT для тесту ГК (за x↑)")
- compact = gq_summary_compact(ols_l, ols_r, m, alpha)
- print_table(compact, "Зведена таблиця тесту Голдфелда–Квандта")
- return compact
- def glejser_table(x, yhat, uabs, deltas=(1,2,3,0.5,1/3), alpha=0.05, label="|u|"):
- rows = []
- long_tbl = pd.DataFrame({
- "i": np.arange(1, len(x)+1),
- "x": x, "y": y, "x^2": x**2, "ŷ": yhat, "|u|": uabs, "x*|u|": x*uabs
- })
- for delta in deltas:
- X = _add_const(x**delta)
- mod = sm.OLS(uabs, X).fit()
- b0, b1 = mod.params
- se_b1 = mod.bse[1]
- t_ratio = abs(b1) / se_b1 if se_b1 > 0 else np.inf
- tcrit = tdist.ppf(1 - alpha/2, df=len(x)-2)
- pval = mod.pvalues[1]
- rows.append({
- "δ": delta, "b0": b0, "b1": b1, "SE(b1)": se_b1,
- "|b1|/SE(b1)": t_ratio, "t_crit(α/2)": tcrit,
- "p-value": pval, "Sε^2": np.mean(mod.resid**2),
- "decision": "ВІДХИЛЯЄМО H0" if pval < alpha else "Приймаємо H0"
- })
- long_tbl[f"û({label})_δ={delta}"] = mod.fittedvalues
- long_tbl[f"({label}-û)_δ={delta}^2"] = (uabs - mod.fittedvalues)**2
- short_tbl = pd.DataFrame(rows)
- print_table(long_tbl, f"Таблиця 7а. Тест Глейзера — покрокова таблиця")
- print_table(short_tbl, "Таблиця 7b. Тест Глейзера — параметри й рішення по δ")
- return short_tbl, long_tbl
- def choose_delta_for_weights(glejser_short, alpha=0.05):
- ok = glejser_short[glejser_short["p-value"] >= alpha]
- if len(ok) == 0:
- best = glejser_short.sort_values("Sε^2").iloc[0]
- else:
- best = ok.iloc[ok["b1"].abs().argmin()]
- return float(best["δ"]), best
- def wls_fit_with_weights(x, y, delta, glejser_short):
- row = glejser_short.loc[glejser_short["δ"] == delta].iloc[0]
- b0, b1 = row["b0"], row["b1"]
- sigma_hat = b0 + b1 * (x ** delta)
- mn = sigma_hat.min()
- if mn <= 0:
- sigma_hat = sigma_hat + (abs(mn) + 1e-8)
- w = 1.0 / (sigma_hat ** 2)
- model = sm.WLS(y, _add_const(x), weights=w).fit()
- df_out = pd.DataFrame([{
- "a0": model.params[0], "a1": model.params[1],
- "SE(a0)": model.bse[0], "SE(a1)": model.bse[1],
- "t(a0)": model.tvalues[0], "p(a0)": model.pvalues[0],
- "t(a1)": model.tvalues[1], "p(a1)": model.pvalues[1],
- "R2": model.rsquared, "RSS": np.sum(model.resid**2), "df": int(model.df_resid)
- }])
- print_table(df_out, "WLS: параметри та статистики (після усунення)")
- comp = pd.DataFrame({
- "model": ["OLS", "WLS"],
- "a0": [np.nan, np.nan],
- "a1": [np.nan, np.nan],
- "SE(a0)": [np.nan, np.nan],
- "SE(a1)": [np.nan, np.nan],
- "t(a0)": [np.nan, np.nan],
- "p(a0)": [np.nan, np.nan],
- "t(a1)": [np.nan, np.nan],
- "p(a1)": [np.nan, np.nan],
- "R2": [np.nan, np.nan],
- "RSS": [np.nan, np.nan],
- "df": [np.nan, np.nan],
- })
- return model, sigma_hat
- def compare_ols_wls(ols_model, wls_model):
- comp = pd.DataFrame({
- "model": ["OLS", "WLS"],
- "a0": [ols_model.params[0], wls_model.params[0]],
- "a1": [ols_model.params[1], wls_model.params[1]],
- "SE(a0)": [ols_model.bse[0], wls_model.bse[0]],
- "SE(a1)": [ols_model.bse[1], wls_model.bse[1]],
- "t(a0)": [ols_model.tvalues[0], wls_model.tvalues[0]],
- "p(a0)": [ols_model.pvalues[0], wls_model.pvalues[0]],
- "t(a1)": [ols_model.tvalues[1], wls_model.tvalues[1]],
- "p(a1)": [ols_model.pvalues[1], wls_model.pvalues[1]],
- "R2": [ols_model.rsquared, wls_model.rsquared],
- "RSS": [np.sum(ols_model.resid**2), np.sum(wls_model.resid**2)],
- "df": [int(ols_model.df_resid), int(wls_model.df_resid)]
- })
- print_table(comp, "Таблиця 8. Порівняння OLS vs WLS")
- return comp
- def standardized_residuals(y, yhat, sigma_hat):
- u = y - yhat
- rstar = u / sigma_hat
- return u, rstar
- def plot_diagnostics_ols(x, y, yhat, u, nbins=3):
- fig, axes = plt.subplots(2, 2, figsize=(10, 8))
- axes = axes.ravel()
- axes[0].scatter(x, u)
- axes[0].axhline(0, linewidth=1)
- axes[0].set_title("OLS: u vs x"); axes[0].set_xlabel("x"); axes[0].set_ylabel("u")
- axes[1].scatter(yhat, u)
- axes[1].axhline(0, linewidth=1)
- axes[1].set_title("OLS: u vs ŷ"); axes[1].set_xlabel("ŷ"); axes[1].set_ylabel("u")
- axes[2].scatter(x, np.abs(u))
- axes[2].set_title("OLS: |u| vs x"); axes[2].set_xlabel("x"); axes[2].set_ylabel("|u|")
- bins = var_bins(x, u, nbins)
- axes[3].bar(bins["bin"], bins["Var(u)"])
- axes[3].set_title("OLS: Var(u) у біннах x"); axes[3].set_xlabel("бін"); axes[3].set_ylabel("Var")
- fig.suptitle("Візуальна діагностика — OLS")
- plt.tight_layout()
- plt.show()
- def plot_diagnostics_wls(x, yhat_w, rstar, nbins=3):
- fig, axes = plt.subplots(2, 2, figsize=(10, 8))
- axes = axes.ravel()
- axes[0].scatter(x, rstar)
- axes[0].axhline(0, linewidth=1)
- axes[0].set_title("Після WLS (r*): r* vs x"); axes[0].set_xlabel("x"); axes[0].set_ylabel("r*")
- axes[1].scatter(yhat_w, rstar)
- axes[1].axhline(0, linewidth=1)
- axes[1].set_title("Після WLS (r*): r* vs ŷ"); axes[1].set_xlabel("ŷ"); axes[1].set_ylabel("r*")
- axes[2].scatter(x, np.abs(rstar))
- axes[2].set_title("Після WLS (r*): |r*| vs x"); axes[2].set_xlabel("x"); axes[2].set_ylabel("|r*|")
- bins = var_bins(x, rstar, nbins)
- axes[3].bar(bins["bin"], bins["Var(u)"])
- axes[3].set_title("Після WLS (r*): Var(r*) у біннах x"); axes[3].set_xlabel("бін"); axes[3].set_ylabel("Var")
- fig.suptitle("Візуальна діагностика — Після WLS (r*)")
- plt.tight_layout()
- plt.show()
- def run_pipeline(x, y, alpha=0.05, deltas=(1,2,3,0.5,1/3), m=4, c=2, make_plots=True):
- x = np.asarray(x, float); y = np.asarray(y, float)
- table1_base(x, y)
- ols_model, tbl2 = fit_ols(x, y)
- yhat = ols_model.fittedvalues
- u = ols_model.resid
- show_bins_ols(x, u)
- spearman_test(x, np.abs(u), alpha, label="u")
- gq_compact_ols = run_gq(x, y, m=m, c=c, alpha=alpha)
- glejser_short, glejser_long = glejser_table(x, yhat, np.abs(u), deltas, alpha)
- delta_best, row_best = choose_delta_for_weights(glejser_short, alpha)
- wls_model, sigma_hat = wls_fit_with_weights(x, y, delta_best, glejser_short)
- compare_ols_wls(ols_model, wls_model)
- yhat_w = wls_model.fittedvalues
- u_w, rstar = standardized_residuals(y, yhat_w, sigma_hat)
- df_r = pd.DataFrame({
- "i": np.arange(1, len(x)+1), "x": x, "y": y,
- "ŷ(WLS)": yhat_w, "u(WLS)": u_w, "r*(WLS)": rstar,
- "r*^2(WLS)": rstar**2, "|r*|(WLS)": np.abs(rstar)
- })
- print_table(df_r, "Таблиця 2*. Прогнози та залишки (WLS) з r* = u/σ̂")
- df_bins_r = var_bins(x, rstar, 3).rename(columns={"Var(u)": "Var(r*)"})
- print_table(df_bins_r, "Таблиця 3*. Розкид Var(r*) у 3 біннах x (після WLS)")
- spearman_rho_r, p_r = spearman_test(x, np.abs(rstar), alpha, label="r*")
- left_tbl_r, right_tbl_r, fit_l_r, fit_r_r = gq_groups(x, rstar, m, c)
- left_r = pd.DataFrame({"i": left_tbl_r["i"], "x": left_tbl_r["x"], "r*(WLS)": fit_l_r.resid + fit_l_r.fittedvalues, "r*(WLS)^2": (fit_l_r.resid + fit_l_r.fittedvalues)**2})
- right_r = pd.DataFrame({"i": right_tbl_r["i"], "x": right_tbl_r["x"], "r*(WLS)": fit_r_r.resid + fit_r_r.fittedvalues, "r*(WLS)^2": (fit_r_r.resid + fit_r_r.fittedvalues)**2})
- print_table(left_r, "Таблиця 5*. LEFT (стандартизовані r*) для тесту ГК")
- print_table(right_r, "Таблиця 6*. RIGHT (стандартизовані r*) для тесту ГК")
- gq_compact_r = gq_summary_compact(fit_l_r, fit_r_r, m, alpha)
- print_table(gq_compact_r, "Зведення тесту Голдфелда–Квандта на r* (F, df=m−1)")
- glejser_r_short, glejser_r_long = glejser_table(
- x, yhat_w, np.abs(rstar), deltas, alpha, label="r*"
- )
- homosk_after_wls = (
- (p_r >= alpha) and
- (not bool(gq_compact_r["reject_H0"].iloc[0])) and
- all(glejser_r_short["p-value"] >= alpha)
- )
- summary_line = f"ПІДСУМОК: Гомоскедастичність після WLS (за r*): {'ТАК' if homosk_after_wls else 'НІ'}; модель ваг за Глейзером: δ={delta_best:g}"
- print(summary_line)
- if make_plots:
- plot_diagnostics_ols(x, y, yhat, u, nbins=3)
- plot_diagnostics_wls(x, yhat_w, rstar, nbins=3)
- return {
- "ols_model": ols_model,
- "wls_model": wls_model,
- "sigma_hat": sigma_hat,
- "glejser_ols": glejser_short,
- "glejser_wls_r": glejser_r_short,
- "gq_ols_compact": gq_compact_ols,
- "gq_r_compact": gq_compact_r,
- "homosked_after_wls": homosk_after_wls,
- "delta_best": delta_best
- }
- if __name__ == "__main__":
- x = [0.3, 0.6, 1.1, 0.2, 0.8, 1.6, 1.5, 0.9, 0.9, 0.8]
- y = [2.8, 4.3, 6.3, 2.9, 5.3, 8.3, 8.0, 5.7, 5.4, 5.2]
- run_pipeline(x, y, alpha=0.05, deltas=(1,2,3,0.5,1/3), m=4, c=2, make_plots=True)
Advertisement
Add Comment
Please, Sign In to add comment