Advertisement
Guest User

Untitled

a guest
Jun 25th, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.18 KB | None | 0 0
  1. import sys
  2. import struct
  3. import os
  4. import matplotlib.pyplot as plt
  5. import numpy as np
  6.  
  7. #int to 2 byte Little Endian binary
  8. def int_to_LE2(input_int: int) -> bytes:
  9. output_binary = struct.pack('<h', int(input_int))
  10. return output_binary
  11.  
  12. #2 byte Little Endian binary to integer
  13. def LE2_to_int(input_binary: bytes) -> int:
  14. output_int = struct.unpack('<h', input_binary)
  15. return output_int[0]
  16.  
  17. #int to 4 byte Little Endian binary
  18. def int_to_LE4(input_int: int) -> bytes:
  19. output_binary = struct.pack('<i', int(input_int))
  20. return output_binary
  21.  
  22. #4 byte Little Endian binary to integer
  23. def LE4_to_int(input_binary: bytes) -> int:
  24. output_int = struct.unpack('<i', input_binary)
  25. return output_int[0]
  26.  
  27. #8 bit sample (signed byte) to integer
  28. def sample8_to_int(input_binary: bytes) -> int:
  29. output_int = struct.unpack('<B', input_binary)
  30. return output_int[0]
  31.  
  32. #16 bit sample (unsigned short) to integer
  33. def sample16_to_int(input_binary: bytes) -> int:
  34. output_int = struct.unpack('<h', input_binary)
  35. return output_int[0]
  36.  
  37. #32 bit sample (unsigned short) to integer
  38. def sample32_to_int(input_binary: bytes) -> int:
  39. output_int = struct.unpack('<I', input_binary)
  40. return output_int[0]
  41.  
  42. #binary ascii to string
  43. def ascii_to_string(input_binary: bytes) -> str:
  44. output_string = input_binary.decode('ascii')
  45. return output_string
  46.  
  47. class WaveObject(object):
  48. HEADER_SIZE = 44
  49.  
  50. NP_DATA_TYPES = {
  51. 8 : np.dtype('<u1'),
  52. 16 : np.dtype('<i2'),
  53. 32 : np.dtype('<i4')
  54. }
  55.  
  56. #Just did this to make it a collapsible
  57. def init_header(self):
  58. #name [data, descriptor, type]
  59. self.header = {
  60. 'f_type' : [b'RIFF', 'File type', 'ascii'],
  61. 'f_size' : [b'', 'File size', 'LE4'],
  62. 'f_fmt' : [b'WAVE', 'File format', 'ascii'],
  63. 's_chunk_1_id' : [b'fmt ', 'Sub chunk 1 ID', 'ascii'],
  64. 's_chunk_1_size' : [b'\x10\x00\x00\x00', 'Sub chunk 1 size', 'LE4'],
  65. 'audio_fmt' : [b'\x01\x00', 'Audio format', 'LE2'],
  66. 'n_channels' : [b'', 'Number of channels', 'LE2'],
  67. 'sample_rate' : [b'', 'Sample rate', 'LE4'],
  68. 'byte_rate' : [b'', 'Byte rate', 'LE4'],
  69. 'block_align' : [b'', 'Block align', 'LE2'],
  70. 'bit_depth' : [b'', 'Bit depth', 'LE2'],
  71. 's_chunk_2_id' : [b'data', 'Sub chunk 2 ID', 'ascii'],
  72. 'data_size' : [b'', 'Data size', 'LE4']}
  73.  
  74. #Opens specified file if it exists, creates it otherwise
  75. #Defaults to 'newfile.wav'
  76. def __init__(self, file_name: str = 'newfile.wav'):
  77. self.init_header()
  78. if os.path.isfile(file_name):
  79. self.open_wav(file_name)
  80. else:
  81. self.create_wav(file_name)
  82.  
  83. #Opens a wav file, checks it, then rebuilds the header
  84. def open_wav(self, file_name: str):
  85. with open(file_name, mode='rb') as f:
  86. header_data = f.read(self.HEADER_SIZE)
  87. raw_data = f.read()
  88.  
  89. self.build_header(header_data)
  90. #check that file is a WAV file
  91. bit_depth = LE2_to_int(self.header['bit_depth'][0])
  92. n_channels = LE2_to_int(self.header['n_channels'][0])
  93.  
  94. if (self.header['f_type'][0] != b'RIFF') or (self.header['f_fmt'][0] != b'WAVE'):
  95. print("File is not a valid WAV file")
  96. sys.exit(1)
  97.  
  98. if (bit_depth % 8 != 0) or (bit_depth > 32):
  99. print("Bit depth is invalid (" + self.header['bit_depth'][0] + ")")
  100. sys.exit(1)
  101.  
  102. dt = self.NP_DATA_TYPES[bit_depth]
  103. raw_data = np.fromstring(raw_data, dt)
  104. raw_data.shape = (-1, n_channels)
  105. self.data = raw_data
  106. self.rebuild_header()
  107.  
  108. #creates a new wav file from specified file name, defaults values to normal values and then builds the header
  109. def create_wav(self, file_name: str, n_channels: int = 2, sample_rate: int = 44100, bit_depth: int = 16):
  110. if n_channels < 0:
  111. print("Number of channels is invalid (" + n_channels + ")")
  112. sys.exit(1)
  113. if sample_rate < 0:
  114. print("Sample rate is invalid (" + sample_rate + ")")
  115. sys.exit(1)
  116. if (bit_depth % 8 != 0) or (bit_depth > 32):
  117. print("Bit depth is invalid (" + bit_depth + ")")
  118. sys.exit(1)
  119.  
  120. self.header['n_channels'][0] = int_to_LE2(n_channels)
  121. self.header['sample_rate'][0] = int_to_LE4(sample_rate)
  122. self.header['bit_depth'][0] = int_to_LE2(bit_depth)
  123.  
  124. dt = self.NP_DATA_TYPES[bit_depth]
  125. raw_data = np.array([], dt)
  126. raw_data.shape = (-1, n_channels)
  127. self.rebuild_header()
  128. print("Created new file: " + file_name)
  129.  
  130. #Takes WAV header binay as input, writes to self.header
  131. def build_header(self, header_data: bytes):
  132. index = 0
  133. for item,entry in self.header.items():
  134. if (entry[2] == 'ascii') or (entry[2] == 'LE4'):
  135. self.header[item][0] = header_data[index:index+4]
  136. index += 4
  137. elif entry[2] == 'LE2':
  138. self.header[item][0] = header_data[index:index+2]
  139. index += 2
  140.  
  141. #Prints header info from self.header
  142. def print_header(self, raw_binary: int = 0):
  143. for item,entry in self.header.items():
  144. if raw_binary:
  145. print(entry[1] + ": " + str(entry[0]))
  146. elif entry[2] == 'ascii':
  147. print(entry[1] + ": " + ascii_to_string(entry[0]))
  148. elif entry[2] == 'LE2':
  149. print(entry[1] + ": " + str(LE2_to_int(entry[0])))
  150. elif entry[2] == 'LE4':
  151. print(entry[1] + ": " + str(LE4_to_int(entry[0])))
  152. print('Total samples: ' + str(len(self.data)))
  153.  
  154. #Rebuilds the header information when there is a change
  155. #Does it in the order byte rate, block align, data size
  156. #Additionally updates meta-info of the header that is not written to the file
  157. def rebuild_header(self):
  158. sample_rate = LE4_to_int(self.header['sample_rate'][0])
  159. bit_depth = LE2_to_int(self.header['bit_depth'][0])
  160. num_channels = LE2_to_int(self.header['n_channels'][0])
  161. try:
  162. total_samples = len(self.data) * 2
  163. except None:
  164. total_samples = 0
  165.  
  166. #should be sample rate * bit depth * channels / 8 (LE4)
  167. byte_rate = sample_rate * bit_depth * num_channels / 8
  168. self.header['byte_rate'][0] = int_to_LE4(byte_rate)
  169.  
  170. #should be bit depth * channels / 8 (LE2)
  171. block_align = bit_depth * num_channels / 8
  172. self.header['block_align'][0] = int_to_LE2(block_align)
  173.  
  174. #should be toal samples * bit depth / 8 (LE4)
  175. data_size = total_samples * bit_depth / 8
  176. self.header['data_size'][0] = int_to_LE4(data_size)
  177.  
  178. #size of data + header size - 8 because first 8 bytes don't count for some reason (LE4)
  179. file_size = data_size + self.HEADER_SIZE - 8
  180. self.header['f_size'][0] = int_to_LE4(file_size)
  181.  
  182. #Plots the data, takes a start and end time as an input, defaults to entire file
  183. def plot_waveform(self, start: int = 0, end: int = -1):
  184. sample_rate = LE4_to_int(self.header['sample_rate'][0])
  185. n_channels = LE2_to_int(self.header['n_channels'][0])
  186. bit_depth = LE2_to_int(self.header['bit_depth'][0])
  187.  
  188. start_sample = start * sample_rate
  189.  
  190. if end == -1:
  191. end_sample = len(self.data)
  192. else:
  193. end_sample = end * sample_rate
  194.  
  195. samples = self.data[start_sample:end_sample]
  196. x = np.arange(start_sample, end_sample)
  197.  
  198. plt.figure('Waveforms')
  199.  
  200. for c in range(0, n_channels):
  201. line_label = 'Channel ' + str(c)
  202. plt.plot(x, samples[:,c], label = line_label)
  203. plt.legend()
  204. plt.xlabel('Sample')
  205. plt.ylabel('Amplitude')
  206. plt.show()
  207.  
  208. #merge audio streams and prevent clipping
  209. #def merge_audio(self, stream_a: array, stream_b: array):
  210. # bit_depth = LE2_to_int(self.header['bit_depth'][0])
  211.  
  212.  
  213. #Amplifies each value of the array by a certain percentage
  214. def amplify(self, magnitude: float = 1.25):
  215. bit_depth = LE2_to_int(self.header['bit_depth'][0])
  216. dt = self.NP_DATA_TYPES[bit_depth]
  217. samples = np.array(self.data, float)
  218. samples = samples * magnitude
  219. self.data = np.array(samples, dt)
  220.  
  221. #Adds reverb to the data, takes strength which is how much the reverb decays and time which is in seconds
  222. def add_reverb(self, strength: float = 0.25, delay: float = 1.5):
  223. sample_rate = LE4_to_int(self.header['sample_rate'][0])
  224. n_channels = LE2_to_int(self.header['n_channels'][0])
  225. bit_depth = LE2_to_int(self.header['bit_depth'][0])
  226. dt = self.NP_DATA_TYPES[bit_depth]
  227.  
  228. #get length of reverb effect in samples
  229. reverb_length = int(sample_rate * delay)
  230.  
  231.  
  232. #create arrays
  233. samples = np.array(self.data, float)
  234. reverb = np.array(samples, float)
  235. buffer = np.zeros([reverb_length, n_channels], float)
  236. #apply reverb, shift downwards
  237. reverb = reverb * strength
  238. reverb = np.insert(reverb, 0, buffer, 0)
  239. #add buffer to end of samples
  240. samples = np.append(samples, buffer, 0)
  241. #apply reverb to original data, prevent clipping
  242. data = 2 * (samples + reverb) - (samples * reverb / (2 ^ int(bit_depth / 2)))
  243. #convert sata back to proper data type and apply
  244. self.data = np.array(data, dt)
  245. self.rebuild_header()
  246.  
  247. #Takes a new sample rate as an input then writes it to the header
  248. def change_sample_rate(self, new_sample_rate: int):
  249. if new_sample_rate <= 0:
  250. print("Sample rate is invalid (" + new_sample_rate + ") value not being changed")
  251. else:
  252. self.header['sample_rate'][0] = int_to_LE4(new_sample_rate)
  253. self.rebuild_header()
  254.  
  255. #Takes new bit depth as an input then writes it to the header
  256. def change_bit_depth(self, new_bit_depth: int):
  257. if (new_bit_depth % 8 != 0) or (new_bit_depth > 32):
  258. print("Bit depth is invalid (" + new_bit_depth + ") value not being changed")
  259. else:
  260. self.header['bit_depth'][0] = int_to_LE2(new_bit_depth)
  261. self.rebuild_header()
  262.  
  263. #Takes channel number as an input then writes it to the header
  264. def change_num_channels(self, new_num_channels: int):
  265. if new_num_channels <= 0:
  266. print("Channel number is invalid (" + new_num_channels + ") value not being changed")
  267. else:
  268. self.header['n_channels'][0] = int_to_LE2(new_num_channels)
  269. self.rebuild_header()
  270.  
  271. #Takes a file name and start/end as args, defaults to first 100 samples
  272. def dump_samples(self, file_name: str, start: int = 0, end: int = 100):
  273. if len(self.data) < end:
  274. print("Specified end (" + str(end) + ") is greater than number of samples")
  275. exit(1)
  276. samples = self.data[start:end]
  277. with open(file_name, mode='w+') as f:
  278. for s in samples:
  279. f.write(str(s) + "\n")
  280.  
  281. #Takes the file name as an input and writes all data to that file
  282. def write_file(self, file_name: str):
  283. with open(file_name, mode='bw+') as f:
  284. for item,entry in self.header.items():
  285. f.write(entry[0])
  286. f.write(self.data.tostring())
  287. print("Wrote to file " + file_name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement