Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # ================================================ General Info ========================================================
- """
- Course: Visualization and Sonification
- EX1
- Name: Omer Arie Lerinman
- ID: 304919863
- """
- # ===================================================== Imports ========================================================
- # General
- from datetime import datetime
- from collections import defaultdict
- from datetime import date, timedelta
- import matplotlib.pyplot as plt
- import math
- from copy import deepcopy
- import os
- # GUI
- from tkinter import *
- from matplotlib.offsetbox import OffsetImage, AnnotationBbox
- from matplotlib.widgets import Button
- import numpy as np
- from scipy.misc import imread
- import matplotlib.cbook as cbook
- # SOUND
- from pydub import AudioSegment
- import pyaudio
- from pydub.playback import *
- try:
- import thread
- except ImportError:
- import _thread as thread
- # ===================================================== Constants ======================================================
# Location and encoding of the semicolon-separated MK database file.
DB_PATH = 'all_data_db.txt'
CODING = 'UTF-16'
# Timestamp layout used at the start of every presence record.
DATE_TIME_PATTERN = '%Y-%m-%d %H:%M:%S'
# Sample presence record: a timestamp followed by comma-separated numbers.
example_line = '2017-11-26 13:36:00, 864, 754, 941, 955, 814, 751, 839, 899'
SEP = os.path.sep
# Image resources live in ./images relative to the current working directory.
IMAGES_PATH = os.path.join(os.path.abspath('.'), 'images')
# Template for a single image file; filled in with str.format().
images_path = os.path.join(IMAGES_PATH, '{}.png')
KNESSET_PATH = os.path.join(IMAGES_PATH, 'knesset.png')
# Sound resources; the trailing separator is kept so names can be appended.
SOUND_PATH = os.path.join('voices', 'edited', '')
HATIKVA_VOICES_PATHS = SOUND_PATH + '{}.wav'
HATIKVA_PLAYBACK_PATH = SOUND_PATH + 'playback.wav'
# Database layout: one record per line, fields joined by SEPARATOR.
SEPARATOR = ';'
FIELDS = ['en_name', 'id', 'expenses',
          'presence_days', 'he_name']
DB_FORMAT_LINE = 'format:\t' + SEPARATOR.join(FIELDS) + '\n'
FORMAT_LINE = SEPARATOR.join('{' + field + '}' for field in FIELDS) + '\n'
# The backslash is escaped explicitly: a bare "\ " is an invalid escape
# sequence. The runtime value of the title is unchanged.
TITLE = "Visualization & Sonification \\ Omer Arie Lerinman"
- # ===================================================== MK class ======================================================
class MK:
    """A single Knesset member (MK) together with the data gathered about them."""

    def __init__(self, he_name='_', en_name='_', mk_id=0, expenses=0, presence_days=None, social=None):
        """
        :param he_name: Hebrew name ('_' when unknown).
        :param en_name: English name ('_' when unknown).
        :param mk_id: numeric id; 0 means the id is unknown.
        :param expenses: expenses figure for the MK.
        :param presence_days: presence as a timedelta; a falsy value becomes a zero timedelta.
        :param social: social rank, or None when unknown.
        """
        self.presence_days = presence_days if presence_days else timedelta()
        self.he_name = he_name
        self.en_name = en_name
        self.expenses = expenses
        self.social = social
        self.id = mk_id
        if mk_id:
            # Image and voice files are named after the MK id.
            self.image_path = images_path.format(mk_id)
            self.voice_path = HATIKVA_VOICES_PATHS.format(mk_id)
        else:
            self.image_path = '_'
            self.voice_path = '_'

    def has_values(self):
        """
        Tell whether this MK has everything needed to be plotted.

        :return: True when the id is known, some presence was recorded and a
                 social rank was set. A social rank of 0 is a valid value.
        """
        return (self.id != 0
                and self.presence_days.total_seconds() > 0
                and self.social is not None)
- # ===================================================== Global variables
# Data
# All known MKs. Missing keys are created on access as an empty MK().
mk_dict = defaultdict(MK)
# Gui
# Built with os.path.join so the path is also valid where the separator is
# not a backslash.
BACKGROUND_IMAGE_PATH = os.path.abspath(os.path.join('images', 'knesset.png'))
# Maps an MK key to its (x, y) position in the plot.
mk_indices_in_gui = {}
# Sound
thread_indices = np.arange(0, 8)
# One recording per index: p0.wav ... p7.wav under SOUND_PATH.
tikva_paths = [HATIKVA_VOICES_PATHS.format('p' + str(i)) for i in thread_indices]
# NOTE: the recordings are loaded from disk when the module is imported.
anthem_records = [AudioSegment.from_file(path, format="wav") for path in tikva_paths]
playback_tikva = AudioSegment.from_file(HATIKVA_PLAYBACK_PATH, format="wav")
# len() of the loaded playback segment (pydub reports length in milliseconds).
duration_in_milliseconds = len(playback_tikva)
- # ===================================================== DB utils ======================================================
def load_known():
    """Seed mk_dict with the hard-coded MKs whose social rank is known."""
    known_members = (
        # (id, English name, social rank)
        (936, 'Betsalel Smutrich', -22),
        (878, 'Yair Lapid', 40),
        (924, 'Oren Hazan', -50),
        (903, 'Stav Shafir', 63),
        (871, 'Tamar Zandberg', 43),
        (881, 'Meirav Michaeli', 56),
        (208, 'Ahmed Tibi', 23),
        (900, 'Ofer Shelach', 63),
        (846, 'Hanin Zuabi', 18),
        (955, 'Jehooda Glik', -13),
        (938, 'Iman Udah', 40),
    )
    for member_id, english_name, social_rank in known_members:
        mk_dict[member_id] = MK(en_name=english_name, mk_id=member_id, social=social_rank)
def write_db():
    """Dump every MK in mk_dict to DB_PATH: a format header, then one record per line."""
    with open(DB_PATH, 'w', encoding=CODING) as db_file:
        db_file.write(DB_FORMAT_LINE)
        # Presence is stored in hours.
        db_file.writelines(
            FORMAT_LINE.format(
                he_name=member.he_name,
                en_name=member.en_name,
                id=member.id,
                expenses=member.expenses,
                presence_days=member.presence_days.seconds / 3600,
            )
            for member in mk_dict.values()
        )
def load_db():
    """
    Populate mk_dict from the database file at DB_PATH.

    The first line is a format header and is skipped. Every other line holds
    the fields en_name, id, expenses, presence (in hours) and he_name, joined
    by SEPARATOR. MKs with a positive id are stored under that id; the others
    are stored under their English name.
    """
    with open(DB_PATH, 'r', encoding=CODING) as f:
        next(f, None)  # skip the header line
        for line in f:
            # Drop the line terminator so it does not end up inside he_name.
            record = line.rstrip('\r\n')
            if not record:
                continue
            mk_as_array = record.split(SEPARATOR)
            en_name = mk_as_array[0]
            mk_id = int(mk_as_array[1])
            expenses = int(mk_as_array[2])
            # Presence is stored in hours; convert back to a timedelta.
            presence_days = timedelta(seconds=int(float(mk_as_array[3]) * 3600))
            he_name = mk_as_array[4]
            key = mk_id if mk_id > 0 else en_name
            mk_dict[key] = MK(he_name=he_name, en_name=en_name, mk_id=mk_id, expenses=expenses,
                              presence_days=presence_days)
- # ===================================================== get data ======================================================
def get_expenses():
    """
    Read the MKs' expenses from 'public_expenses_2017.txt' into mk_dict.

    The last whitespace-separated token of each line is taken as the expense
    and everything before it as the MK's Hebrew name.
    NOTE(review): assumes the file is laid out as '<name> <amount>' - confirm
    against the data file.
    An MK whose he_name matches gets its expenses updated; otherwise a new MK
    is stored under the name.
    :return: None
    """
    with open('public_expenses_2017.txt', 'r', encoding=CODING) as f:
        for line in f:
            record = line.strip()
            if not record:
                continue
            # Split from the right so a name may contain spaces.
            parts = record.rsplit(None, 1)
            expense = int(float(parts[-1]))
            name = parts[0] if len(parts) == 2 else ''
            in_dict = False
            # Iterate the MK objects themselves; iterating the dict directly
            # yields only its keys.
            for mk in mk_dict.values():
                if name == mk.he_name:
                    in_dict = True
                    mk.expenses = expense
            if not in_dict:
                mk_dict[name] = MK(he_name=name, expenses=expense)
def get_presence():
    """
    Accumulate each MK's presence time from 'presence_2017.txt' into mk_dict.

    Each record is a timestamp (DATE_TIME_PATTERN) followed by ', '-separated
    ids of the MKs present at that moment. For every MK the time between
    entering and leaving is added to presence_days, and at the end every total
    is divided by the number of counted days.

    Records are skipped on Mondays and Wednesdays, and for the rest of a day
    once two consecutive records are too far apart.
    :return: None
    """
    total_counted_days = 0
    with open('presence_2017.txt') as f:
        prev_day = -1
        prev_presence_mk = []
        day_presence_dict = defaultdict(dict)  # mk_id -> {'in': datetime, 'out': datetime}
        prev_date_time = None
        continue_to_next_day = False

        def add_to_mk(mk_id):
            """Add the MK's current in/out interval to their accumulated presence."""
            interval = day_presence_dict[mk_id]
            if not interval:
                return
            if mk_id not in mk_dict:
                mk_dict[mk_id] = MK(mk_id=mk_id)
            mk_dict[mk_id].presence_days += interval['out'] - interval['in']

        for line in f:
            # Parse
            record = line.rstrip('\n')
            if not record:
                continue
            words = record.split(', ')
            if words[-1] == '':
                words.pop()
            cur_date_time = datetime.strptime(words[0], DATE_TIME_PATTERN)
            # init time variables
            date_object = date(year=cur_date_time.year, month=cur_date_time.month, day=cur_date_time.day)
            weekday = date_object.weekday()  # Monday is 0 and Sunday is 6
            # A new day is detected by a change of weekday.
            if prev_day != weekday:
                continue_to_next_day = False
                prev_day = weekday
                prev_presence_mk = []
                prev_date_time = cur_date_time
                if day_presence_dict:
                    # Close the intervals still open from the previous day.
                    for mk_id in day_presence_dict:
                        add_to_mk(mk_id)
                    total_counted_days += 1
                day_presence_dict = defaultdict(dict)  # empty dict for a new day
            elif continue_to_next_day:
                continue
            if len(words) == 1 and not day_presence_dict:
                prev_date_time = cur_date_time
                continue  # no MK's present
            # Drop the rest of the day when the clock did not report for more
            # than two hours (7200 seconds).
            # NOTE(review): timedelta.seconds ignores whole days - confirm
            # whether total_seconds() was intended.
            if prev_date_time:
                gap = cur_date_time - prev_date_time
                if gap.seconds > 7200:
                    day_presence_dict = defaultdict(dict)
                    prev_date_time = cur_date_time
                    continue_to_next_day = True
                    continue
            # MK's are obligated to be present at monday and wednesday (Plenum day, aka Melia'a)
            if weekday in [0, 2]:
                prev_date_time = cur_date_time
                continue
            # Update current day's presence of each mk
            presence_mks = [int(mk) for mk in words[1:]]
            for mk_id in presence_mks:
                day_presence_dict[mk_id]['out'] = cur_date_time
                if mk_id not in prev_presence_mk:
                    day_presence_dict[mk_id]['in'] = cur_date_time
            # Calculate time of mk's who left
            mk_who_left = set(prev_presence_mk) - set(presence_mks)
            for mk_id in mk_who_left:
                add_to_mk(mk_id)
                del day_presence_dict[mk_id]
            prev_presence_mk = presence_mks
            prev_date_time = cur_date_time
    # NOTE(review): intervals still open when the file ends are not added
    # here - confirm whether the last day should be flushed as well.
    # Normalize mk's presence in the number of counted days. When no day was
    # counted the totals are left as they are instead of dividing by zero.
    divisor = max(total_counted_days, 1)
    for mk_id in mk_dict:
        mk_dict[mk_id].presence_days /= divisor
def init_indices():
    """Record the plot position (social rank, presence in hours) of every MK that has values."""
    for member_key, member in mk_dict.items():
        if not member.has_values():
            continue
        presence_hours = member.presence_days.seconds / 3600
        mk_indices_in_gui[member_key] = (member.social, presence_hours)
knesset_img = None


def init_gui2():
    """
    Show the opening window: the labelled plot area, the Knesset image and a
    'Start' button that launches the sonification. Blocks until the window is
    closed.
    """
    fig, ax = plt.subplots()
    plt.tight_layout()
    plt.title(TITLE)
    plt.xlabel('Social rank')
    plt.ylabel('Average presence in the Knesset')
    ax.set_xlim(-70, 85)
    ax.set_ylim(0, 12)
    # Leave extra room at the bottom for the button.
    plt.subplots_adjust(left=0.1, right=0.9, bottom=0.2, top=0.9)
    # TODO: re-enable once a motion callback exists:
    # fig.canvas.mpl_connect('motion_notify_event', motion_notify_callback)
    # Read the image straight from its path, so no file handle is left open.
    img = plt.imread(KNESSET_PATH)
    callback = PlayButtons()
    axstart = plt.axes([0.1, 0.05, 0.1, 0.075])
    # The button must stay referenced while the window is shown.
    bnext = Button(axstart, 'Start')
    bnext.on_clicked(callback.start)
    # NOTE(review): plt.imshow draws on the current axes, which at this point
    # appears to be the button axes created above - confirm this is intended.
    plt.imshow(img, zorder=0)
    plt.show()
def gui_2_helper(mk_id=None):
    """
    Open a non-blocking plot window; when mk_id is given, place that MK's
    image at the MK's position in the plot.

    :param mk_id: key of the MK to show, or a falsy value for an empty plot.
    """
    figure, axes = plt.subplots()
    plt.tight_layout()
    plt.title(TITLE)
    plt.xlabel('Social rank')
    plt.ylabel('Average presence in the Knesset')
    axes.set_xlim(-70, 85)
    axes.set_ylim(0, 12)
    plt.subplots_adjust(left=0.1, right=0.9, bottom=0.1, top=0.9)
    if mk_id:
        member = mk_dict[mk_id]
        portrait = OffsetImage(plt.imread(member.image_path, format='png'), zoom=0.2)
        portrait.image.axes = axes
        position = mk_indices_in_gui[mk_id]
        annotation = AnnotationBbox(
            portrait,
            position,
            xybox=(0, 0),
            xycoords='data',
            boxcoords="offset points",
            box_alignment=(-0.5, -0.5),
            pad=0,
            arrowprops=dict(arrowstyle="->"),
        )
        axes.add_artist(annotation)
    plt.show(block=False)
- # ===================================== init threads
chapters = None
playback_slices = None


def init_sound():
    """
    Prepare the audio chapters of the sonification.

    The playback track is cut into equal slices, one per displayed MK plus an
    opening and a closing slice. Each MK's slice is overlaid with a number of
    voice recordings derived from the MK's social rank, and attenuated
    according to the MK's presence. The results are stored in the module-level
    `chapters` and `playback_slices`.
    """
    global chapters, playback_slices
    audio = pyaudio.PyAudio()
    try:
        device_info = audio.get_default_output_device_info()
        print('Sound device:\t' + str(device_info['name']))
    finally:
        # The instance is only needed for the device query; release it.
        audio.terminate()
    number_of_voices = len(anthem_records)
    mk_in_gui = list(mk_indices_in_gui.keys())
    number_of_slices = len(mk_in_gui) + 2
    chapters_length = math.floor(duration_in_milliseconds / number_of_slices)
    playback_slices = list(playback_tikva[::chapters_length])
    voice_slices = [list(version[::chapters_length]) for version in anthem_records]
    # Slice 0 is the opening chapter, so the MKs start at index 1.
    for i, mk_id in enumerate(mk_in_gui, start=1):
        mk = mk_dict[mk_id]
        # The mapping assumes sociality between -24 and 75; ranks outside that
        # range are clamped to the available number of voices.
        cur_number_of_voices = math.floor(((mk.social + 24) / 99) * number_of_voices)
        cur_number_of_voices = max(0, min(cur_number_of_voices, number_of_voices))
        mixed = playback_slices[i]
        for j in range(cur_number_of_voices):
            mixed = mixed.overlay(voice_slices[j][i])
        gain = min(mk.presence_days.seconds / 3600, 12) / 12  # between [0,1]
        gain_change = float(-(1 - gain)) * 10  # between [-10, 0]
        # apply_gain returns a new segment, so the result has to be stored.
        playback_slices[i] = mixed.apply_gain(gain_change)
    chapters = [None] + mk_in_gui + [None]
def play_sonification():
    """Go through the chapters in order: show each chapter's plot, then play its audio slice."""
    for chapter_index, chapter_mk in enumerate(chapters):
        segment = playback_slices[chapter_index]
        # Replace the previous window with the one of the current chapter.
        plt.close()
        gui_2_helper(chapter_mk)
        play(segment)
- # ===================================================== buttons # ======================================================
class PlayButtons(object):
    """Holds the callbacks of the GUI buttons."""

    def start(self, event):
        """
        Callback of the 'Start' button: run the sonification.

        :param event: the click event passed by the button (not used).
        """
        play_sonification()
- # ===================================================== Main ======================================================
if __name__ == '__main__':
    # Preparation steps, run in order. load_db, get_expenses and write_db
    # are currently disabled.
    for prepare in (load_known, get_presence, init_indices, init_sound):
        prepare()
    # Show the opening window again every time it is closed.
    while True:
        init_gui2()
Add Comment
Please, Sign In to add comment