Advertisement
illfate

Untitled

Jan 24th, 2022
1,176
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package configapp
  2.  
  3. import (
  4.     "context"
  5.     "fmt"
  6.     "os"
  7.     "os/signal"
  8.     "sync"
  9.     "syscall"
  10.     "time"
  11.  
  12.     "go.uber.org/zap"
  13.     // redislib "github.com/go-redis/redis"
  14.  
  15.     "gitlab.fxtm/exinity/cdp/api/pkg/app"
  16.     "gitlab.fxtm/exinity/cdp/message-queue/pkg/mq"
  17.     "gitlab.fxtm/exinity/cdp/message-queue/pkg/mq/redismq"
  18.  
  19.     "gitlab.fxtm/exinity/cdp/loader-framework/loader"
  20. )
  21.  
  type (
      // StartingOnRunF is a unit of work launched in its own goroutine by run.
      // It receives the run context, a send-only channel on which it may post
      // an os.Signal to request shutdown, and the WaitGroup on which it has to
      // call Done once it finishes.
      StartingOnRunF func(ctx context.Context, signalCh chan<- os.Signal, wg *sync.WaitGroup)

      // ShutDownF releases a resource during shutdown. The context passed in
      // bounds how long the function is given to complete.
      ShutDownF func(ctx context.Context)
  )
  25.  
  26. type BaseApp struct {
  27.     loader             loader.Runner
  28.     consumerBackend    mq.Consumer
  29.     consumer           app.UserListStreamConsumerInterface
  30.     logger             *zap.Logger
  31.     startingOnRunFuncs []StartingOnRunF
  32.     shutdownFuncs      []ShutDownF
  33.     serviceName        string
  34.     appVersion         string
  35. }
  36.  
  37. func NewBaseApp(serviceName, appVersion string) *BaseApp {
  38.     return &BaseApp{
  39.         serviceName: serviceName,
  40.         appVersion:  appVersion,
  41.     }
  42. }
  43.  
  44. type SetupLoaderF func(cfg Config, logger *zap.Logger, consumer app.UserListStreamConsumerInterface) (loader.Runner, error)
  45.  
  46. func (a *BaseApp) InitWithHelper(helper Helper, setupF SetupLoaderF) error {
  47.     ctx := context.Background()
  48.     cfg := helper.Config()
  49.     logger, shutdownLogger, err := helper.SetupLogger()
  50.     if err != nil {
  51.         return err
  52.     }
  53.     a.logger = logger
  54.  
  55.     shutdownConsumerClient, shutdownConsumerBackend, err := a.setupConsumer(ctx, cfg, helper)
  56.     if err != nil {
  57.         shutdownLogger(ctx)
  58.         return err
  59.     }
  60.  
  61.     shutdownMetrics, err := helper.SetupMetricsServer()
  62.     if err != nil {
  63.         shutdownLogger(ctx)
  64.         shutdownConsumerClient(ctx)
  65.         shutdownConsumerBackend(ctx)
  66.         return err
  67.     }
  68.  
  69.     shutdownTracing, err := helper.SetupTracingExporter()
  70.     if err != nil {
  71.         shutdownLogger(ctx)
  72.         shutdownConsumerClient(ctx)
  73.         shutdownConsumerBackend(ctx)
  74.         shutdownMetrics(ctx)
  75.         return err
  76.     }
  77.  
  78.     a.loader, err = setupF(cfg, logger, a.consumer)
  79.     if err != nil {
  80.         shutdownLogger(ctx)
  81.         shutdownConsumerClient(ctx)
  82.         shutdownConsumerBackend(ctx)
  83.         shutdownMetrics(ctx)
  84.         shutdownTracing(ctx)
  85.         return err
  86.     }
  87.  
  88.     a.AddShutDownF(shutdownLogger, shutdownConsumerClient, shutdownMetrics, shutdownTracing, shutdownConsumerBackend)
  89.     a.AddStartingOnRunF(a.runProcessor)
  90.  
  91.     return nil
  92. }
  93.  
  94. func (a *BaseApp) setupConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
  95.     switch cfg.ConsumerBackend {
  96.     case ConsumerBackendRedis:
  97.         return a.setupRedisConsumer(ctx, cfg, helper)
  98.     case ConsumerBackendKafka:
  99.         return a.setupKafkaConsumer(ctx, cfg, helper)
  100.     default:
  101.         return nil, nil, fmt.Errorf("unsupportred consumer backend: %s", cfg.ConsumerBackend)
  102.     }
  103. }
  104.  
  105. func (a *BaseApp) setupRedisConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
  106.     _, shutdownRedis, err := helper.SetupRedis()
  107.     if err != nil {
  108.         return nil, nil, err
  109.     }
  110.  
  111.     redisConsumer, shutdownRedisConsumer, err := helper.SetupRedisConsumer()
  112.     if err != nil {
  113.         shutdownRedis(ctx)
  114.         return nil, nil, err
  115.     }
  116.     a.consumerBackend = redisConsumer
  117.  
  118.     a.consumer = redismq.NewRedisConsumer(
  119.         cfg.Redis.ConsumerStreamName,
  120.         cfg.Redis.ConsumerGroupName,
  121.         cfg.Redis.ConsumerName,
  122.         cfg.Redis.ReadBatchCount,
  123.         time.Duration(cfg.Redis.TimeoutMs)*time.Millisecond,
  124.         a.consumerBackend,
  125.     )
  126.  
  127.     return shutdownRedis, shutdownRedisConsumer, nil
  128. }
  129.  
  130. func (a *BaseApp) setupKafkaConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
  131.     _, shutdownKafka, err := helper.SetupKafka()
  132.     if err != nil {
  133.         return nil, nil, err
  134.     }
  135.  
  136.     kafkaConsumer, shutdownKafkaConsumer, err := helper.SetupKafkaConsumer()
  137.     if err != nil {
  138.         shutdownKafka(ctx)
  139.     }
  140.     a.consumerBackend = kafkaConsumer
  141.  
  142.     a.consumer = redismq.NewRedisConsumer( // It's not redis impl, it's common
  143.         cfg.Kafka.ConsumerTopic,
  144.         cfg.Kafka.ConsumerGroupID,
  145.         "",                                   // not relevant for kafka
  146.         0,                                    // not relevant for kafka
  147.         time.Duration(1000)*time.Millisecond, // not relevant for kafka
  148.         a.consumerBackend,
  149.     )
  150.  
  151.     return shutdownKafka, shutdownKafkaConsumer, nil
  152. }
  153.  
  154. func (a *BaseApp) Init(setupF SetupLoaderF) error {
  155.     cfg, err := ParseConfig()
  156.     if err != nil {
  157.         return err
  158.     }
  159.     helper := NewDefaultHelper(cfg, a.serviceName, a.appVersion, nil)
  160.     return a.InitWithHelper(helper, setupF)
  161. }
  162.  
  163. func (a *BaseApp) AddShutDownF(fs ...ShutDownF) {
  164.     a.shutdownFuncs = append(a.shutdownFuncs, fs...)
  165. }
  166.  
  167. func (a *BaseApp) AddStartingOnRunF(f StartingOnRunF) {
  168.     a.startingOnRunFuncs = append(a.startingOnRunFuncs, f)
  169. }
  170.  
  171. func (a *BaseApp) Run() {
  172.     a.run(context.Background())
  173. }
  174.  
  175. func (a *BaseApp) RunContext(ctx context.Context) {
  176.     a.run(ctx)
  177. }
  178.  
  179. func (a *BaseApp) run(ctx context.Context) {
  180.     runCtx, cancelF := context.WithCancel(ctx)
  181.     signalCh := make(chan os.Signal, 1)
  182.     signal.Notify(signalCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
  183.     go func() {
  184.         <-signalCh
  185.         cancelF()
  186.     }()
  187.     go a.monitor(runCtx)
  188.  
  189.     var wg sync.WaitGroup
  190.     wg.Add(len(a.startingOnRunFuncs))
  191.     for _, runF := range a.startingOnRunFuncs {
  192.         go runF(runCtx, signalCh, &wg)
  193.     }
  194.     wg.Wait()
  195. }
  196.  
  197. func (a *BaseApp) Close() {
  198. }
  199.  
  200. func (a *BaseApp) GetLogger() *zap.Logger {
  201.     return a.logger
  202. }
  203.  
  204. // GetConsumer need to check health from the loader
  205. func (a *BaseApp) GetConsumer() mq.Consumer {
  206.     return a.consumerBackend
  207. }
  208.  
  209. func (a *BaseApp) monitor(ctx context.Context) {
  210.     <-ctx.Done()
  211.     const duration = time.Second * 10
  212.     for _, f := range a.shutdownFuncs {
  213.         timeoutCtx, timeoutCancelF := context.WithTimeout(context.Background(), duration)
  214.         f(timeoutCtx)
  215.         timeoutCancelF()
  216.     }
  217. }
  218.  
  219. func (a *BaseApp) runProcessor(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
  220.     defer wg.Done()
  221.     err := a.loader.Run(ctx)
  222.     if err != nil {
  223.         a.logger.Error("Got error during processor run", zap.Error(err))
  224.         ch <- os.Interrupt
  225.     }
  226. }
  227.  
Advertisement
Advertisement
Advertisement
RAW Paste Data Copied
Advertisement