Advertisement
Nsfr750

msr.py

Apr 24th, 2019
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 13.87 KB | None | 0 0
  1. #!/usr/bin/env python
  2. #
  3. # File: msr.py
  4. import time
  5. import serial
  6.  
  7. class msr(serial.Serial):
  8.     escape_code = "\x1B"
  9.     end_code = "\x1C"
  10.     hico=True
  11.     loco=False
  12.     hibpi=True
  13.     lobpi=False
  14.     track1_map  = " !\"#$%&'()*+`,./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_"
  15.     track23_map = "0123456789:;<=>?"
  16.     parity_map  = [1,0,0,1,0,1,1,0,0,1,1,0,1,0,0,1,0,1,1,0,1,0,0,1,1,0,0,1,0,1,1,0, \
  17.                    0,1,1,0,1,0,0,1,1,0,0,1,0,1,1,0,1,0,0,1,0,1,1,0,0,1,1,0,1,0,0,1]
  18.     rev6bit_map = [0,32,16,48,8,40,24,56,4,36,20,52,12,44,28,60,2,34,18,50,10,42,26,58,6,38,22,54,14,46,30,62, \
  19.                    1,33,17,49,9,41,25,57,5,37,21,53,13,45,29,61,3,35,19,51,11,43,27,59,7,39,23,55,15,47,31,63]
  20.    
  21.     def __init__(self, dev_path):
  22.         if dev_path.find("/") == -1: dev_path = "/dev/" + dev_path
  23.         serial.Serial.__init__(self,dev_path,9600,8,serial.PARITY_NONE,timeout=0)
  24.         self.reset()
  25.    
  26.     def __execute_noresult(self, command):
  27.         self.write(msr.escape_code+command)
  28.         time.sleep(0.1)
  29.    
  30.     def __execute_waitresult(self, command, timeout=10):
  31.         # execute
  32.         self.flushInput()
  33.         self.write(msr.escape_code+command)
  34.         time.sleep(0.1)
  35.        
  36.         # get result
  37.         self.timeout=timeout
  38.         result = self.read()
  39.         time.sleep(0.5)
  40.         if result == "": raise Exception("operation timed out")
  41.         self.timeout=0
  42.         result += self.read(1000)
  43.        
  44.         # parse result : status, result, data
  45.         pos = result.rindex(msr.escape_code)
  46.         return result[pos+1], result[pos+2:], result[0:pos]
  47.  
  48.     def reset(self):
  49.         self.__execute_noresult("a")
  50.    
  51.     @staticmethod
  52.     def __decode_isodatablock(data):
  53.         if data[0:4] != msr.escape_code+"s"+msr.escape_code+"\x01":
  54.             raise Exception("bad datablock : don't start with <ESC>s<ESC>[01]", data)
  55.         if data[-2:] != "?"+msr.end_code:
  56.             raise Exception("bad datablock : don't end with ?<FS>", data)
  57.        
  58.         strip1_start = 4
  59.         strip1_end = data.index(msr.escape_code,strip1_start)
  60.         if strip1_end == strip1_start:
  61.             strip1_end += 2
  62.             strip1 = None
  63.         else:
  64.             strip1 = data[strip1_start:strip1_end]
  65.  
  66.         strip2_start = strip1_end+2
  67.         if data[strip1_end:strip2_start] != msr.escape_code+"\x02":
  68.             raise Exception("bad datablock : missing <ESC>[02] at position %d" % strip1_end, data)
  69.         strip2_end = data.index(msr.escape_code,strip2_start)
  70.         if strip2_end == strip2_start:
  71.             strip2_end += 2
  72.             strip2 = None
  73.         else:
  74.             strip2 = data[strip2_start:strip2_end]
  75.        
  76.         # third strip
  77.         strip3_start = strip2_end+2
  78.         if data[strip2_end:strip3_start] != msr.escape_code+"\x03":
  79.             raise Exception("bad datablock : missing <ESC>[03] at position %d" % strip2_end, data)
  80.         if data[strip3_start] == msr.escape_code:
  81.             strip3 = None
  82.         else:
  83.             strip3 = data[strip3_start:-2]
  84.        
  85.         return strip1, strip2, strip3
  86.    
  87.     @staticmethod
  88.     def __encode_isodatablock(strip1, strip2, strip3):
  89.         # use empty string if you don't want to set a given strip
  90.         return "\x1bs\x1b\x01"+strip1+"\x1b\x02"+strip2+"\x1b\x03"+strip3+"?\x1C"
  91.    
  92.     @staticmethod
  93.     def __decode_rawdatablock(data):
  94.         # header
  95.         if data[0:4] != msr.escape_code+"s"+msr.escape_code+"\x01":
  96.             raise Exception("bad datablock : don't start with <ESC>s<ESC>[01]", data)
  97.        
  98.         # first strip
  99.         strip1_start = 4
  100.         strip1_end = strip1_start + 1 + ord(data[strip1_start]) # first byte is length
  101.         strip1 = data[strip1_start+1:strip1_end]
  102.  
  103.         # second strip
  104.         strip2_start = strip1_end+2
  105.         if data[strip1_end:strip2_start] != msr.escape_code+"\x02":
  106.             raise Exception("bad datablock : missing <ESC>[02] at position %d" % strip1_end, data)
  107.         strip2_end = strip2_start + 1 + ord(data[strip2_start])
  108.         strip2 = data[strip2_start+1:strip2_end]
  109.        
  110.         # third strip
  111.         strip3_start = strip2_end+2
  112.         if data[strip2_end:strip3_start] != msr.escape_code+"\x03":
  113.             raise Exception("bad datablock : missing <ESC>[03] at position %d" % strip2_end, data)
  114.         strip3_end = strip3_start + 1 + ord(data[strip3_start])
  115.         strip3 = data[strip3_start+1:strip3_end]
  116.  
  117.         # trailer
  118.         if data[strip3_end:] != "?"+msr.end_code:
  119.             raise Exception("bad datablock : missing ?<FS> at position %d", strip3_end, data)
  120.                
  121.         return strip1, strip2, strip3
  122.  
  123.     @staticmethod
  124.     def __encode_rawdatablock(strip1, strip2, strip3):
  125.         datablock = "\x1bs"
  126.         if strip1 != "":
  127.             datablock += "\x1b\x01"+chr(len(strip1))+strip1
  128.         if strip2 != "":
  129.             datablock += "\x1b\x02"+chr(len(strip2))+strip2
  130.         if strip3 != "":
  131.             datablock += "\x1b\x03"+chr(len(strip3))+strip3
  132.         datablock += "?\x1C"
  133.         return datablock
  134.         return "\x1bs\x1b\x01"+chr(len(strip1))+strip1+"\x1b\x02"+chr(len(strip2))+strip2+"\x1b\x03"+chr(len(strip3))+strip3+"?\x1C"
  135.    
  136.     @staticmethod
  137.     def pack_raw(data, mapping, bcount_code, bcount_output):
  138.         raw = ""
  139.         lrc = 0      
  140.         rem_bits = 0  
  141.         rem_count = 0
  142.         for c in data:
  143.             i = mapping.find(c)                      
  144.             if i==-1: i = 0                          
  145.             lrc ^= i
  146.             i |= msr.parity_map[i] << bcount_code    
  147.             rem_bits |= i << rem_count              
  148.             rem_count += bcount_code+1
  149.             if rem_count >= bcount_output:
  150.                 raw += chr(rem_bits & ((1<<bcount_output)-1))
  151.                 rem_bits >>= bcount_output
  152.                 rem_count -= bcount_output
  153.         lrc |= msr.parity_map[i] << bcount_code
  154.         rem_bits |= lrc << rem_count
  155.         rem_count += bcount_code+1
  156.         if rem_count >= bcount_output:
  157.             raw += chr(rem_bits & ((1<<bcount_output)-1))
  158.             rem_bits >>= bcount_output
  159.             rem_count -= bcount_output
  160.         if rem_count > 0:
  161.             raw += chr(rem_bits)
  162.         return raw
  163.    
  164.     @staticmethod
  165.     def unpack_raw(raw, mapping, bcount_code, bcount_output):
  166.         data = ""
  167.         parity_errors = ""
  168.         rem_bits = 0  
  169.         rem_count = 0
  170.         lrc = 0      
  171.         last_non_null = -1
  172.         for c in raw:
  173.             rem_count += bcount_output              
  174.             rem_bits = (rem_bits << bcount_output) | (ord(c) & ((1<<bcount_output)-1))
  175.             while rem_count >= bcount_code+1:
  176.                 rem_count -= bcount_code+1
  177.                 i = rem_bits >> rem_count
  178.                 rem_bits &= ((1<<rem_count)-1)
  179.                 p = i & 0x1 # parity code
  180.                 i = msr.rev6bit_map[i>>1] >> (6-bcount_code)
  181.                 data += mapping[i]
  182.                 if i != 0: last_non_null = len(data)-1                
  183.                 lrc ^= i
  184.                 if msr.parity_map[i] == p:
  185.                     parity_errors += " "
  186.                 else:
  187.                     parity_errors += "^"
  188.                
  189.         lrc_error = (lrc != 0)
  190.        
  191.         return data[0:last_non_null+1], len(data), parity_errors[0:last_non_null+1], lrc_error
  192.        
  193.     def read_tracks(self):
  194.         status, _, data = self.__execute_waitresult("r")
  195.         if status != "0":
  196.             raise Exception("read error : %c" % status)
  197.         return self.__decode_isodatablock(data)
  198.  
  199.     def read_raw_tracks(self):
  200.         status, _, data = self.__execute_waitresult("m")
  201.         if status != "0":
  202.             raise Exception("read error : %c" % status)
  203.         return self.__decode_rawdatablock(data)
  204.  
  205.     def write_tracks(self, t1="", t2="", t3=""):
  206.         data = self.__encode_isodatablock(t1,t2,t3)
  207.         status, _, _ = self.__execute_waitresult("w"+data)
  208.         if status != "0":
  209.             raise Exception("write error : %c" % status)
  210.  
  211.     def write_raw_tracks(self, t1, t2, t3):
  212.         data = self.__encode_rawdatablock(t1,t2,t3)
  213.         status, _, _ = self.__execute_waitresult("n"+data)
  214.         if status != "0":
  215.             raise Exception("write error : %c" % status)
  216.  
  217.     def erase_tracks(self, t1=False, t2=False, t3=False):
  218.         mask = 0
  219.         if t1: mask |= 1
  220.         if t2: mask |= 2
  221.         if t3: mask |= 4
  222.         status, _, _ = self.__execute_waitresult("c"+chr(mask))
  223.         if status != "0":
  224.             raise Exception("erase error : %c" % status)
  225.    
  226.     def set_leadingzero(self, track13, track2):
  227.         status, result, _ = self.__execute_waitresult("o"+chr(bpc1)+chr(bpc2)+chr(bpc3))
  228.         if status != "0":
  229.             raise Exception("set_bpc error : %c" % status)
  230.  
  231.     def set_bpc(self, bpc1, bpc2, bpc3):
  232.         status, result, _ = self.__execute_waitresult("o"+chr(bpc1)+chr(bpc2)+chr(bpc3))
  233.         if status != "0":
  234.             raise Exception("set_bpc error : %c" % status)
  235.  
  236.     def set_bpi(self, bpi1=None, bpi2=None, bpi3=None):
  237.         modes = []
  238.         if bpi1==True: modes.append("\xA1")    # 210bpi
  239.         elif bpi1==False: modes.append("\xA0") # 75bpi
  240.         if bpi2==True: modes.append("\xD2")
  241.         elif bpi2==False: modes.append("\x4B")
  242.         if bpi2==True: modes.append("\xC1")
  243.         elif bpi2==False: modes.append("\xC0")
  244.         for m in modes:
  245.             status, result, _ = self.__execute_waitresult("b"+m)
  246.             if status != "0":
  247.                 raise Exception("set_bpi error : %c for %s" % (status,hex(m)))
  248.  
  249.     def set_coercivity(self, hico):
  250.         if hico:
  251.             status, _, _ = self.__execute_waitresult("x")
  252.         else:
  253.             status, _, _ = self.__execute_waitresult("y")
  254.         if status != "0":
  255.             raise Exception("set_hico error : %c" % status)
  256.  
  257. if __name__ == "__main__":
  258.     import argparse
  259.     parser = argparse.ArgumentParser()
  260.     group = parser.add_mutually_exclusive_group(required=True)
  261.     group.add_argument ('-r', '--read', action="store_true", help="read magnetic tracks")
  262.     group.add_argument ('-w', '--write', action="store_true", help="write magnetic tracks")
  263.     group.add_argument ('-e', '--erase', action="store_true", help="erase magnetic tracks")
  264.     group.add_argument ('-C', '--hico', action="store_true", help="select high coercivity mode")
  265.     group.add_argument ('-c', '--loco', action="store_true", help="select low coercivity mode")
  266.     group.add_argument ('-b', '--bpi', help="bit per inch for each track (h or l)")
  267.     parser.add_argument('-d', '--device', help="path to serial communication device")
  268.     parser.add_argument('-0', '--raw', action="store_true", help="do not use ISO encoding/decoding")
  269.     parser.add_argument('-t', '--tracks', default="123", help="select tracks (1, 2, 3, 12, 23, 13, 123)")
  270.     parser.add_argument('-B', '--bpc', help="bit per caracters for each track (5 to 8)")
  271.     parser.add_argument('data', nargs="*", help="(write only) 1, 2 or 3 arguments, matching --tracks")
  272.     args = parser.parse_args();
  273.    
  274.     if (args.read or args.erase) and len(args.data) != 0 or args.write and (len(args.data) != len(args.tracks)):
  275.         print "too many arguments"
  276.         parser.print_help()
  277.         exit(1)
  278.    
  279.     tracks = [False, False, False]
  280.     data = ["", "", ""]
  281.     for i in range(0,len(args.tracks)):
  282.         n = int(args.tracks[i])-1
  283.         if(n<0 or n>2 or tracks[n]):
  284.             parser.print_help()
  285.             exit(1)
  286.         tracks[n] = True
  287.         if(args.write):
  288.             data[n] = args.data[i]
  289.  
  290.     bpc1 = 8
  291.     bpc2 = 8
  292.     bpc3 = 8
  293.     if args.bpc:
  294.         bpc1 = ord(args.bpc[0])-48
  295.         bpc2 = ord(args.bpc[1])-48
  296.         bpc3 = ord(args.bpc[2])-48
  297.     elif args.raw:
  298.         args.bpc = "888" # force setup, as it's kept accross runs
  299.  
  300.     if args.bpi:
  301.         bpi1 = args.bpi[0] != "l"
  302.         bpi2 = args.bpi[1] != "l"
  303.         bpi3 = args.bpi[2] != "l"
  304.    
  305.     # main code
  306.     try:
  307.         dev = msr(args.device)
  308.        
  309.         if args.bpc:
  310.             dev.set_bpc(bpc1,bpc2,bpc3)
  311.        
  312.         if args.read & args.raw:
  313.             s1,s2,s3 = dev.read_raw_tracks()
  314.             def print_result(num, res):
  315.                 s,l,perr,lerr = res
  316.                 line = "%d=%s" % (num, s)
  317.                 if len(s) != l: line += " (+%d null)" % (l-len(s))
  318.                 if lerr: line += " (LRC error)"
  319.                 print line
  320.                 if -1 != perr.find("^"): print "  %s <- parity errors" % perr
  321.             if tracks[0]: print_result(1, msr.unpack_raw(s1, msr.track1_map,  6, bpc1))
  322.             if tracks[1]: print_result(2, msr.unpack_raw(s2, msr.track23_map, 4, bpc2))
  323.             if tracks[2]: print_result(3, msr.unpack_raw(s3, msr.track23_map, 4, bpc3))
  324.        
  325.         elif args.read: # iso mode
  326.             s1,s2,s3 = dev.read_tracks()
  327.             if tracks[0]: print "1=%s" % s1
  328.             if tracks[1]: print "2=%s" % s2
  329.             if tracks[2]: print "3=%s" % s3
  330.        
  331.         elif args.write & args.raw:
  332.             d1 = ""
  333.             d2 = ""
  334.             d3 = ""
  335.             if tracks[0]:
  336.                 d1 = msr.pack_raw(data[0], msr.track1_map,  6, bpc1)
  337.             if tracks[1]:
  338.                 d2 = msr.pack_raw(data[1], msr.track23_map, 4, bpc2)
  339.             if tracks[2]:
  340.                 d3 = msr.pack_raw(data[2], msr.track23_map, 4, bpc3)
  341.             dev.write_raw_tracks(d1,d2,d3)
  342.            
  343.         elif args.write: # iso mode
  344.             dev.write_tracks(data[0],data[1],data[2])
  345.        
  346.         elif args.erase:
  347.             dev.erase_tracks(tracks[0],tracks[1],tracks[2])
  348.  
  349.         elif args.loco:
  350.             dev.set_coercivity(msr.loco)
  351.            
  352.         elif args.hico:
  353.             dev.set_coercivity(msr.hico)
  354.  
  355.         elif args.bpi:
  356.             dev.set_bpi(bpi1,bpi2,bpi3)
  357.        
  358.     except Exception as e:
  359.         print e
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement