Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package configapp
- import (
- "context"
- "fmt"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "go.uber.org/zap"
- // redislib "github.com/go-redis/redis"
- "gitlab.fxtm/exinity/cdp/api/pkg/app"
- "gitlab.fxtm/exinity/cdp/message-queue/pkg/mq"
- "gitlab.fxtm/exinity/cdp/message-queue/pkg/mq/redismq"
- "gitlab.fxtm/exinity/cdp/loader-framework/loader"
- )
- type StartingOnRunF func(context.Context, chan<- os.Signal, *sync.WaitGroup)
- type ShutDownF func(ctx context.Context)
- type BaseApp struct {
- loader loader.Runner
- consumerBackend mq.Consumer
- consumer app.UserListStreamConsumerInterface
- logger *zap.Logger
- startingOnRunFuncs []StartingOnRunF
- shutdownFuncs []ShutDownF
- serviceName string
- appVersion string
- }
- func NewBaseApp(serviceName, appVersion string) *BaseApp {
- return &BaseApp{
- serviceName: serviceName,
- appVersion: appVersion,
- }
- }
- type SetupLoaderF func(cfg Config, logger *zap.Logger, consumer app.UserListStreamConsumerInterface) (loader.Runner, error)
- func (a *BaseApp) InitWithHelper(helper Helper, setupF SetupLoaderF) error {
- ctx := context.Background()
- cfg := helper.Config()
- logger, shutdownLogger, err := helper.SetupLogger()
- if err != nil {
- return err
- }
- a.logger = logger
- shutdownConsumerClient, shutdownConsumerBackend, err := a.setupConsumer(ctx, cfg, helper)
- if err != nil {
- shutdownLogger(ctx)
- return err
- }
- shutdownMetrics, err := helper.SetupMetricsServer()
- if err != nil {
- shutdownLogger(ctx)
- shutdownConsumerClient(ctx)
- shutdownConsumerBackend(ctx)
- return err
- }
- shutdownTracing, err := helper.SetupTracingExporter()
- if err != nil {
- shutdownLogger(ctx)
- shutdownConsumerClient(ctx)
- shutdownConsumerBackend(ctx)
- shutdownMetrics(ctx)
- return err
- }
- a.loader, err = setupF(cfg, logger, a.consumer)
- if err != nil {
- shutdownLogger(ctx)
- shutdownConsumerClient(ctx)
- shutdownConsumerBackend(ctx)
- shutdownMetrics(ctx)
- shutdownTracing(ctx)
- return err
- }
- a.AddShutDownF(shutdownLogger, shutdownConsumerClient, shutdownMetrics, shutdownTracing, shutdownConsumerBackend)
- a.AddStartingOnRunF(a.runProcessor)
- return nil
- }
- func (a *BaseApp) setupConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
- switch cfg.ConsumerBackend {
- case ConsumerBackendRedis:
- return a.setupRedisConsumer(ctx, cfg, helper)
- case ConsumerBackendKafka:
- return a.setupKafkaConsumer(ctx, cfg, helper)
- default:
- return nil, nil, fmt.Errorf("unsupportred consumer backend: %s", cfg.ConsumerBackend)
- }
- }
- func (a *BaseApp) setupRedisConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
- _, shutdownRedis, err := helper.SetupRedis()
- if err != nil {
- return nil, nil, err
- }
- redisConsumer, shutdownRedisConsumer, err := helper.SetupRedisConsumer()
- if err != nil {
- shutdownRedis(ctx)
- return nil, nil, err
- }
- a.consumerBackend = redisConsumer
- a.consumer = redismq.NewRedisConsumer(
- cfg.Redis.ConsumerStreamName,
- cfg.Redis.ConsumerGroupName,
- cfg.Redis.ConsumerName,
- cfg.Redis.ReadBatchCount,
- time.Duration(cfg.Redis.TimeoutMs)*time.Millisecond,
- a.consumerBackend,
- )
- return shutdownRedis, shutdownRedisConsumer, nil
- }
- func (a *BaseApp) setupKafkaConsumer(ctx context.Context, cfg Config, helper Helper) (ShutDownF, ShutDownF, error) {
- _, shutdownKafka, err := helper.SetupKafka()
- if err != nil {
- return nil, nil, err
- }
- kafkaConsumer, shutdownKafkaConsumer, err := helper.SetupKafkaConsumer()
- if err != nil {
- shutdownKafka(ctx)
- }
- a.consumerBackend = kafkaConsumer
- a.consumer = redismq.NewRedisConsumer( // It's not redis impl, it's common
- cfg.Kafka.ConsumerTopic,
- cfg.Kafka.ConsumerGroupID,
- "", // not relevant for kafka
- 0, // not relevant for kafka
- time.Duration(1000)*time.Millisecond, // not relevant for kafka
- a.consumerBackend,
- )
- return shutdownKafka, shutdownKafkaConsumer, nil
- }
- func (a *BaseApp) Init(setupF SetupLoaderF) error {
- cfg, err := ParseConfig()
- if err != nil {
- return err
- }
- helper := NewDefaultHelper(cfg, a.serviceName, a.appVersion, nil)
- return a.InitWithHelper(helper, setupF)
- }
- func (a *BaseApp) AddShutDownF(fs ...ShutDownF) {
- a.shutdownFuncs = append(a.shutdownFuncs, fs...)
- }
- func (a *BaseApp) AddStartingOnRunF(f StartingOnRunF) {
- a.startingOnRunFuncs = append(a.startingOnRunFuncs, f)
- }
- func (a *BaseApp) Run() {
- a.run(context.Background())
- }
- func (a *BaseApp) RunContext(ctx context.Context) {
- a.run(ctx)
- }
- func (a *BaseApp) run(ctx context.Context) {
- runCtx, cancelF := context.WithCancel(ctx)
- signalCh := make(chan os.Signal, 1)
- signal.Notify(signalCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- <-signalCh
- cancelF()
- }()
- go a.monitor(runCtx)
- var wg sync.WaitGroup
- wg.Add(len(a.startingOnRunFuncs))
- for _, runF := range a.startingOnRunFuncs {
- go runF(runCtx, signalCh, &wg)
- }
- wg.Wait()
- }
- func (a *BaseApp) Close() {
- }
- func (a *BaseApp) GetLogger() *zap.Logger {
- return a.logger
- }
- // GetConsumer need to check health from the loader
- func (a *BaseApp) GetConsumer() mq.Consumer {
- return a.consumerBackend
- }
- func (a *BaseApp) monitor(ctx context.Context) {
- <-ctx.Done()
- const duration = time.Second * 10
- for _, f := range a.shutdownFuncs {
- timeoutCtx, timeoutCancelF := context.WithTimeout(context.Background(), duration)
- f(timeoutCtx)
- timeoutCancelF()
- }
- }
- func (a *BaseApp) runProcessor(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
- defer wg.Done()
- err := a.loader.Run(ctx)
- if err != nil {
- a.logger.Error("Got error during processor run", zap.Error(err))
- ch <- os.Interrupt
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement