Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Heatmaps Service
- ================
- We frequently duplicate work when reading events and building heatmaps from
- that data. The purpose of this service is to provide a single generic,
- configurable service that can handle all of these cases.
- Current requirements
- --------------------
- + Listen to `allocationOutcome` events where the outcome was `failed` (perhaps
- `cancelled` as well). Plot each of these by lat/lng and base the 'heat' on
- the number of failed allocations in that area.
- + Listen to `nearDrivers` events and store an average value per location.
- Nice to haves?
- --------------
- + Build a heatmap of pmuh points
- + Build a heatmap of destinations
- Example API?
- ------------
- ```go
- strandedPassengersRecorder := heatmap.NewRecorder(
- // The event name to record on
- "allocationOutcome",
- // The recorder options, needs to be flexible enough to cater for future
- // requirements
- &heatmap.RecorderOptions{
- // Validation function as an option
- Validation: func(ev map[string]string) bool {
- lat := ev["pickupLat"]
- lng := ev["pickupLng"]
- out := ev["outcome"]
- if out == "allocated" {
- return false
- }
- latCnv, err := strconv.ParseFloat(lat, 64)
- if err != nil {
- return false
- }
- lngCnv, err := strconv.ParseFloat(lng, 64)
- if err != nil {
- return false
- }
- if latCnv == 0.0 && lngCnv == 0.0 {
- return false
- }
- return true
- },
- // The fields to use for the lat/lng
- LatField: "pickupLat",
- LngField: "pickupLng",
- // The field to use for the hob
- HobField: "city",
- // Extract the event time. This gives us the flexibility to use the current
- // time, perform any TZ conversion, validate the timestamp, etc.
- Time: func(ev map[string]string) (time.Time, error) {
- timestamp := ev["timestamp"]
- tCnv, err := strconv.ParseInt(timestamp, 10, 64)
- if err != nil {
- // Fall back to time.Now() here; we could return err instead
- return time.Now(), nil
- }
- return time.Unix(tCnv, 0), nil
- },
- TimeBucket: heatmap.BucketHour, // Can include BucketDay, BucketTen, BucketMinute
- TimeExpiry: heatmap.ExpiryMonth, // Expire after a month
- // What to record? These are the values that will be pushed through the
- // combiner in a separate step.
- //
- // That means if we want to increment 'a' by 1, we return 1 for 'a'. The
- // combiner will then compute a + aʹ.
- Record: func(ev map[string]string) (aʹ, bʹ, cʹ, dʹ float64, err error) {
- // In this case, we simply want to record a count which we increment
- return 1, 0, 0, 0, nil // (the combiner will just sum the 1)
- /*
- // In the ETA situation, we might do:
- eta, err := strconv.ParseFloat(ev["eta"], 64)
- if err != nil {
- return
- }
- return eta, eta, eta, 1, nil
- // The combiner would then look like:
- //
- // This would keep track of the minimum, maximum, total sum of ETAs and
- // total number of entries (so the mean ETA is c / d). Note: if a cell
- // starts from an all-zero row, math.Min would always yield 0, so the first
- // record for a cell must seed the row rather than be combined with zeros.
- Combine: func(a, b, c, d, aʹ, bʹ, cʹ, dʹ float64) (aʹʹ, bʹʹ, cʹʹ, dʹʹ float64) {
- return math.Min(a, aʹ), math.Max(b, bʹ), c + cʹ, d + dʹ
- }
- */
- },
- // Teach the recorder how to combine two sets of values; in this case we
- // just sum the two a values.
- //
- // If we know how to combine, then we know how to merge many rows together,
- // and also keep track of certain totals within rows.
- Combine: func(a, b, c, d, aʹ, bʹ, cʹ, dʹ float64) (aʹʹ, bʹʹ, cʹʹ, dʹʹ float64) {
- return a + aʹ, 0, 0, 0
- },
- // Suppose we want more recent results to be more important than older
- // ones: we could apply a simple dropoff (a function that scales each value
- // by its age). We will know the rough date of a batch of results (so that
- // we can expire them), so this should be feasible.
- Dropoff: func(in float64, t time.Time) float64 {
- since := time.Since(t).Hours()
- // Newer results are weighted higher than older ones, and the weight
- // drops off linearly to zero over a month (there are 672 hours in a
- // 28-day month). Clamp at 0 so that results older than 672 hours, which
- // can still exist if expiry is a calendar month, never get a negative
- // weight.
- return in * math.Max(0, 1-since/672)
- },
- // The S2 cell level to bucket points into (rather than reducing lat/lng precision)
- CellSize: 14,
- })
- // Register it with other recorders, probably in master
- heatmap.RegisterRecorder(strandedPassengersRecorder)
- // Now that the recorder is registered, we need to be able to ask it for
- // output: either S2 output (the records per cell) or LatLng output (the
- // records per cell centroid). The output could be JSON bytes.
- heatmapS2 := strandedPassengersRecorder.Read("LON", time.Now()).AsS2(&heatmap.ValueContext{
- A: "count",
- B: "_", // Skip, unused
- C: "_", // Skip, unused
- D: "_", // Skip, unused
- })
- heatmapLatLng := strandedPassengersRecorder.Read("LON", time.Now()).AsLatLng(&heatmap.ValueContext{
- A: "count",
- B: "_", // Skip, unused
- C: "_", // Skip, unused
- D: "_", // Skip, unused
- })
- ```
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement