from typing import List

from pyspark import SparkContext
from pyspark.sql import SparkSession


commands = [
    ['read'],
    ['option', ("inferSchema", "true")],
    ['option', ("header", "true")],
    ['option', ("dateFormat", "dd/MM/yyyy H:m")],
    ['option', ("timestampFormat", "dd/MM/yyyy H:m")],
    ['csv', ("retail.csv",)],
]

context = SparkContext()
spark = SparkSession(context)
spark.conf.set("spark.sql.shuffle.partitions", "5")


def execute_commands(ss: SparkSession, pairs: List[list]):
    """
    Executes commands in a SparkSession from a command list.
    Currently assumes that the first command doesn't receive parameters.

    TODO: Other starters that are not 'read' might receive parameters.
    TODO: Other commands that are not 'option' might receive more than two parameters.
    """
    root = None

    for pair in pairs:
        if root is None:
            # The first command is looked up on the session itself (e.g. spark.read).
            root = getattr(ss, pair[0])
            continue

        # Subsequent commands are chained on the result of the previous call.
        method = getattr(root, pair[0])
        if len(pair[1]) == 1:
            root = method(pair[1][0])
        elif len(pair[1]) == 2:
            root = method(pair[1][0], pair[1][1])
        else:
            raise ValueError("Must receive len 1 or 2")

    return root


print(execute_commands(spark, commands))