Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "bramp.net/morebeam/csvio"
- "context"
- "encoding/json"
- "flag"
- "fmt"
- "github.com/apache/beam/sdks/go/pkg/beam"
- "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
- "github.com/apache/beam/sdks/go/pkg/beam/log"
- "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
- "github.com/apache/beam/sdks/go/pkg/beam/transforms/top"
- "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
- "reflect"
- )
// Command-line flags controlling where the pipeline reads from and writes to.
var (
	// input is the path of the ratings CSV the pipeline reads (flag -file).
	input = flag.String("file", "./csv/sample.csv", "input file")

	// output is the path the pipeline's text report is written to (flag -output).
	output = flag.String("output", "./outputs/reporting.txt", "output results")
)
// Rating is one row of the input ratings file.
//
// The csv tags presumably name the column each field is decoded from when the
// file is read with csvio; the json tags name the fields when a Rating is
// marshalled. Field order is kept fixed because it determines JSON key order.
type Rating struct {
	UserId    int     `csv:"userId" json:"userId"`
	MovieId   int     `csv:"movieId" json:"movieId"`
	Rating    float64 `csv:"rating" json:"rating"`
	Timestamp int     `csv:"timestamp" json:"timestamp"`
}
// UserRatings holds a user ID together with the number of ratings counted for
// that user. The json tags name the fields in the serialized report.
type UserRatings struct {
	UserId  int `json:"userId"`
	Ratings int `json:"ratings"`
}
- /* RegisterType inserts "external" types into a global type registry to bypass
- serialization and preserve full method information.
- It should be called in init() only. */
- func init() {
- beam.RegisterType(reflect.TypeOf(Rating{}))
- beam.RegisterType(reflect.TypeOf(UserRatings{}))
- }
- func main() {
- flag.Parse()
- beam.Init()
- p, s := beam.NewPipelineWithRoot()
- ctx := context.Background()
- log.Infof(ctx, "Started pipeline on scope: %s", s)
- /* [TEST PIPELINE START ]*/
- s = s.Scope("Test Batch Pipeline")
- sr := csvio.Read(s, *input, reflect.TypeOf(Rating{}))
- pwo := beam.ParDo(s.Scope("Pair Key With One"),
- func(x Rating, emit func(int, int)) {
- emit(x.UserId, 1)
- }, sr)
- spk := stats.SumPerKey(s, pwo)
- mp := beam.ParDo(s.Scope("Map KV To Struct"),
- func(k int, v int, emit func(UserRatings)) {
- emit(UserRatings{
- UserId: k,
- Ratings: v,
- })
- }, spk)
- t := top.Largest(s, mp, 1000, func(x, y UserRatings) bool { return x.Ratings < y.Ratings })
- o := beam.ParDo(s, func(x []UserRatings) string {
- if data, err := json.MarshalIndent(x, "", ""); err != nil {
- return fmt.Sprintf("[Err]: %v", err)
- } else {
- return fmt.Sprintf("Output: %s", data)
- }
- }, t)
- textio.Write(s, *output, o)
- /* [TEST PIPELINE END ]*/
- if err := beamx.Run(ctx, p); err != nil {
- fmt.Println(err)
- log.Exitf(ctx, "Failed to execute job: on ctx=%v:")
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement