Advertisement
franji

Untitled

Jan 12th, 2016
349
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.12 KB | None | 0 0
  1. // events.json contains one json per line:
  2. //{"utcdate": "2014-11-10 22:23:23", "deviceid": "33028801", "lat": 40.055023, "lon": -74.996469}
  3. //{"utcdate": "2014-11-10 22:23:23", "deviceid": "38241762", "lat": 40.055012, "lon": -74.996447}
  4. //{"utcdate": "2014-11-10 22:23:23", "deviceid": "32740723", "lat": 40.177671, "lon": -74.911994}
  5.  
  6.  
  7. val devpos1 = sqlContext.read.json("file:///Users/talfranji/Dropbox/InterBit/Hadoop/ATT2014/events.json")
  8. devpos1.registerTempTable("devpos1")
  9.  
  10. val devpos = sqlContext.sql("select deviceid, lat, lon, from_utc_timestamp(utcdate, 'UTC') as t from devpos1")
  11. devpos.registerTempTable("devpos")
  12.  
  13. // How many devices in table
  14. val s = "select count(distinct deviceid) from devpos"
  15. sqlContext.sql(s).collect
  16.  
  17. //Plans conatins each device - to which plan. it is tab separated
  18. //33028801  business1
  19. //38241762  kid
  20. //32740723  business1
  21.  
  22. // Create an RDD of Plan objects and register it as a table.
  23. case class Plan(deviceid: String, plan: String)
  24. val plans = sc.textFile("file:///Users/talfranji/Dropbox/InterBit/Hadoop/ATT2014/plans.txt"
  25.      ).map(_.split("\t")).map(p => Plan(p(0), p(1))).toDF()
  26. plans.registerTempTable("plans")
  27.  
  28. // active devices by plan
  29. val s = """select
  30. count(distinct devpos.deviceid), plans.plan
  31. from plans
  32.  join devpos on (devpos.deviceid = plans.deviceid)
  33. group by plans.plan"""
  34. sqlContext.sql(s).collect.foreach(println)
  35.  
  36. // time limit -
  37. val s = "select count(distinct deviceid) from devpos where t >= '2014-11-10' and t <'2014-11-11'"
  38. sqlContext.sql(s).collect.foreach(println)
  39.  
  40.  
  41. // qutization
  42. // 1 lat =~ 110Km  lat*110 * 10 - you get resolution of 100m
  43. // 1 lon =~ 100Km if we are in lat==32 so lon *100 *10 get you to resolution of 100m
  44.  
  45. val s = """select
  46. max(deviceid), avg(lat), avg(lon) from devpos
  47. where (cast(deviceid as int) % 10) = 3 group by
  48. round(lat *110 * 10),  round(lon*100 *10)"""
  49. // try *10000 - 10m
  50. sqlContext.sql(s).take(10).foreach(println)
  51.  
  52.  
  53. //##### Areas/Places where a user spent more than an hour.
  54.  
  55. //# NOT good:
  56. //select min(t), max(t), max(deviceid), avg(lat), avg(lon)
  57. // from devpos
  58. // where (deviceid % 10) = 3
  59. // group by round(lat *1000),  round(lon*1000);
  60.  
  61.  
  62. //# This can be done with Oracle analytical functions. Very powerfull tool!
  63.  
  64.  
  65. //#inner query - just quantify
  66. val s1 = "select t, deviceid, lat, lon, round(lat *110 * 100) as qlat,  round(lon*100 * 10) as qlon from devpos"
  67.  
  68. //##inner 2 - add the prev location to each record
  69. val s2 = """select t, deviceid, qlat, qlon,
  70.                     lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
  71.                      lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
  72. from
  73. (select t, deviceid, lat, lon, round(lat *110 *10) as qlat,  round(lon*100* 100) as qlon from devpos) qdevpos
  74. order by deviceid,t"""
  75.  
  76.  
  77. //###inner 3 - find only "new" locations
  78. val s3 = """select
  79. t, deviceid, qlat,qlon, prev_qlat, prev_qlon
  80. from (
  81.  select t, deviceid, qlat, qlon,
  82.         lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
  83.         lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
  84.    from
  85.    (select t, deviceid, lat, lon, round(lat *110 *10) as qlat,  round(lon*100 *10) as qlon from devpos) qdevpos
  86. ) poschg
  87.  where qlat <> prev_qlat and qlon<> prev_qlon
  88.  order by deviceid, t"""
  89.  
  90. sqlContext.sql(s3).take(10).foreach(println)
  91.  
  92.  
  93. // User defined functions
  94. // Given a latitude - what is the distance between two longtitude units, in Km
  95. val LonDistAtLat = (lat : Double) => {
  96.     math.cos(math.abs(lat) / 90.0) * 111.321;
  97. }
  98.  
  99. sqlContext.udf.register("LonDistAtLat", LonDistAtLat)
  100.  
  101. val s4 = """select
  102. t, deviceid, qlat,qlon, prev_qlat, prev_qlon
  103. from (
  104.  select t, deviceid, qlat, qlon,
  105.         lag(qlat,1,0) over (partition by deviceid order by t) as prev_qlat,
  106.         lag(qlon,1,0) over (partition by deviceid order by t) as prev_qlon
  107.    from
  108.    (select t, deviceid, lat, lon, round(lat *110 *10) as qlat,  round(lon*LonDistAtLat(lat) *10) as qlon from devpos) qdevpos
  109. ) poschg
  110.  where qlat <> prev_qlat and qlon<> prev_qlon
  111.  order by deviceid, t"""
  112.  
  113. sqlContext.sql(s4).take(10).foreach(println)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement