SHARE
TWEET

Untitled

a guest Oct 21st, 2019 60 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. # Data cleansing helper functions
  2. from pyspark.sql import functions as F
  3. from pyspark.sql import Window, DataFrame
  4. from IPython.display import HTML, Markdown
  5. from pyspark.ml.feature import VectorAssembler
  6. import pandas as pd
  7. import matplotlib.pyplot as plt
  8.  
  9. def transform(self, function, *args, **kwargs):
  10.     return function(self, *args, **kwargs)
  11.  
  12. if getattr(DataFrame, 'transform', None) is None:
  13.     DataFrame.transform = transform
  14.  
  15. def removeColumnPrefixes(df, prefixes):
  16.     for col in df.columns:
  17.         for prefix in prefixes:
  18.             if col.startswith(prefix):
  19.                 df = df.withColumnRenamed(col, col.replace(prefix,''))
  20.     return df
  21.    
  22. def renameColumns(df, mapping):
  23.     for ocol, newcol in mapping.items():
  24.         if ocol in df.columns:
  25.             df = df.withColumnRenamed(ocol, newcol)
  26.         else:
  27.             print(f"WARNING: column {ocol} unavailable")
  28.     return df
  29.  
  30. def dropColumns(df, columns):
  31.     return df.select(*[col for col in df.columns if col not in columns])
  32.  
  33. def trimColumns(df, columns):
  34.     for col in columns:
  35.         df = df.withColumn(col,
  36.                            F.when(F.col(col).isNotNull() & (F.trim(F.col(col)) != F.lit('')),
  37.                                   F.trim(F.col(col))))
  38.     return df
  39.  
  40. def trimAllStringColumns(df):
  41.     string_fields = []
  42.     for field in df.columns:
  43.         dataType = df.schema[field].dataType.typeName()
  44.  
  45.         if dataType == 'string':
  46.             string_fields.append(field)
  47.            
  48.     return trimColumns(df, string_fields)
  49.  
  50. def plotStringLengthDistribution(df, field, length_field_name='length', title=None):
  51.     if isinstance(field, str):
  52.         field = F.col(field)
  53.        
  54.     df = (df.select(field)
  55.           .withColumn(length_field_name, F.length(field))
  56.           .groupBy(F.col(length_field_name)).count()
  57.           .orderBy(F.col('count').desc()))
  58.        
  59.     display(df.toPandas().plot(x=length_field_name, kind='bar', title=title))
  60.     plt.show()
  61.    
  62. def findDuplicatesBy(df, fields):
  63.     if not isinstance(fields, list) and not isinstance(fields, tuple):
  64.         fields = [fields]
  65.        
  66.     columns = df.columns
  67.     select_columns = columns + [F.count('*').over(Window.partitionBy(*fields)).alias('record_count')]
  68.     grouping = df.select(select_columns)
  69.     dupes = (df.groupBy(*fields)
  70.                 .agg(F.collect_list(F.struct(*columns)).alias('records'),
  71.                      F.count('*').alias('record_count'))
  72.                  .where('record_count > 1'))
  73.    
  74.     unique = grouping.where('record_count == 1').drop('record_count')
  75.    
  76.     return [dupes, unique]
  77.  
  78. def stringToVector(df, field):
  79.     df = df.select(field).where(df['iddoc_type'] == 'O')
  80.  
  81.     # id structure analysis
  82.     df = (df.select(field, F.posexplode(F.split(field,'')).alias('position', 'letter'))
  83.              .withColumn('letter_pos', F.concat(F.lit('letter'), F.format_string('%02d', F.col('position')))))
  84.  
  85.     df = df.withColumn('letter_type', F.when(F.regexp_extract(F.col('letter'), r'^[a-zA-Z]+$', 0) != '', 2)
  86.                                                  .when(F.regexp_extract(F.col('letter'), r'^[0-9]+$', 0) != '', 1))
  87.  
  88.  
  89.  
  90.     df = df.drop('position','letter').groupBy(field).pivot('letter_pos').agg(F.first('letter_type'))
  91.  
  92.     df = df.withColumn('str_length', F.length(F.col(field)))
  93.  
  94.     for c in df.columns:
  95.         df = df.withColumn(c, F.when(F.col(c).isNotNull(), F.col(c)).otherwise(0))
  96.  
  97.     vect_in = df.drop('iddoc')
  98.     vect_assembler = VectorAssembler(inputCols=vect_in.columns, outputCol='features')
  99.     vect = vect_assembler.transform(vect_in)
  100.    
  101.     return [vect, df]
  102.  
  103. def profile(df, field, find_duplicates=False):
  104.     dataType = df.schema[field].dataType.typeName()
  105.     display(Markdown(f'# Field {field} ({dataType})'))
  106.    
  107.     nullreport = df.select(F.when(F.col(field).isNotNull(), F.lit('Not Null'))
  108.                                    .otherwise(F.lit('Null')).alias('recorded')).groupBy(F.col('recorded')).count()
  109.    
  110.     display(nullreport.toPandas().set_index('recorded').plot(y='count', autopct='%.2f', kind='pie', title='Null Report'))
  111.     plt.axis("off")
  112.     plt.show()
  113.  
  114.     if dataType in ['integer', 'long','float']:
  115.         zerosreport = df.select(F.when(F.col(field).isNotNull() & (F.col(field) != 0), F.lit('Not Zero'))
  116.                                 .otherwise(F.lit('Zero')).alias('recorded')).groupBy(F.col('recorded')).count()
  117.         display(zerosreport.toPandas().set_index('recorded').plot(y='count', autopct='%.2f', kind='pie', title='Zeros Report'))
  118.         plt.axis("off")
  119.         plt.show()
  120.     if dataType == 'string':
  121.         dist = df.groupBy(field).count()
  122.         dist_top = dist.orderBy(F.col('count').desc()).limit(20)
  123.         dist_bottom = dist.orderBy(F.col('count')).limit(20)
  124.         display(dist_top.toPandas().plot(kind='bar', x=field, title='Top 20 values by count'))
  125.         display(dist_bottom.toPandas().plot(kind='bar', x=field, title='Bottom 20 values by count'))
  126.         plt.show()
  127.         plotStringLengthDistribution(df, field, title='Length distribution')
  128.        
  129.     if find_duplicates:
  130.         if dataType == 'string':
  131.             dupes, unique = findDuplicatesBy(df.withColumn(field,F.trim(F.upper(F.col(field)))), field)
  132.         else:
  133.             dupes, unique = findDuplicatesBy(df, field)
  134.  
  135.         dupes = dupes.cache()
  136.  
  137.         unique_count = unique.select(F.lit('Unique').alias('label'), F.count('*').alias('count'))
  138.         dupes_count = dupes.select(F.lit('Duplicate').alias('label'), F.count('*').alias('count'))
  139.  
  140.         dupe_report = unique_count.union(dupes_count).toPandas()
  141.         has_duplicate = False
  142.         for r in dupe_report.to_dict('records'):
  143.             if r['label'] == 'Duplicate' and r['count'] > 0:
  144.                 has_duplicate = True
  145.                
  146.         if has_duplicate:
  147.  
  148.             display(dupe_report.set_index('label').plot(y='count', kind='pie', autopct='%.2f', title='Duplicated Records'))
  149.             plt.axis("off")
  150.             plt.show()
  151.  
  152.             display(dupes.groupBy('record_count').count().orderBy(F.col('count').desc()).limit(20).toPandas().plot(x='record_count', kind='bar', title='Distribution of total duplicated records'))
  153.             plt.show()
  154.  
  155.             display(dupes.select(field, 'record_count').limit(10).toPandas())
  156.             plt.show()
  157.         else:
  158.             print("No duplicates found")
  159.         dupes.unpersist()
  160.  
  161.  
  162.  
  163. def cwsDateToTimestamp(df, fields):
  164.     ts_format = 'yyyyMMddHHmm'
  165.     for field in fields:
  166.         df = df.withColumn(field,
  167.                 ((F.unix_timestamp((F.col(field)/100000).cast('bigint').cast('string'),
  168.                                     ts_format)) * 100000) + (F.col(field) % 100000))
  169.     return df
  170.    
  171. def listEmptyFields(df):
  172.     result = []
  173.     for f in df.columns:
  174.         if df.select(f).where(F.col(f).isNotNull()).distinct().count() == 0:
  175.             result.append(f)
  176.             print(f)
  177.     return result
  178.  
  179.  
  180. def computeFuzzySimilarity(df, id_field, fields,
  181.                           match_column_prefix='matched',
  182.                           distance_column_prefix='levdist'):
  183.    
  184.     df = df.select(id_field, *fields)
  185.    
  186.     columns = df.columns
  187.     match_columns = []
  188.     matched_id_field = f'{match_column_prefix}_{id_field}'
  189.  
  190.     for col in columns:
  191.         match_column_name = f'{match_column_prefix}_{col}'
  192.         match_columns.append(F.col(col).alias(match_column_name))
  193.        
  194.     df2 = df.select(*match_columns)
  195.  
  196.     df = df.join(df2, df[id_field] != df2[matched_id_field])
  197.  
  198.     unique_window = F.concat(
  199.         F.when(F.col(id_field) < F.col(matched_id_field), F.col(id_field)).cast('string'),
  200.         F.lit('-----'),
  201.         F.when(F.col(id_field) < F.col(matched_id_field), F.col(matched_id_field)).cast('string'))
  202.    
  203.     unique_window_name = f'unique_{id_field}'
  204.    
  205.     df = df.select(
  206.         F.row_number().over(Window.partitionBy(unique_window).orderBy(F.col(id_field))).alias(unique_window_name),
  207.         *df.columns
  208.     ).where(F.col(unique_window_name) == 1).drop(unique_window_name)
  209.    
  210.     lev_cols = []
  211.    
  212.     for field in fields:
  213.         match_column_name = f'{match_column_prefix}_{field}'
  214.         distance_column_name = f'{distance_column_prefix}_{field}'
  215.         lev_col = F.levenshtein(F.col(field), F.col(match_column_name)).alias(distance_column_name)
  216.         lev_cols.append(lev_col)
  217.  
  218.     df = df.select(*(df.columns + lev_cols))
  219.  
  220.     return df
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top