Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- #
- # File: msr.py
- import time
- import serial
- class msr(serial.Serial):
- escape_code = "\x1B"
- end_code = "\x1C"
- hico=True
- loco=False
- hibpi=True
- lobpi=False
- track1_map = " !\"#$%&'()*+`,./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_"
- track23_map = "0123456789:;<=>?"
- parity_map = [1,0,0,1,0,1,1,0,0,1,1,0,1,0,0,1,0,1,1,0,1,0,0,1,1,0,0,1,0,1,1,0, \
- 0,1,1,0,1,0,0,1,1,0,0,1,0,1,1,0,1,0,0,1,0,1,1,0,0,1,1,0,1,0,0,1]
- rev6bit_map = [0,32,16,48,8,40,24,56,4,36,20,52,12,44,28,60,2,34,18,50,10,42,26,58,6,38,22,54,14,46,30,62, \
- 1,33,17,49,9,41,25,57,5,37,21,53,13,45,29,61,3,35,19,51,11,43,27,59,7,39,23,55,15,47,31,63]
- def __init__(self, dev_path):
- if dev_path.find("/") == -1: dev_path = "/dev/" + dev_path
- serial.Serial.__init__(self,dev_path,9600,8,serial.PARITY_NONE,timeout=0)
- self.reset()
- def __execute_noresult(self, command):
- self.write(msr.escape_code+command)
- time.sleep(0.1)
- def __execute_waitresult(self, command, timeout=10):
- # execute
- self.flushInput()
- self.write(msr.escape_code+command)
- time.sleep(0.1)
- # get result
- self.timeout=timeout
- result = self.read()
- time.sleep(0.5)
- if result == "": raise Exception("operation timed out")
- self.timeout=0
- result += self.read(1000)
- # parse result : status, result, data
- pos = result.rindex(msr.escape_code)
- return result[pos+1], result[pos+2:], result[0:pos]
- def reset(self):
- self.__execute_noresult("a")
- @staticmethod
- def __decode_isodatablock(data):
- if data[0:4] != msr.escape_code+"s"+msr.escape_code+"\x01":
- raise Exception("bad datablock : don't start with <ESC>s<ESC>[01]", data)
- if data[-2:] != "?"+msr.end_code:
- raise Exception("bad datablock : don't end with ?<FS>", data)
- strip1_start = 4
- strip1_end = data.index(msr.escape_code,strip1_start)
- if strip1_end == strip1_start:
- strip1_end += 2
- strip1 = None
- else:
- strip1 = data[strip1_start:strip1_end]
- strip2_start = strip1_end+2
- if data[strip1_end:strip2_start] != msr.escape_code+"\x02":
- raise Exception("bad datablock : missing <ESC>[02] at position %d" % strip1_end, data)
- strip2_end = data.index(msr.escape_code,strip2_start)
- if strip2_end == strip2_start:
- strip2_end += 2
- strip2 = None
- else:
- strip2 = data[strip2_start:strip2_end]
- # third strip
- strip3_start = strip2_end+2
- if data[strip2_end:strip3_start] != msr.escape_code+"\x03":
- raise Exception("bad datablock : missing <ESC>[03] at position %d" % strip2_end, data)
- if data[strip3_start] == msr.escape_code:
- strip3 = None
- else:
- strip3 = data[strip3_start:-2]
- return strip1, strip2, strip3
- @staticmethod
- def __encode_isodatablock(strip1, strip2, strip3):
- # use empty string if you don't want to set a given strip
- return "\x1bs\x1b\x01"+strip1+"\x1b\x02"+strip2+"\x1b\x03"+strip3+"?\x1C"
- @staticmethod
- def __decode_rawdatablock(data):
- # header
- if data[0:4] != msr.escape_code+"s"+msr.escape_code+"\x01":
- raise Exception("bad datablock : don't start with <ESC>s<ESC>[01]", data)
- # first strip
- strip1_start = 4
- strip1_end = strip1_start + 1 + ord(data[strip1_start]) # first byte is length
- strip1 = data[strip1_start+1:strip1_end]
- # second strip
- strip2_start = strip1_end+2
- if data[strip1_end:strip2_start] != msr.escape_code+"\x02":
- raise Exception("bad datablock : missing <ESC>[02] at position %d" % strip1_end, data)
- strip2_end = strip2_start + 1 + ord(data[strip2_start])
- strip2 = data[strip2_start+1:strip2_end]
- # third strip
- strip3_start = strip2_end+2
- if data[strip2_end:strip3_start] != msr.escape_code+"\x03":
- raise Exception("bad datablock : missing <ESC>[03] at position %d" % strip2_end, data)
- strip3_end = strip3_start + 1 + ord(data[strip3_start])
- strip3 = data[strip3_start+1:strip3_end]
- # trailer
- if data[strip3_end:] != "?"+msr.end_code:
- raise Exception("bad datablock : missing ?<FS> at position %d", strip3_end, data)
- return strip1, strip2, strip3
- @staticmethod
- def __encode_rawdatablock(strip1, strip2, strip3):
- datablock = "\x1bs"
- if strip1 != "":
- datablock += "\x1b\x01"+chr(len(strip1))+strip1
- if strip2 != "":
- datablock += "\x1b\x02"+chr(len(strip2))+strip2
- if strip3 != "":
- datablock += "\x1b\x03"+chr(len(strip3))+strip3
- datablock += "?\x1C"
- return datablock
- return "\x1bs\x1b\x01"+chr(len(strip1))+strip1+"\x1b\x02"+chr(len(strip2))+strip2+"\x1b\x03"+chr(len(strip3))+strip3+"?\x1C"
- @staticmethod
- def pack_raw(data, mapping, bcount_code, bcount_output):
- raw = ""
- lrc = 0
- rem_bits = 0
- rem_count = 0
- for c in data:
- i = mapping.find(c)
- if i==-1: i = 0
- lrc ^= i
- i |= msr.parity_map[i] << bcount_code
- rem_bits |= i << rem_count
- rem_count += bcount_code+1
- if rem_count >= bcount_output:
- raw += chr(rem_bits & ((1<<bcount_output)-1))
- rem_bits >>= bcount_output
- rem_count -= bcount_output
- lrc |= msr.parity_map[i] << bcount_code
- rem_bits |= lrc << rem_count
- rem_count += bcount_code+1
- if rem_count >= bcount_output:
- raw += chr(rem_bits & ((1<<bcount_output)-1))
- rem_bits >>= bcount_output
- rem_count -= bcount_output
- if rem_count > 0:
- raw += chr(rem_bits)
- return raw
- @staticmethod
- def unpack_raw(raw, mapping, bcount_code, bcount_output):
- data = ""
- parity_errors = ""
- rem_bits = 0
- rem_count = 0
- lrc = 0
- last_non_null = -1
- for c in raw:
- rem_count += bcount_output
- rem_bits = (rem_bits << bcount_output) | (ord(c) & ((1<<bcount_output)-1))
- while rem_count >= bcount_code+1:
- rem_count -= bcount_code+1
- i = rem_bits >> rem_count
- rem_bits &= ((1<<rem_count)-1)
- p = i & 0x1 # parity code
- i = msr.rev6bit_map[i>>1] >> (6-bcount_code)
- data += mapping[i]
- if i != 0: last_non_null = len(data)-1
- lrc ^= i
- if msr.parity_map[i] == p:
- parity_errors += " "
- else:
- parity_errors += "^"
- lrc_error = (lrc != 0)
- return data[0:last_non_null+1], len(data), parity_errors[0:last_non_null+1], lrc_error
- def read_tracks(self):
- status, _, data = self.__execute_waitresult("r")
- if status != "0":
- raise Exception("read error : %c" % status)
- return self.__decode_isodatablock(data)
- def read_raw_tracks(self):
- status, _, data = self.__execute_waitresult("m")
- if status != "0":
- raise Exception("read error : %c" % status)
- return self.__decode_rawdatablock(data)
- def write_tracks(self, t1="", t2="", t3=""):
- data = self.__encode_isodatablock(t1,t2,t3)
- status, _, _ = self.__execute_waitresult("w"+data)
- if status != "0":
- raise Exception("write error : %c" % status)
- def write_raw_tracks(self, t1, t2, t3):
- data = self.__encode_rawdatablock(t1,t2,t3)
- status, _, _ = self.__execute_waitresult("n"+data)
- if status != "0":
- raise Exception("write error : %c" % status)
- def erase_tracks(self, t1=False, t2=False, t3=False):
- mask = 0
- if t1: mask |= 1
- if t2: mask |= 2
- if t3: mask |= 4
- status, _, _ = self.__execute_waitresult("c"+chr(mask))
- if status != "0":
- raise Exception("erase error : %c" % status)
- def set_leadingzero(self, track13, track2):
- status, result, _ = self.__execute_waitresult("o"+chr(bpc1)+chr(bpc2)+chr(bpc3))
- if status != "0":
- raise Exception("set_bpc error : %c" % status)
- def set_bpc(self, bpc1, bpc2, bpc3):
- status, result, _ = self.__execute_waitresult("o"+chr(bpc1)+chr(bpc2)+chr(bpc3))
- if status != "0":
- raise Exception("set_bpc error : %c" % status)
- def set_bpi(self, bpi1=None, bpi2=None, bpi3=None):
- modes = []
- if bpi1==True: modes.append("\xA1") # 210bpi
- elif bpi1==False: modes.append("\xA0") # 75bpi
- if bpi2==True: modes.append("\xD2")
- elif bpi2==False: modes.append("\x4B")
- if bpi2==True: modes.append("\xC1")
- elif bpi2==False: modes.append("\xC0")
- for m in modes:
- status, result, _ = self.__execute_waitresult("b"+m)
- if status != "0":
- raise Exception("set_bpi error : %c for %s" % (status,hex(m)))
- def set_coercivity(self, hico):
- if hico:
- status, _, _ = self.__execute_waitresult("x")
- else:
- status, _, _ = self.__execute_waitresult("y")
- if status != "0":
- raise Exception("set_hico error : %c" % status)
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser()
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument ('-r', '--read', action="store_true", help="read magnetic tracks")
- group.add_argument ('-w', '--write', action="store_true", help="write magnetic tracks")
- group.add_argument ('-e', '--erase', action="store_true", help="erase magnetic tracks")
- group.add_argument ('-C', '--hico', action="store_true", help="select high coercivity mode")
- group.add_argument ('-c', '--loco', action="store_true", help="select low coercivity mode")
- group.add_argument ('-b', '--bpi', help="bit per inch for each track (h or l)")
- parser.add_argument('-d', '--device', help="path to serial communication device")
- parser.add_argument('-0', '--raw', action="store_true", help="do not use ISO encoding/decoding")
- parser.add_argument('-t', '--tracks', default="123", help="select tracks (1, 2, 3, 12, 23, 13, 123)")
- parser.add_argument('-B', '--bpc', help="bit per caracters for each track (5 to 8)")
- parser.add_argument('data', nargs="*", help="(write only) 1, 2 or 3 arguments, matching --tracks")
- args = parser.parse_args();
- if (args.read or args.erase) and len(args.data) != 0 or args.write and (len(args.data) != len(args.tracks)):
- print "too many arguments"
- parser.print_help()
- exit(1)
- tracks = [False, False, False]
- data = ["", "", ""]
- for i in range(0,len(args.tracks)):
- n = int(args.tracks[i])-1
- if(n<0 or n>2 or tracks[n]):
- parser.print_help()
- exit(1)
- tracks[n] = True
- if(args.write):
- data[n] = args.data[i]
- bpc1 = 8
- bpc2 = 8
- bpc3 = 8
- if args.bpc:
- bpc1 = ord(args.bpc[0])-48
- bpc2 = ord(args.bpc[1])-48
- bpc3 = ord(args.bpc[2])-48
- elif args.raw:
- args.bpc = "888" # force setup, as it's kept accross runs
- if args.bpi:
- bpi1 = args.bpi[0] != "l"
- bpi2 = args.bpi[1] != "l"
- bpi3 = args.bpi[2] != "l"
- # main code
- try:
- dev = msr(args.device)
- if args.bpc:
- dev.set_bpc(bpc1,bpc2,bpc3)
- if args.read & args.raw:
- s1,s2,s3 = dev.read_raw_tracks()
- def print_result(num, res):
- s,l,perr,lerr = res
- line = "%d=%s" % (num, s)
- if len(s) != l: line += " (+%d null)" % (l-len(s))
- if lerr: line += " (LRC error)"
- print line
- if -1 != perr.find("^"): print " %s <- parity errors" % perr
- if tracks[0]: print_result(1, msr.unpack_raw(s1, msr.track1_map, 6, bpc1))
- if tracks[1]: print_result(2, msr.unpack_raw(s2, msr.track23_map, 4, bpc2))
- if tracks[2]: print_result(3, msr.unpack_raw(s3, msr.track23_map, 4, bpc3))
- elif args.read: # iso mode
- s1,s2,s3 = dev.read_tracks()
- if tracks[0]: print "1=%s" % s1
- if tracks[1]: print "2=%s" % s2
- if tracks[2]: print "3=%s" % s3
- elif args.write & args.raw:
- d1 = ""
- d2 = ""
- d3 = ""
- if tracks[0]:
- d1 = msr.pack_raw(data[0], msr.track1_map, 6, bpc1)
- if tracks[1]:
- d2 = msr.pack_raw(data[1], msr.track23_map, 4, bpc2)
- if tracks[2]:
- d3 = msr.pack_raw(data[2], msr.track23_map, 4, bpc3)
- dev.write_raw_tracks(d1,d2,d3)
- elif args.write: # iso mode
- dev.write_tracks(data[0],data[1],data[2])
- elif args.erase:
- dev.erase_tracks(tracks[0],tracks[1],tracks[2])
- elif args.loco:
- dev.set_coercivity(msr.loco)
- elif args.hico:
- dev.set_coercivity(msr.hico)
- elif args.bpi:
- dev.set_bpi(bpi1,bpi2,bpi3)
- except Exception as e:
- print e
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement