# duplicates_csv.py - duplicate file report as csv

import os
import csv
import hashlib
import datetime

import sqlalchemy as sa
import sqlalchemy.ext.declarative

# NOTE: targets Python 2 and the legacy (pre-2.0) SQLAlchemy query API
# (select([...]), bind=conn); it would need porting for Python 3 / SQLAlchemy 2.x.

STARTDIR = '.'
FSENCODING = 'latin-1'
DBFILE = 'duplicates_csv.sqlite3'
OUTFILE = 'duplicates.csv'

engine = sa.create_engine('sqlite:///%s' % DBFILE)
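
# Flow (summary of the code below): build_db() walks STARTDIR and records
# location/size/mtime for every file in an SQLite database, then computes MD5
# digests only for files whose size collides with another file;
# duplicates_query() selects the rows whose digest occurs more than once, and
# to_csv() writes that result set to OUTFILE.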

class File(sa.ext.declarative.declarative_base()):

    __tablename__ = 'file'

    location = sa.Column(sa.Text, primary_key=True)
    md5sum = sa.Column(sa.Text, index=True)
    size = sa.Column(sa.Integer, nullable=False)
    mtime = sa.Column(sa.DateTime, nullable=False)
    # SQLite lacks the string functions to split these out of location,
    # so store name and extension denormalized
    name = sa.Column(sa.Text, nullable=False)
    ext = sa.Column(sa.Text, nullable=False)

    @staticmethod
    def getinfo(start, path, encoding=FSENCODING):
        location = os.path.relpath(path, start).decode(encoding).replace('\\', '/')
        name = os.path.basename(path).decode(encoding)
        ext = os.path.splitext(name)[1].lstrip('.')
        statinfo = os.stat(path)
        size = statinfo.st_size
        mtime = datetime.datetime.fromtimestamp(statinfo.st_mtime)
        return {'location': location, 'name': name, 'ext': ext,
                'size': size, 'mtime': mtime}

    @classmethod
    def from_path(cls, start, path, encoding=FSENCODING):
        kwargs = cls.getinfo(start, path, encoding=encoding)
        return cls(**kwargs)

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, self.location)

def md5sum(filename, bufsize=32768):
    """Return the hex MD5 digest of the file, reading it in bufsize chunks."""
    md = hashlib.md5()
    with open(filename, 'rb') as fd:
        while True:
            data = fd.read(bufsize)
            if not data:
                break
            md.update(data)
    return md.hexdigest()

def build_db(engine=engine, start=STARTDIR, recreate=False, verbose=False):
    dbfile = engine.url.database
    if recreate and os.path.exists(dbfile):
        os.remove(dbfile)
    if os.path.exists(dbfile):
        return
    File.metadata.create_all(engine)
    # first pass records every file, second pass hashes only the size collisions
    with engine.begin() as conn:
        insert_fileinfos(conn, start, verbose)
    with engine.begin() as conn:
        add_md5sums(conn, start, verbose)

def insert_fileinfos(conn, start, verbose):
    conn = conn.execution_options(compiled_cache={})
    insert_file = sa.insert(File, bind=conn).execute
    for root, dirs, files in os.walk(start):
        if verbose:
            print(root)
        # executemany-style insert, one batch of rows per directory
        values = [File.getinfo(start, os.path.join(root, f)) for f in files]
        if values:
            insert_file(values)

def add_md5sums(conn, start, verbose):
    conn = conn.execution_options(compiled_cache={})
    # only files that share their size with another file can be duplicates,
    # so compute MD5 digests just for those candidates
    query = sa.select([File.location])\
        .where(File.size.in_(sa.select([File.size])
                             .group_by(File.size).having(sa.func.count() > 1)))\
        .order_by(File.location)
    update_file = sa.update(File, bind=conn)\
        .where(File.location == sa.bindparam('loc'))\
        .values(md5sum=sa.bindparam('md5sum')).execute
    for location, in conn.execute(query):
        if verbose:
            print(location)
        digest = md5sum(os.path.join(start, location))
        update_file(loc=location, md5sum=digest)

def duplicates_query(by_location=False):
    query = sa.select([File])\
        .where(File.md5sum.in_(sa.select([File.md5sum])
                               .group_by(File.md5sum).having(sa.func.count() > 1)))
    if by_location:
        query = query.order_by(File.location)
    else:
        query = query.order_by(File.md5sum, File.location)
    return query
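
# For reference, duplicates_query() builds SQL roughly equivalent to the
# statement below (the exact rendering depends on the SQLAlchemy version):
#
#   SELECT file.location, file.md5sum, file.size, file.mtime, file.name, file.ext
#   FROM file
#   WHERE file.md5sum IN (SELECT file.md5sum
#                         FROM file
#                         GROUP BY file.md5sum
#                         HAVING count(*) > 1)
#   ORDER BY file.md5sum, file.location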

def to_csv(results, filename=OUTFILE, encoding='utf-8', dialect='excel'):
    # Python 2 csv: write encoded bytestrings to a file opened in binary mode
    with open(filename, 'wb') as fd:
        csvwriter = csv.writer(fd, dialect=dialect)
        csvwriter.writerow([k.encode(encoding) for k in results.keys()])
        for row in results:
            # unicode() instead of str() so non-ASCII locations do not raise
            csvwriter.writerow([unicode(c).encode(encoding) for c in row])

if __name__ == '__main__':
    build_db()
    query = duplicates_query()
    to_csv(engine.execute(query))
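
# Example run (a sketch, assuming a Python 2 interpreter with SQLAlchemy 1.x
# installed; adjust STARTDIR, DBFILE, and OUTFILE above as needed):
#
#   $ python duplicates_csv.py
#
# This creates duplicates_csv.sqlite3 in the working directory and writes the
# duplicate report to duplicates.csv; delete the .sqlite3 file (or call
# build_db(recreate=True)) to rescan the tree.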