Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import chess.pgn
- import os, time, random, multiprocessing
- ## Configuration
- THREADS = 16
- INPUT_NAME = "E12.42-Fischer.pgn"
- OUTPUT_NAME = "E12.42-Fischer.book"
- FENS_PER_GAME = 10
- class Visitor(chess.pgn.BaseVisitor):
- def begin_game(self):
- self.fens = []
- def visit_move(self, board, move):
- self.fens.append(board.fen())
- def result(self):
- return self.fens
- def splitPGN():
- # Split games into THREADS-lists
- games = parseGamesFromPGN()
- pieces = [[] for f in range(THREADS)]
- for ii, game in enumerate(games):
- pieces[ii % THREADS].append(game)
- # Output to SPLIT_OUT_N for each piece
- for ii, piece in enumerate(pieces):
- with open("SPLIT_OUT_{0}.pgn".format(ii), "w") as fout:
- for game in piece:
- for line in game:
- fout.write(line)
- time.sleep(5) # Give time to handle files
- def parseGamesFromPGN():
- # Parse each game from the PGN
- games = []; tokens = []; count = 0
- with open(INPUT_NAME, "r") as pgn:
- while True:
- line = pgn.readline()
- if not line: break
- # python-chess expects half/full move counter
- if line.startswith("[FEN"):
- if len(line.split(" ")) <= 6:
- line = line.replace("\"]", " 0 1\"]")
- tokens.append(line)
- # Count empty lines to check for new games
- if line.strip() == "":
- count += 1
- # Second empty line denotes a new game
- if count == 2:
- games.append(tokens[::])
- tokens = []
- count = 0
- return games
- def parseFENSFromPGNS(id):
- accepted = rejected = 0; outputs = []
- # Parse only games from our designated PGN split
- with open("SPLIT_OUT_{0}.pgn".format(id), "r") as pgn:
- # Parse all games from the PGN
- while True:
- # Grab the next game in the PGN
- game = chess.pgn.read_game(pgn)
- if game == None: break
- # Skip PGNs with strange end results (crashes, timelosses, disconnects, ...)
- if "Termination" in game.headers and game.headers["Termination"] != "adjudication":
- print (game.headers["Termination"])
- rejected += 1
- continue
- # Fetch all FENs; discard if too few
- fens = game.accept(Visitor())
- if len(fens) < FENS_PER_GAME:
- print ("Rejected game of length", len(fens))
- rejected += 1; continue
- # Sample FENS_PER_GAME times and save the position
- for fen in random.sample(fens, FENS_PER_GAME):
- outputs.append("{0} {1}\n".format(fen, game.headers["Result"]))
- # No criteria met to skip this game
- accepted += 1;
- # Output FENS to a thread specific file
- with open("SPLIT_PARSE_{0}.fen".format(id), "w") as fout:
- for fen in outputs:
- fout.write(fen)
- # Final stat reporting
- print ("Thread # {0:<2} Accepted {1} Rejected {2}".format(id, accepted, rejected))
- def buildTexelBook():
- splitPGN() # Split main file into THREADS-pieces
- processes = [] # Process for each PGN parser
- # Launch all of the procsses
- for ii in range(THREADS):
- processes.append(
- multiprocessing.Process(
- target=parseFENSFromPGNS, args=(ii,)))
- # Wait for each parser to finish
- for p in processes: p.start()
- for p in processes: p.join()
- # Build final FEN file from process outputs
- os.system("rm {0}".format(OUTPUT_NAME))
- os.system("touch {0}".format(OUTPUT_NAME))
- for ii in range(THREADS):
- os.system("cat SPLIT_PARSE_{0}.fen >> {1}".format(ii, OUTPUT_NAME))
- os.system("rm SPLIT_OUT_{0}.pgn".format(ii))
- os.system("rm SPLIT_PARSE_{0}.fen".format(ii))
- if __name__ == "__main__":
- buildTexelBook()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement