PySpark
Master - the machine that manages splitting up the data and the computations and sends them out to the workers.
Workers - the machines that run the computations and store the data, sending results back to the master.

Spark's core data structure is the Resilient Distributed Dataset (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this course you'll be using the Spark DataFrame abstraction built on top of RDDs.
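A rough illustration of that relationship (a sketch, assuming the flights DataFrame used in the exercises below): every DataFrame is backed by an RDD, reachable through the .rdd attribute.

# A DataFrame is built on top of an RDD of Row objects
print(type(flights))                   # pyspark.sql.dataframe.DataFrame
print(type(flights.rdd))               # the underlying RDD
print(flights.rdd.getNumPartitions())  # how the data is split across the cluster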

You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

We've already created a SparkSession for you called spark, but what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary.
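
A minimal sketch of that pattern, assuming a local PySpark installation rather than the course environment (which already provides spark):

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Return the existing SparkSession if there is one, otherwise create a new one
spark = SparkSession.builder.getOrCreate()

# Verify the session and its underlying SparkContext
print(spark)
print(spark.sparkContext)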

# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()

The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined.
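
As a quick illustration of that difference (a sketch using the same flights DataFrame; duration_hrs is a made-up column name):

# .withColumn() keeps every existing column and adds duration_hrs
flights_with_duration = flights.withColumn("duration_hrs", flights.air_time / 60)

# .select() returns only the columns you ask for
flights_only_duration = flights.select((flights.air_time / 60).alias("duration_hrs"))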

# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

# Define avg_speed
avg_speed = (flights.distance / (flights.air_time / 60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == 'PDX').groupBy().min('distance').show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == 'SEA').groupBy().max('air_time').show()

# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()

# Examine the data
airports.show()

# Rename the faa column
airports = airports.withColumnRenamed("faa", "dest")

# Join the DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")

# Examine the new DataFrame
flights_with_airports.show()

# Rename year column
planes = planes.withColumnRenamed("year", "plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")

Good work! Before you get started modeling, it's important to know that Spark ML only handles numeric data. That means all of the columns in your model DataFrame must be either integers or decimals (called 'doubles' in Spark).

When we imported our data, we let Spark guess what kind of information each column held. Unfortunately, Spark doesn't always guess right, and you can see that some of the columns in our DataFrame are strings containing numbers rather than actual numeric values.

To remedy this, you can use the .cast() method in combination with the .withColumn() method. It's important to note that .cast() works on columns, while .withColumn() works on DataFrames.
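
A quick way to see which columns Spark guessed wrong (a sketch, assuming the model_data DataFrame built above):

# Inspect the inferred types - string columns that hold numbers need casting
print(model_data.dtypes)

# Alternatively, print the full schema
model_data.printSchema()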

# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

# Import the feature transformers from pyspark.ml.feature
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

You'll tune this model by testing different values for several hyperparameters. A hyperparameter is just a value in the model that's not estimated from the data, but rather is supplied by the user to maximize performance. For this course it's not necessary to understand the mathematics behind all of these values - what's important is that you'll try out a few different choices and pick the best one.
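
As a small illustration of what "supplied by the user" means (a sketch, not part of the exercises; lr_example is a throwaway name), hyperparameters are just arguments you set yourself rather than values learned from the data:

from pyspark.ml.classification import LogisticRegression

# regParam and elasticNetParam are hyperparameters: chosen by you, not fit from data
lr_example = LogisticRegression(regParam=0.01, elasticNetParam=0.5)
print(lr_example.getRegParam(), lr_example.getElasticNetParam())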

# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

# Import the tuning submodule and numpy (needed for np.arange below)
import pyspark.ml.tuning as tune
import numpy as np

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)

# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)
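
Note that the CrossValidator built above is never actually fit in this snippet; best_lr here is just a plain fit of lr. A sketch of how the grid search would be run instead, using the cv and training objects defined above:

# Fit cross-validation models over the whole parameter grid (this is slow)
models = cv.fit(training)

# Extract the single best model found by the grid search
best_lr = models.bestModel
print(best_lr)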

# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
    # Define a StructField for each field
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False),
    StructField('city', StringType(), False)
])
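
The schema is never applied in this paste; a sketch of how it would typically be used (people.csv is a hypothetical file name):

# Read a CSV with the explicit schema instead of letting Spark infer the types
people_df = spark.read.csv('people.csv', header=True, schema=people_schema)
people_df.printSchema()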

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0][0]
print('The average flight time is: %d' % avg_duration)

# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

# Add a column to voter_df for any voter with the title 'Councilmember'
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                                .when(voter_df.TITLE == 'Mayor', 2)
                                .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

def getFirstAndMiddle(names):
    # Return a space separated string of names
    return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Drop the unnecessary columns then show the DataFrame
voter_df = voter_df.drop('first_name')
voter_df = voter_df.drop('splits')
voter_df.show()

# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with the 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)

# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

# Time the operations below (time is part of the standard library)
import time

start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

# Store the number of partitions in a variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(airports_df,
                            flights_df["Destination Airport"] == airports_df["IATA"])

# Show the query plan
normal_df.explain()

# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df),
                               flights_df["Destination Airport"] == airports_df["IATA"])

# Show the query plan and compare against the original
broadcast_df.explain()

start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))
#############################################################
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] != '0')

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json')
###########################################################

# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.filter(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

def retriever(cols, colcount):
    # Return a list of dog data
    return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column (_c0), split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')

# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

# Create a function to return the breed and bounding-box coordinates of each dog as a tuple
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        (breed, start_x, start_y, end_x, end_y) = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs
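
DogType is assumed to be predefined in the exercise environment; it is not defined anywhere in this paste. A plausible (hypothetical) definition matching the tuples returned above:

# Hypothetical DogType - a struct matching (breed, start_x, start_y, end_x, end_y)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

DogType = StructType([
    StructField('breed', StringType(), False),
    StructField('start_x', IntegerType(), False),
    StructField('start_y', IntegerType(), False),
    StructField('end_x', IntegerType(), False),
    StructField('end_y', IntegerType(), False)
])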

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

# Define a function to determine the number of dog pixels per image
def dogPixelCount(doglist):
    totalpixels = 0
    for dog in doglist:
        totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
    return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount(joined_df.dogs))

# Create a column representing the percentage of dog pixels in the image
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width.cast("integer") * joined_df.height.cast("integer"))) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.filter(joined_df.dog_percent > 60).show(10)