Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- 동일 득표쌍(collision) 분석 - 1000회 시뮬레이션 분포 확인
- - 난수 시드를 1씩 늘려가며 총 1000번 돌린 후,
- 노이즈 데이터의 동일쌍 개수 분포(평균, 표준편차, 최소, 최대)를 분석합니다.
- """
- import json, sys, numpy as np
- from collections import defaultdict, Counter
- sys.stdout.reconfigure(encoding='utf-8')
- # 데이터 로드
- recs = json.load(open('data/data_raw.json', encoding='utf-8'))
- print(f'총 레코드: {len(recs)}')
- # ----------------------------------------------------
- # 설정 및 데이터 그룹화
- # ----------------------------------------------------
- ITERATIONS = 1000 # 시뮬레이션 반복 횟수 (1000번)
- rankpairs = [(0, 1), (1, 2), (2, 3)]
- gubuns = ['관내사전투표', '선거일투표']
- # 원본 데이터 그룹화
- groups_original = defaultdict(list)
- for r in recs:
- groups_original[(r['contest'], r['gubun'])].append(r)
- # ----------------------------------------------------
- # 득표수 1회성 노이즈 부여 함수 (시드기반 생성기 주입)
- # ----------------------------------------------------
- def apply_random_noise_with_rng(records, current_rng):
- perturbed_records = []
- for r in records:
- new_r = r.copy()
- new_votes = []
- for v in r['votes']:
- if v is None:
- new_votes.append(None)
- continue
- # 무작위로 +1 또는 -1 선택
- noise = current_rng.choice([-1, 1])
- perturbed_v = v + noise
- if perturbed_v < 0:
- perturbed_v = 0
- new_votes.append(perturbed_v)
- new_r['votes'] = new_votes
- perturbed_records.append(new_r)
- return perturbed_records
- # ----------------------------------------------------
- # 기존 분석 함수들 (동일 유지)
- # ----------------------------------------------------
- def pair_count(values):
- c = Counter(values)
- pairs = sum(n * (n - 1) // 2 for n in c.values() if n >= 2)
- return pairs
- def topk_key(vec, ranks):
- sv = sorted(vec, reverse=True)
- if len(sv) <= max(ranks):
- return None
- return tuple(sv[r] for r in ranks)
- def analyze_observed_pairs(gubun_filter, rankpairs, target_groups):
- out = {}
- for rp in rankpairs:
- pooled_vals = []
- per_contest_pairs = 0
- for (contest, gubun), rs in target_groups.items():
- if gubun != gubun_filter:
- continue
- vals = [topk_key(r['votes'], rp) for r in rs]
- vals = [v for v in vals if v is not None]
- per_contest_pairs += pair_count(vals)
- pooled_vals += vals
- pooled_pairs = pair_count(pooled_vals)
- out[rp] = {'per_contest': per_contest_pairs, 'pooled': pooled_pairs}
- return out
- def full_vector_collisions(gubun_filter, target_groups):
- per_contest = 0
- for (contest, gubun), rs in target_groups.items():
- if gubun != gubun_filter:
- continue
- per_contest += pair_count([tuple(r['votes']) for r in rs])
- return per_contest
- # ----------------------------------------------------
- # 메인 반복 실행부 (시드 1씩 증가)
- # ----------------------------------------------------
- if __name__ == '__main__':
- # 1. 기준이 되는 원본 데이터 결과 먼저 계산
- orig_results = {}
- for gubun in gubuns:
- orig_results[gubun] = {
- 'rankpairs': analyze_observed_pairs(gubun, rankpairs, groups_original),
- 'full_vector': full_vector_collisions(gubun, groups_original)
- }
- # 2. 시뮬레이션 데이터 수집용 딕셔너리 초기화
- # 구조: sim_data[gubun]['per_contest' 또는 'pooled'][rp] = [1000개 결과 리스트]
- sim_data = {}
- for gubun in gubuns:
- sim_data[gubun] = {
- 'per_contest': {rp: [] for rp in rankpairs},
- 'pooled': {rp: [] for rp in rankpairs},
- 'full_vector': []
- }
- print(f'\n🚀 시드 0부터 {ITERATIONS-1}까지 총 {ITERATIONS}회 시뮬레이션 시작...')
- for i in range(ITERATIONS):
- # 시드를 1씩 늘려가며 난수 생성기 생성 (예: 20260603 + i 형태로 기본 시드를 잡거나 그냥 i 사용)
- current_seed = i
- current_rng = np.random.default_rng(current_seed)
- # 노이즈 반영 및 그룹화
- recs_pert = apply_random_noise_with_rng(recs, current_rng)
- groups_perturbed = defaultdict(list)
- for r in recs_pert:
- groups_perturbed[(r['contest'], r['gubun'])].append(r)
- # 각 구분별 결과 기록
- for gubun in gubuns:
- obs_pert = analyze_observed_pairs(gubun, rankpairs, groups_perturbed)
- fv_pert = full_vector_collisions(gubun, groups_perturbed)
- for rp in rankpairs:
- sim_data[gubun]['per_contest'][rp].append(obs_pert[rp]['per_contest'])
- sim_data[gubun]['pooled'][rp].append(obs_pert[rp]['pooled'])
- sim_data[gubun]['full_vector'].append(fv_pert)
- # 100번 돌 때마다 진행 상황 출력
- if (i + 1) % 100 == 0:
- print(f' [진행률] {i + 1}/{ITERATIONS} 완료...')
- # ----------------------------------------------------
- # 3. 통계 결과 출력
- # ----------------------------------------------------
- print('\n' + '=' * 80)
- print(f'📊 총 {ITERATIONS}회 노이즈 시뮬레이션 결과 분포 요약')
- print('=' * 80)
- for gubun in gubuns:
- print(f'\n🟦 [{gubun}]')
- print('-' * 75)
- for rp in rankpairs:
- rp_name = tuple(r+1 for r in rp)
- orig_pc = orig_results[gubun]['rankpairs'][rp]['per_contest']
- orig_pl = orig_results[gubun]['rankpairs'][rp]['pooled']
- pc_list = np.array(sim_data[gubun]['per_contest'][rp])
- pl_list = np.array(sim_data[gubun]['pooled'][rp])
- print(f'📌 순위쌍 {rp_name}:')
- print(f' • 지역구내 동일쌍 -> [원본]: {orig_pc} | [노이즈 분포]: 평균={pc_list.mean():.1f} (최소={pc_list.min()}, 최대={pc_list.max()}, 표준편차={pc_list.std():.2f})')
- print(f' • 전국풀링 동일쌍 -> [원본]: {orig_pl} | [노이즈 분포]: 평균={pl_list.mean():.1f} (최소={pl_list.min()}, 최대={pl_list.max()}, 표준편차={pl_list.std():.2f})')
- print('')
- fv_orig = orig_results[gubun]['full_vector']
- fv_list = np.array(sim_data[gubun]['full_vector'])
- print(f'✨ 전체 득표벡터 완전일치 쌍 (지역구 내):')
- print(f' • [원본]: {fv_orig} 쌍')
- print(f' • [노이즈 분포]: 평균={fv_list.mean():.1f} 쌍 (최소={fv_list.min()}, 최대={fv_list.max()}, 표준편차={fv_list.std():.2f})')
- print('-' * 75)
Advertisement
Add Comment
Please, Sign In to add comment