Advertisement
Guest User

Untitled

a guest
Apr 29th, 2016
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.31 KB | None | 0 0
  1. Heatmaps Service
  2. ================
  3.  
  4. We seem to frequently duplicate work with regards to reading events and
  5. building heatmaps based on that data. The purpose of this is to build a generic
  6. and modifiable service which can handle it all.
  7.  
  8. Current requirements
  9. --------------------
  10.  
  11. + Listen to `allocationOutcome` events, where outcome was `failed` (perhaps
  12. `cancelled` as well). Plot each of these by lat/lng and base the 'heat' on
  13. the number of failed allocations in that area.
  14. + Listen to `nearDrivers` events, store an average at that location
  15.  
  16. Nice to haves?
  17. --------------
  18.  
  19. + Build a heatmap of pmuh points
  20. + Build a heatmap of destinations
  21.  
  22. Example API?
  23. ------------
  24.  
  25. ```go
  26. strandedPassengersRecorder := heatmap.NewRecorder(
  27.  
  28. // The event name to record on
  29. "allocationOutcome",
  30.  
  31. // The recorder options, needs to be flexible enough to cater for future
  32. // requirements
  33. &heatmap.RecorderOptions{
  34.  
  35. // Validation function as an option
  36. Validation: func(ev map[string]string) bool {
  37.  
  38. lat := ev["pickupLat"]
  39. lng := ev["pickupLng"]
  40. out := ev["outcome"]
  41.  
  42. if out == "allocated" {
  43. return false
  44. }
  45.  
  46. latCnv, err := strconv.ParseFloat(lat, 64)
  47. if err != nil {
  48. return false
  49. }
  50.  
  51. lngCnv, err := strconv.ParseFloat(lng, 64)
  52. if err != nil {
  53. return false
  54. }
  55.  
  56. if latCnv == 0.0 && lngCnv == 0.0 {
  57. return false
  58. }
  59.  
  60. return true
  61. },
  62.  
  63. // The fields to use for the lat/lng
  64. LatField: "pickupLat",
  65. LngField: "pickupLng",
  66.  
  67. // The field to use for the hob
  68. HobField: "city",
  69.  
  70. // Retrieve the time, this gives us flexibilty to use the current time,
  71. // perform any TZ conversion, validate the timestamp etc.
  72. Time: func(ev map[string]string) (time.Time, error) {
  73. timestamp := ev["timestamp"]
  74. tCnv, err := strconv.Atoi(timestamp)
  75. if err != nil {
  76. // Falling back to time.Now() in this case, could error otherwise
  77. return time.Now()
  78. }
  79.  
  80. return time.Unix(tCnv, 0)
  81. },
  82.  
  83. TimeBucket: heatmap.BucketHour, // Can include BucketDay, BucketTen, BucketMinute
  84. TimeExpiry: heatmap.ExpiryMonth, // Expire after a month
  85.  
  86. // What to record? This is the values which will be pushed through the
  87. // combiner in an external step
  88. //
  89. // That means if we want to increment 'a' by 1, we call 'a' 1. Then the
  90. // combiner will do a + aʹ
  91. Record: func(ev map[string]string) (aʹ, bʹ, cʹ, dʹ float64, err error) {
  92.  
  93. // In this case, we simply want to record a count which we increment
  94. return 1, 0, 0, 0 // (the combiner will just sum the 1)
  95.  
  96. /*
  97. // In the ETA situation, we might do:
  98. eta, err := strconv.ParseFloat(ev["eta"])
  99. if err != nil {
  100. return
  101. }
  102.  
  103. return eta, eta, eta, 1, nil
  104.  
  105. // The combiner would then look like:
  106. //
  107. // This would keep track of the minimum, maximum, total entries and
  108. // total sum of etas
  109. Combine: func(a, b, c, d, aʹ, bʹ, cʹ, dʹ float64) (aʹʹ, bʹʹ, cʹʹ, dʹʹ float64) {
  110. return math.Min(a, aʹ), math.Max(b, bʹ), c + cʹ, d + dʹ
  111. }
  112. */
  113. },
  114.  
  115. // Teach how to combine, in this case we just want to sum both a's
  116. //
  117. // If we know how to combine, then we know how to merge many rows together,
  118. // and also keep track of certain totals within rows
  119. Combine: func(a, b, c, d, aʹ, bʹ, cʹ, dʹ float64) (aʹʹ, bʹʹ, cʹʹ, dʹʹ float64) {
  120. return a + aʹ, 0, 0, 0
  121. },
  122.  
  123. // Suppose we want more recent results to be more important than older
  124. // ones, we could apply some simple dropoff (create a dropoff function which
  125. // returns a scalar). We will know a rough date of a batch of results (so
  126. // that we can expire them) and so this should be feasible.
  127. Dropoff: func(in float64, t time.Time) float64 {
  128. since := time.Since(t).Hours()
  129.  
  130. // Will mean newer results are weighted higher than older, and they
  131. // slowly drop off up to a month (there are 672 hours in a 28 day month)
  132. return 1 - (since / 672)
  133. },
  134.  
  135. // The S2 cell size, as opposed to reducing lat/lng precision
  136. CellSize: 14,
  137. })
  138.  
  139. // Register it with other recorders, probably in master
  140. heatmap.RegisterRecorder(strandedPassengersRecorder)
  141.  
  142. // Now recorder is registered, we need to be able to ask the recorder for
  143. // output. Either S2 output (the records per cell), or LatLng output (the
  144. // records per cell centroid). The output could be json bytes
  145. heatmapS2 := strandedPassengersRecorder.Read("LON", time.Now()).AsS2(&ValueContext{
  146. A: "count",
  147. B: "_", // Skip, unused
  148. C: "_", // Skip, unused
  149. D: "_", // Skip, unused
  150. })
  151. heatmapLatLng := strandedPassengersRecorder.Read("LON", time.Now()).AsLatLng(&ValueContext{
  152. A: "count",
  153. B: "_", // Skip, unused
  154. C: "_", // Skip, unused
  155. D: "_", // Skip, unused
  156. })
  157. ```
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement