Guest User

Untitled

a guest
Jun 13th, 2026
9
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.94 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. """
  3. 동일 득표쌍(collision) 분석 - 1000회 시뮬레이션 분포 확인
  4. - 난수 시드를 1씩 늘려가며 총 1000번 돌린 후,
  5. 노이즈 데이터의 동일쌍 개수 분포(평균, 표준편차, 최소, 최대)를 분석합니다.
  6. """
  7. import json, sys, numpy as np
  8. from collections import defaultdict, Counter
  9. sys.stdout.reconfigure(encoding='utf-8')
  10.  
  11. # 데이터 로드
  12. recs = json.load(open('data/data_raw.json', encoding='utf-8'))
  13. print(f'총 레코드: {len(recs)}')
  14.  
  15. # ----------------------------------------------------
  16. # 설정 및 데이터 그룹화
  17. # ----------------------------------------------------
  18. ITERATIONS = 1000 # 시뮬레이션 반복 횟수 (1000번)
  19. rankpairs = [(0, 1), (1, 2), (2, 3)]
  20. gubuns = ['관내사전투표', '선거일투표']
  21.  
  22. # 원본 데이터 그룹화
  23. groups_original = defaultdict(list)
  24. for r in recs:
  25. groups_original[(r['contest'], r['gubun'])].append(r)
  26.  
  27. # ----------------------------------------------------
  28. # 득표수 1회성 노이즈 부여 함수 (시드기반 생성기 주입)
  29. # ----------------------------------------------------
  30. def apply_random_noise_with_rng(records, current_rng):
  31. perturbed_records = []
  32. for r in records:
  33. new_r = r.copy()
  34. new_votes = []
  35. for v in r['votes']:
  36. if v is None:
  37. new_votes.append(None)
  38. continue
  39.  
  40. # 무작위로 +1 또는 -1 선택
  41. noise = current_rng.choice([-1, 1])
  42. perturbed_v = v + noise
  43. if perturbed_v < 0:
  44. perturbed_v = 0
  45. new_votes.append(perturbed_v)
  46.  
  47. new_r['votes'] = new_votes
  48. perturbed_records.append(new_r)
  49. return perturbed_records
  50.  
  51. # ----------------------------------------------------
  52. # 기존 분석 함수들 (동일 유지)
  53. # ----------------------------------------------------
  54. def pair_count(values):
  55. c = Counter(values)
  56. pairs = sum(n * (n - 1) // 2 for n in c.values() if n >= 2)
  57. return pairs
  58.  
  59. def topk_key(vec, ranks):
  60. sv = sorted(vec, reverse=True)
  61. if len(sv) <= max(ranks):
  62. return None
  63. return tuple(sv[r] for r in ranks)
  64.  
  65. def analyze_observed_pairs(gubun_filter, rankpairs, target_groups):
  66. out = {}
  67. for rp in rankpairs:
  68. pooled_vals = []
  69. per_contest_pairs = 0
  70. for (contest, gubun), rs in target_groups.items():
  71. if gubun != gubun_filter:
  72. continue
  73. vals = [topk_key(r['votes'], rp) for r in rs]
  74. vals = [v for v in vals if v is not None]
  75. per_contest_pairs += pair_count(vals)
  76. pooled_vals += vals
  77. pooled_pairs = pair_count(pooled_vals)
  78. out[rp] = {'per_contest': per_contest_pairs, 'pooled': pooled_pairs}
  79. return out
  80.  
  81. def full_vector_collisions(gubun_filter, target_groups):
  82. per_contest = 0
  83. for (contest, gubun), rs in target_groups.items():
  84. if gubun != gubun_filter:
  85. continue
  86.  
  87. per_contest += pair_count([tuple(r['votes']) for r in rs])
  88. return per_contest
  89.  
  90. # ----------------------------------------------------
  91. # 메인 반복 실행부 (시드 1씩 증가)
  92. # ----------------------------------------------------
  93. if __name__ == '__main__':
  94. # 1. 기준이 되는 원본 데이터 결과 먼저 계산
  95. orig_results = {}
  96. for gubun in gubuns:
  97. orig_results[gubun] = {
  98. 'rankpairs': analyze_observed_pairs(gubun, rankpairs, groups_original),
  99. 'full_vector': full_vector_collisions(gubun, groups_original)
  100. }
  101.  
  102. # 2. 시뮬레이션 데이터 수집용 딕셔너리 초기화
  103. # 구조: sim_data[gubun]['per_contest' 또는 'pooled'][rp] = [1000개 결과 리스트]
  104. sim_data = {}
  105. for gubun in gubuns:
  106. sim_data[gubun] = {
  107. 'per_contest': {rp: [] for rp in rankpairs},
  108. 'pooled': {rp: [] for rp in rankpairs},
  109. 'full_vector': []
  110. }
  111.  
  112. print(f'\n🚀 시드 0부터 {ITERATIONS-1}까지 총 {ITERATIONS}회 시뮬레이션 시작...')
  113.  
  114. for i in range(ITERATIONS):
  115. # 시드를 1씩 늘려가며 난수 생성기 생성 (예: 20260603 + i 형태로 기본 시드를 잡거나 그냥 i 사용)
  116. current_seed = i
  117. current_rng = np.random.default_rng(current_seed)
  118.  
  119. # 노이즈 반영 및 그룹화
  120. recs_pert = apply_random_noise_with_rng(recs, current_rng)
  121. groups_perturbed = defaultdict(list)
  122. for r in recs_pert:
  123. groups_perturbed[(r['contest'], r['gubun'])].append(r)
  124.  
  125. # 각 구분별 결과 기록
  126. for gubun in gubuns:
  127. obs_pert = analyze_observed_pairs(gubun, rankpairs, groups_perturbed)
  128. fv_pert = full_vector_collisions(gubun, groups_perturbed)
  129.  
  130. for rp in rankpairs:
  131. sim_data[gubun]['per_contest'][rp].append(obs_pert[rp]['per_contest'])
  132. sim_data[gubun]['pooled'][rp].append(obs_pert[rp]['pooled'])
  133. sim_data[gubun]['full_vector'].append(fv_pert)
  134.  
  135. # 100번 돌 때마다 진행 상황 출력
  136. if (i + 1) % 100 == 0:
  137. print(f' [진행률] {i + 1}/{ITERATIONS} 완료...')
  138.  
  139. # ----------------------------------------------------
  140. # 3. 통계 결과 출력
  141. # ----------------------------------------------------
  142. print('\n' + '=' * 80)
  143. print(f'📊 총 {ITERATIONS}회 노이즈 시뮬레이션 결과 분포 요약')
  144. print('=' * 80)
  145.  
  146. for gubun in gubuns:
  147. print(f'\n🟦 [{gubun}]')
  148. print('-' * 75)
  149.  
  150. for rp in rankpairs:
  151. rp_name = tuple(r+1 for r in rp)
  152. orig_pc = orig_results[gubun]['rankpairs'][rp]['per_contest']
  153. orig_pl = orig_results[gubun]['rankpairs'][rp]['pooled']
  154.  
  155. pc_list = np.array(sim_data[gubun]['per_contest'][rp])
  156. pl_list = np.array(sim_data[gubun]['pooled'][rp])
  157.  
  158. print(f'📌 순위쌍 {rp_name}:')
  159. print(f' • 지역구내 동일쌍 -> [원본]: {orig_pc} | [노이즈 분포]: 평균={pc_list.mean():.1f} (최소={pc_list.min()}, 최대={pc_list.max()}, 표준편차={pc_list.std():.2f})')
  160. print(f' • 전국풀링 동일쌍 -> [원본]: {orig_pl} | [노이즈 분포]: 평균={pl_list.mean():.1f} (최소={pl_list.min()}, 최대={pl_list.max()}, 표준편차={pl_list.std():.2f})')
  161. print('')
  162.  
  163. fv_orig = orig_results[gubun]['full_vector']
  164. fv_list = np.array(sim_data[gubun]['full_vector'])
  165. print(f'✨ 전체 득표벡터 완전일치 쌍 (지역구 내):')
  166. print(f' • [원본]: {fv_orig} 쌍')
  167. print(f' • [노이즈 분포]: 평균={fv_list.mean():.1f} 쌍 (최소={fv_list.min()}, 최대={fv_list.max()}, 표준편차={fv_list.std():.2f})')
  168. print('-' * 75)
Advertisement
Add Comment
Please, Sign In to add comment