Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Fetching World Bank GDP PPP data...
- Got 206 countries
- Fetching OECD Disposable Income data...
- Got 43 countries
- Countries with both datasets: 41
- Total observations: 1129, years 1990-2024
- ==================================================
- CORRELATION ANALYSIS
- ==================================================
- 1. Cross-section (2024): r = 0.7531, n = 25
- 2. Pooled panel: r = 0.8885, n = 1129
- 3. Within-country mean: r = 0.9891, n = 40 countries
- 4. Country averages: r = 0.8748, n = 41
- """
- import urllib.request
- import json
- import csv
- import io
- from collections import defaultdict
- from math import sqrt
def fetch_worldbank_gdp_ppp():
    """Fetch GDP per capita PPP from the World Bank API.

    Requests indicator ``NY.GDP.PCAP.PP.CD`` for all entities and the years
    1990-2024 in a single page, then drops null observations and
    regional / income-group aggregates.

    Returns:
        dict mapping ``countryiso3code`` -> ``{year (int): value}``.

    Raises:
        RuntimeError: if the decoded JSON is not a ``[metadata, records]``
            array with a non-null second element.
    """
    url = "https://api.worldbank.org/v2/country/all/indicator/NY.GDP.PCAP.PP.CD?format=json&per_page=20000&date=1990:2024"
    print("Fetching World Bank GDP PPP data...")
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(req, timeout=30) as response:
        data = json.loads(response.read().decode())
    if not data or len(data) < 2 or data[1] is None:
        raise RuntimeError("World Bank API returned unexpected structure")

    # Codes of aggregate entities to exclude. Membership is tested against
    # 'countryiso3code', so only the three-letter entries can match.
    # NOTE(review): the two-character entries look like World Bank internal
    # ids; they are kept because they are harmless, but presumably never hit.
    aggregates = {
        'EU', 'XC', 'OE', 'ZJ', 'XD', 'XO', 'XM', 'XN', 'XP', 'XT',
        'ZG', 'ZF', 'Z4', 'Z7', '4E', '7E', 'S1', 'S2', 'S3', 'S4',
        '8S', 'B8', 'F1', 'XE', 'XL', 'ZQ', 'XQ', 'XU', '1W', '1A',
        'ARB', 'CSS', 'CEB', 'EAP', 'EAS', 'ECA', 'ECS', 'EMU', 'EUU',
        'FCS', 'HIC', 'HPC', 'IBD', 'IBT', 'IDA', 'IDB', 'IDX', 'LAC',
        'LCN', 'LDC', 'LIC', 'LMC', 'LMY', 'LTE', 'MEA', 'MIC', 'MNA',
        'NAC', 'OED', 'OSS', 'PRE', 'PSS', 'PST', 'SAS', 'SSA', 'SSF',
        'SST', 'TEA', 'TEC', 'TLA', 'TMN', 'TSA', 'TSS', 'UMC', 'WLD',
        # Aggregates missing from the original list: Africa Eastern and
        # Southern, Africa Western and Central, Early-demographic dividend,
        # and "Not classified".
        'AFE', 'AFW', 'EAR', 'INX',
    }

    gdp = defaultdict(dict)
    for record in data[1]:
        if record['value'] is not None:
            country = record['countryiso3code']
            # Some entities come back with an empty ISO3 code; skip those too.
            if country and country not in aggregates:
                gdp[country][int(record['date'])] = record['value']
    print(f" Got {len(gdp)} countries")
    return dict(gdp)
def fetch_oecd_disposable_income():
    """Download OECD gross disposable income per capita (USD PPP) as CSV.

    Returns:
        dict mapping the ``REF_AREA`` code -> ``{year (int): value (float)}``.
        Rows with a missing field, or whose year/value cannot be parsed,
        are skipped.
    """
    print("Fetching OECD Disposable Income data...")
    request = urllib.request.Request(
        "https://sdmx.oecd.org/public/rest/data/OECD.SDD.NAD,DSD_NAAG@DF_NAAG_V,1.0/A..B6GS1M_POP.USD_PPP_PS.?dimensionAtObservation=AllDimensions",
        headers={"Accept": "application/vnd.sdmx.data+csv;version=2.0.0"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = response.read().decode('utf-8')

    income = defaultdict(dict)
    for record in csv.DictReader(io.StringIO(payload)):
        area = record.get('REF_AREA', '')
        period = record.get('TIME_PERIOD', '')
        raw = record.get('OBS_VALUE', '')
        if not (area and period and raw):
            continue
        try:
            # Parse the value before touching the per-country dict, then the
            # year, so a bad value never creates an empty country entry.
            amount = float(raw)
            per_year = income[area]
            per_year[int(period)] = amount
        except (ValueError, TypeError):
            continue

    print(f" Got {len(income)} countries")
    return dict(income)
def pearsonr(x, y):
    """Return the sample Pearson correlation of *x* and *y*, or None.

    None is returned when *x* has fewer than three points or when either
    series has zero standard deviation (correlation undefined).
    """
    count = len(x)
    if count < 3:
        return None

    dof = count - 1
    mean_x = sum(x) / count
    mean_y = sum(y) / count

    # Sample standard deviations (n - 1 denominator).
    std_x = sqrt(sum((a - mean_x) ** 2 for a in x) / dof)
    std_y = sqrt(sum((b - mean_y) ** 2 for b in y) / dof)
    if std_x == 0 or std_y == 0:
        return None

    cross = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    return cross / (dof * std_x * std_y)
def _format_r(r):
    """Format a correlation to four decimals, or 'n/a' when it is undefined."""
    return "n/a" if r is None else f"{r:.4f}"


def analyze(gdp_data, income_data):
    """Print four correlation summaries between GDP and income per capita.

    Args:
        gdp_data: dict of country -> {year: GDP value}.
        income_data: dict of country -> {year: income value}.

    Only country-year pairs present in both inputs are used. The report
    covers (1) the cross-section for the latest shared year, (2) the pooled
    panel, (3) the mean of per-country correlations for countries with at
    least five observations, and (4) the correlation of per-country averages.

    Returns:
        None. Prints an error and returns early when there are no shared
        countries or no shared country-year observations. A correlation
        that is undefined is printed as ``n/a``.
    """
    common_countries = set(gdp_data.keys()) & set(income_data.keys())
    print(f"\nCountries with both datasets: {len(common_countries)}")
    if not common_countries:
        print("ERROR: No matching countries.")
        return

    # Build the panel once and group it by country at the same time, so the
    # per-country steps below do not re-scan every row for every country.
    rows = []
    rows_by_country = {}
    for country in common_countries:
        common_years = set(gdp_data[country].keys()) & set(income_data[country].keys())
        for year in common_years:
            row = {
                'country': country,
                'year': year,
                'gdp': gdp_data[country][year],
                'income': income_data[country][year],
            }
            rows.append(row)
            rows_by_country.setdefault(country, []).append(row)

    # Shared countries may still have no overlapping years; min()/max() on
    # an empty sequence would raise, so stop here with a clear message.
    if not rows:
        print("ERROR: No overlapping country-year observations.")
        return

    years = sorted({r['year'] for r in rows})
    print(f"Total observations: {len(rows)}, years {min(years)}-{max(years)}")
    print("\n" + "=" * 50)
    print("CORRELATION ANALYSIS")
    print("=" * 50)

    # 1. Cross-section (latest year)
    latest = max(years)
    latest_rows = [r for r in rows if r['year'] == latest]
    r1 = pearsonr([r['gdp'] for r in latest_rows], [r['income'] for r in latest_rows])
    print(f"\n1. Cross-section ({latest}): r = {_format_r(r1)}, n = {len(latest_rows)}")

    # 2. Pooled panel
    r2 = pearsonr([r['gdp'] for r in rows], [r['income'] for r in rows])
    print(f"2. Pooled panel: r = {_format_r(r2)}, n = {len(rows)}")

    # 3. Within-country
    country_rs = []
    for country, c_rows in rows_by_country.items():
        if len(c_rows) >= 5:
            rc = pearsonr([r['gdp'] for r in c_rows], [r['income'] for r in c_rows])
            if rc is not None:
                country_rs.append((country, rc))
    country_rs.sort(key=lambda x: x[1], reverse=True)
    # No qualifying country means there is nothing to average.
    mean_r = sum(r for _, r in country_rs) / len(country_rs) if country_rs else None
    print(f"3. Within-country mean: r = {_format_r(mean_r)}, n = {len(country_rs)} countries")

    # 4. Country averages
    country_avgs = {
        country: {
            'gdp': sum(r['gdp'] for r in c_rows) / len(c_rows),
            'income': sum(r['income'] for r in c_rows) / len(c_rows),
        }
        for country, c_rows in rows_by_country.items()
    }
    r4 = pearsonr([v['gdp'] for v in country_avgs.values()],
                  [v['income'] for v in country_avgs.values()])
    print(f"4. Country averages: r = {_format_r(r4)}, n = {len(country_avgs)}")
if __name__ == "__main__":
    # Script entry point: fetch the World Bank series first, then the OECD
    # series, and print the correlation report for the two.
    analyze(fetch_worldbank_gdp_ppp(), fetch_oecd_disposable_income())
Advertisement
Add Comment
Please, Sign In to add comment