Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package configapp
import (
	"context"
	"fmt"
	"os"
	"sync"
	"syscall"

	"github.com/kelseyhightower/envconfig"
	"go.uber.org/zap"

	"gitlab.fxtm/exinity/cdp/api/pkg/app"
	"gitlab.fxtm/exinity/cdp/loader-framework/configapp"
	frameworkloader "gitlab.fxtm/exinity/cdp/loader-framework/loader"
	"gitlab.fxtm/exinity/cdp/loader-framework/retry"
	"gitlab.fxtm/exinity/cdp/metrics/pkg/metrics"

	"gitlab.fxtm/exinity/cdp/autoposting-loader/internal/autoposting"
	"gitlab.fxtm/exinity/cdp/autoposting-loader/internal/config"
	"gitlab.fxtm/exinity/cdp/autoposting-loader/internal/grpc"
	"gitlab.fxtm/exinity/cdp/autoposting-loader/internal/health"
	"gitlab.fxtm/exinity/cdp/autoposting-loader/internal/notifiers"
)
// serviceName is the name this service registers under: it is handed to
// configapp.NewBaseApp and to metrics.NewLoader.
const serviceName = "autoposting_app"
// envConfig holds the settings this package reads from the process
// environment through envconfig.Process.
type envConfig struct {
	// PathToEventsConfig is the file name passed to BuildPostingEvents.
	// Falls back to "events.yaml" when PATH_TO_EVENTS_CONFIG is unset.
	PathToEventsConfig string `envconfig:"PATH_TO_EVENTS_CONFIG" default:"events.yaml"`

	// GRPCServerHost is the value passed to the gRPC server's
	// StartListening call. GRPC_SERVER_HOST must be set.
	GRPCServerHost string `envconfig:"GRPC_SERVER_HOST" required:"true"`
}
- type AutoPostingLoaderApp struct {
- baseApp *configapp.BaseApp
- }
- func BuildPostingEvents(fileName string) (map[string]autoposting.Event, error) {
- file, err := os.ReadFile(fileName)
- if err != nil {
- return nil, err
- }
- postingCfg, err := config.BuildPostingConfig(file)
- if err != nil {
- return nil, err
- }
- return postingCfg.ConvertToAutoPostingEvents()
- }
- func SetupAutoPostingLoaderApp(appVersion string) (*AutoPostingLoaderApp, error) {
- var envCfg envConfig
- err := envconfig.Process("", &envCfg)
- if err != nil {
- return nil, err
- }
- var grpcServer *grpc.Server
- baseApp := configapp.NewBaseApp(serviceName, appVersion)
- postingEvents, err := BuildPostingEvents(envCfg.PathToEventsConfig)
- if err != nil {
- return nil, err
- }
- err = baseApp.Init(func(cfg configapp.Config, l *zap.Logger, cons app.UserListStreamConsumerInterface) (frameworkloader.Runner, error) {
- healthChecker := health.NewHealthChecker(baseApp.GetLogger())
- healthChecker.RegisterCheck("consumer", func(ctx context.Context) error {
- return baseApp.GetConsumer().Ping(ctx)
- })
- grpcServer = grpc.NewServer(healthChecker)
- processor := autoposting.NewLoader(
- l,
- postingEvents,
- )
- retrier := retry.NewRetrier(retry.NewBackoff(retry.DefaultMaxAttempt, retry.DefaultMaxTime, retry.DefaultMinTime, notifiers.ErrTooMany, notifiers.ErrTimeout))
- metricsLoader := metrics.NewLoader(serviceName, func(err error) string {
- switch err {
- case nil:
- return "no error"
- default:
- return "bad_response"
- }
- })
- return frameworkloader.NewSingleRunner(processor, cons, retrier, metricsLoader), nil
- })
- if err != nil {
- return nil, err
- }
- baseApp.AddStartingOnRunF(func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
- defer wg.Done()
- baseApp.GetLogger().Debug("going to start grpcServer listening", zap.String("host", envCfg.GRPCServerHost))
- if err := <-grpcServer.StartListening(envCfg.GRPCServerHost); err != nil {
- baseApp.GetLogger().Error("failed to start listening grpcServer", zap.Error(err))
- ch <- syscall.SIGINT
- }
- })
- baseApp.AddShutDownF(func(ctx context.Context) {
- <-ctx.Done()
- baseApp.GetLogger().Debug("going to gracefully stopping grpcServer")
- grpcServer.GracefulStop()
- baseApp.GetLogger().Debug("grpcServer gracefully stopped")
- })
- return &AutoPostingLoaderApp{
- baseApp: baseApp,
- }, nil
- }
- func (a *AutoPostingLoaderApp) Run() {
- a.baseApp.Run()
- }
- func (a *AutoPostingLoaderApp) Close() {
- a.baseApp.Close()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement