Advertisement
illfate

Untitled

Jan 24th, 2022
952
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package configapp
  2.  
  3. import (
  4.     "context"
  5.     "os"
  6.     "sync"
  7.     "syscall"
  8.  
  9.     "github.com/kelseyhightower/envconfig"
  10.     "go.uber.org/zap"
  11.  
  12.     "gitlab.fxtm/exinity/cdp/api/pkg/app"
  13.     "gitlab.fxtm/exinity/cdp/loader-framework/configapp"
  14.     frameworkloader "gitlab.fxtm/exinity/cdp/loader-framework/loader"
  15.     "gitlab.fxtm/exinity/cdp/loader-framework/retry"
  16.     "gitlab.fxtm/exinity/cdp/metrics/pkg/metrics"
  17.  
  18.     "gitlab.fxtm/exinity/cdp/autoposting-loader/internal/autoposting"
  19.     "gitlab.fxtm/exinity/cdp/autoposting-loader/internal/config"
  20.     "gitlab.fxtm/exinity/cdp/autoposting-loader/internal/grpc"
  21.     "gitlab.fxtm/exinity/cdp/autoposting-loader/internal/health"
  22.     "gitlab.fxtm/exinity/cdp/autoposting-loader/internal/notifiers"
  23. )
  24.  
  25. const serviceName = "autoposting_app"
  26.  
  27. type envConfig struct {
  28.     PathToEventsConfig string `envconfig:"PATH_TO_EVENTS_CONFIG" default:"events.yaml"`
  29.     GRPCServerHost     string `envconfig:"GRPC_SERVER_HOST" required:"true"`
  30. }
  31.  
  32. type AutoPostingLoaderApp struct {
  33.     baseApp *configapp.BaseApp
  34. }
  35.  
  36. func BuildPostingEvents(fileName string) (map[string]autoposting.Event, error) {
  37.     file, err := os.ReadFile(fileName)
  38.     if err != nil {
  39.         return nil, err
  40.     }
  41.  
  42.     postingCfg, err := config.BuildPostingConfig(file)
  43.     if err != nil {
  44.         return nil, err
  45.     }
  46.     return postingCfg.ConvertToAutoPostingEvents()
  47. }
  48.  
  49. func SetupAutoPostingLoaderApp(appVersion string) (*AutoPostingLoaderApp, error) {
  50.     var envCfg envConfig
  51.     err := envconfig.Process("", &envCfg)
  52.     if err != nil {
  53.         return nil, err
  54.     }
  55.  
  56.     var grpcServer *grpc.Server
  57.  
  58.     baseApp := configapp.NewBaseApp(serviceName, appVersion)
  59.  
  60.     postingEvents, err := BuildPostingEvents(envCfg.PathToEventsConfig)
  61.     if err != nil {
  62.         return nil, err
  63.     }
  64.  
  65.     err = baseApp.Init(func(cfg configapp.Config, l *zap.Logger, cons app.UserListStreamConsumerInterface) (frameworkloader.Runner, error) {
  66.         healthChecker := health.NewHealthChecker(baseApp.GetLogger())
  67.         healthChecker.RegisterCheck("consumer", func(ctx context.Context) error {
  68.             return baseApp.GetConsumer().Ping(ctx)
  69.         })
  70.  
  71.         grpcServer = grpc.NewServer(healthChecker)
  72.  
  73.         processor := autoposting.NewLoader(
  74.             l,
  75.             postingEvents,
  76.         )
  77.         retrier := retry.NewRetrier(retry.NewBackoff(retry.DefaultMaxAttempt, retry.DefaultMaxTime, retry.DefaultMinTime, notifiers.ErrTooMany, notifiers.ErrTimeout))
  78.         metricsLoader := metrics.NewLoader(serviceName, func(err error) string {
  79.             switch err {
  80.             case nil:
  81.                 return "no error"
  82.             default:
  83.                 return "bad_response"
  84.             }
  85.         })
  86.         return frameworkloader.NewSingleRunner(processor, cons, retrier, metricsLoader), nil
  87.     })
  88.     if err != nil {
  89.         return nil, err
  90.     }
  91.  
  92.     baseApp.AddStartingOnRunF(func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
  93.         defer wg.Done()
  94.  
  95.         baseApp.GetLogger().Debug("going to start grpcServer listening", zap.String("host", envCfg.GRPCServerHost))
  96.  
  97.         if err := <-grpcServer.StartListening(envCfg.GRPCServerHost); err != nil {
  98.             baseApp.GetLogger().Error("failed to start listening grpcServer", zap.Error(err))
  99.             ch <- syscall.SIGINT
  100.         }
  101.     })
  102.  
  103.     baseApp.AddShutDownF(func(ctx context.Context) {
  104.         <-ctx.Done()
  105.  
  106.         baseApp.GetLogger().Debug("going to gracefully stopping grpcServer")
  107.         grpcServer.GracefulStop()
  108.         baseApp.GetLogger().Debug("grpcServer gracefully stopped")
  109.     })
  110.  
  111.     return &AutoPostingLoaderApp{
  112.         baseApp: baseApp,
  113.     }, nil
  114. }
  115.  
  116. func (a *AutoPostingLoaderApp) Run() {
  117.     a.baseApp.Run()
  118. }
  119.  
  120. func (a *AutoPostingLoaderApp) Close() {
  121.     a.baseApp.Close()
  122. }
  123.  
Advertisement
Advertisement
Advertisement
RAW Paste Data Copied
Advertisement