Advertisement
Guest User

main.go

a guest
Sep 13th, 2019
250
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.33 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "bramp.net/morebeam/csvio"
  5. "context"
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/apache/beam/sdks/go/pkg/beam"
  10. "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
  11. "github.com/apache/beam/sdks/go/pkg/beam/log"
  12. "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
  13. "github.com/apache/beam/sdks/go/pkg/beam/transforms/top"
  14. "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
  15. "reflect"
  16. )
  17.  
  18. var (
  19. input = flag.String("file", "./csv/sample.csv", "input file")
  20. output = flag.String("output", "./outputs/reporting.txt", "output results")
  21. )
  22.  
  23. type Rating struct {
  24. UserId int `csv:"userId" json:"userId"`
  25. MovieId int `csv:"movieId" json:"movieId"`
  26. Rating float64 `csv:"rating" json:"rating"`
  27. Timestamp int `csv:"timestamp" json:"timestamp"`
  28. }
  29.  
  30. /* struct representation of a rating */
  31. type UserRatings struct {
  32. UserId int `json:"userId"`
  33. Ratings int `json:"ratings"`
  34. }
  35.  
  36. /* RegisterType inserts "external" types into a global type registry to bypass
  37. serialization and preserve full method information.
  38. It should be called in init() only. */
  39. func init() {
  40. beam.RegisterType(reflect.TypeOf(Rating{}))
  41. beam.RegisterType(reflect.TypeOf(UserRatings{}))
  42. }
  43.  
  44. func main() {
  45. flag.Parse()
  46.  
  47. beam.Init()
  48.  
  49. p, s := beam.NewPipelineWithRoot()
  50.  
  51. ctx := context.Background()
  52.  
  53. log.Infof(ctx, "Started pipeline on scope: %s", s)
  54.  
  55. /* [TEST PIPELINE START ]*/
  56.  
  57. s = s.Scope("Test Batch Pipeline")
  58.  
  59. sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))
  60.  
  61. pwo := beam.ParDo(s.Scope("Pair Key With One"),
  62. func(x Rating, emit func(int, int)) {
  63. emit(x.UserId, 1)
  64. }, sr)
  65.  
  66. spk := stats.SumPerKey(s, pwo)
  67.  
  68. mp := beam.ParDo(s.Scope("Map KV To Struct"),
  69. func(k int, v int, emit func(UserRatings)) {
  70. emit(UserRatings{
  71. UserId: k,
  72. Ratings: v,
  73. })
  74. }, spk)
  75.  
  76. t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })
  77.  
  78. o := beam.ParDo(s, func(x []UserRatings) string {
  79. if data, err := json.MarshalIndent(x, "", ""); err != nil {
  80. return fmt.Sprintf("[Err]: %v", err)
  81. } else {
  82. return fmt.Sprintf("Output: %s", data)
  83. }
  84. }, t)
  85.  
  86. textio.Write(s, *output, o)
  87.  
  88. /* [TEST PIPELINE END ]*/
  89.  
  90. if err := beamx.Run(ctx, p); err != nil {
  91. fmt.Println(err)
  92. log.Exitf(ctx, "Failed to execute job: on ctx=%v:")
  93. }
  94. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement