illfate

Untitled

Jan 24th, 2022
ENV variables

Common

CONSUMER_BACKEND - required flag, selects redis or kafka as the backend for the consumer. Depending on the value, the variables with the prefix REDIS_ or KAFKA_ become required.

IS_DEBUG - not required flag, default: false. If true is specified, debug logging is enabled.

SERVER_PORT - required flag, the port the HTTP server listens on, example: "8080"

JAEGER_URI - required flag, the Jaeger agent URI, default: ":6831"

ENV_TAG - not required flag, the environment name, default: "development"
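
As a rough illustration of how these common variables could be read, here is a minimal sketch using only the standard library. The commonConfig struct, the loadCommon function, and the exact accepted values "redis" and "kafka" are assumptions made for this example and are not taken from the library; the sketch also falls back to the documented default when JAEGER_URI is empty.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// commonConfig mirrors the common variables listed above.
// The struct and its field names are illustrative, not the library's types.
type commonConfig struct {
	ConsumerBackend string
	IsDebug         bool
	ServerPort      string
	JaegerURI       string
	EnvTag          string
}

// lookupFunc has the same shape as os.LookupEnv, so a map can stand in for it.
type lookupFunc func(key string) (string, bool)

// loadCommon reads the common variables and applies the documented defaults.
func loadCommon(lookup lookupFunc) (commonConfig, error) {
	cfg := commonConfig{JaegerURI: ":6831", EnvTag: "development"}

	// Assumption: the accepted values are exactly "redis" and "kafka".
	backend, ok := lookup("CONSUMER_BACKEND")
	if !ok || (backend != "redis" && backend != "kafka") {
		return cfg, fmt.Errorf("CONSUMER_BACKEND must be redis or kafka, got %q", backend)
	}
	cfg.ConsumerBackend = backend

	port, ok := lookup("SERVER_PORT")
	if !ok || port == "" {
		return cfg, fmt.Errorf("SERVER_PORT is required")
	}
	cfg.ServerPort = port

	if v, ok := lookup("IS_DEBUG"); ok {
		debug, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("IS_DEBUG: %w", err)
		}
		cfg.IsDebug = debug
	}
	if v, ok := lookup("JAEGER_URI"); ok && v != "" {
		cfg.JaegerURI = v
	}
	if v, ok := lookup("ENV_TAG"); ok && v != "" {
		cfg.EnvTag = v
	}
	return cfg, nil
}

func main() {
	cfg, err := loadCommon(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function as a parameter keeps the parsing logic testable without touching the real process environment.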


Redis ENV variables

REDIS_CONSUMER_HOST - required flag, the address of the redis instance to connect to, example: ":6379"

REDIS_CONSUMER_STREAM_NAME - required flag, the stream to read from, example: "transformed-events"

REDIS_CONSUMER_GROUP_NAME - required flag, the consumer group used when reading the redis stream, example: "creatio"

REDIS_CONSUMER_NAME - required flag, the consumer name used when reading the redis stream, example: "creatio-loader"

REDIS_CONSUMER_READ_BATCH_COUNT - required flag, the max number of messages read during one redis XRead, any positive number.

REDIS_CONSUMER_BLOCK_TIMEOUT_MS - required flag, how long, in milliseconds, a redis read blocks while waiting for new messages.


Kafka ENV variables

KAFKA_CONSUMER_BROKERS - required flag

KAFKA_CONSUMER_GROUP_ID - required flag

KAFKA_CONSUMER_TOPIC - required flag

KAFKA_CONSUMER_CONNECT_TIMEOUT_SEC - required flag

KAFKA_CONSUMER_TLS_ENABLED

KAFKA_CONSUMER_SSL_BUNDLE_FILE_PATH - required flag if KAFKA_CONSUMER_TLS_ENABLED is true

KAFKA_CONSUMER_SASL_USER

KAFKA_CONSUMER_SASL_PASSWORD
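
The rule that the REDIS_ or KAFKA_ group becomes required depending on CONSUMER_BACKEND can be sketched as a small check that reports which variables are missing. The function missingVars and the table requiredByBackend are hypothetical names for this example; treating the backend values as "redis" and "kafka" and treating TLS as enabled only for the literal value "true" are also assumptions.

```go
package main

import (
	"fmt"
	"os"
	"sort"
)

// requiredByBackend lists the variables the text above marks as required.
var requiredByBackend = map[string][]string{
	"redis": {
		"REDIS_CONSUMER_HOST",
		"REDIS_CONSUMER_STREAM_NAME",
		"REDIS_CONSUMER_GROUP_NAME",
		"REDIS_CONSUMER_NAME",
		"REDIS_CONSUMER_READ_BATCH_COUNT",
		"REDIS_CONSUMER_BLOCK_TIMEOUT_MS",
	},
	"kafka": {
		"KAFKA_CONSUMER_BROKERS",
		"KAFKA_CONSUMER_GROUP_ID",
		"KAFKA_CONSUMER_TOPIC",
		"KAFKA_CONSUMER_CONNECT_TIMEOUT_SEC",
	},
}

// missingVars returns the required variables that are unset or empty
// for the backend selected by CONSUMER_BACKEND, sorted by name.
func missingVars(lookup func(string) (string, bool)) ([]string, error) {
	backend, _ := lookup("CONSUMER_BACKEND")
	base, ok := requiredByBackend[backend]
	if !ok {
		return nil, fmt.Errorf("unknown CONSUMER_BACKEND %q", backend)
	}
	// Copy so the shared table is never modified by append.
	required := append([]string(nil), base...)

	// Assumption: TLS counts as enabled only for the literal value "true".
	if backend == "kafka" {
		if tls, _ := lookup("KAFKA_CONSUMER_TLS_ENABLED"); tls == "true" {
			required = append(required, "KAFKA_CONSUMER_SSL_BUNDLE_FILE_PATH")
		}
	}

	var missing []string
	for _, name := range required {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	sort.Strings(missing)
	return missing, nil
}

func main() {
	missing, err := missingVars(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("missing variables:", missing)
}
```

Reporting every missing variable at once, rather than failing on the first one, tends to shorten the fix-and-restart loop when deploying a new service.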


Basic example
This is an example of creating the application's main runner with a retrier and a single runner.
Usually you just want to create your own type that wraps configapp.App.

    type app struct {
        baseApp *configapp.App
    }

    func setupApp() (*app, error) {
        serviceName := "service_name" // snake case, as required by prometheus
        // appVersion and ownLoader are expected to be defined elsewhere in your service.
        baseApp := configapp.NewBaseApp(serviceName, appVersion)
        err := baseApp.Init(func(cfg configapp.Config, logger *zap.Logger, consumer app.UserListStreamConsumerInterface) (loader.Runner, error) {
            retrier := retry.NewRetrier(retry.NewBackoff(retry.DefaultMaxAttempt, retry.DefaultMaxTime, retry.DefaultMinTime))

            // The callback maps an error to the label value reported in metrics.
            metricsLoader := metrics.NewLoader(serviceName, func(err error) string {
                switch err {
                case nil:
                    return "no error"
                default:
                    return "bad_response"
                }
            })

            // configapp.App uses the loader.Runner interface: you can use SingleRunner, BatchRunner, or implement your own runner.
            return loader.NewSingleRunner(ownLoader, consumer, retrier, metricsLoader), nil
        })

        return &app{
            baseApp: baseApp,
        }, err
    }

    func (a *app) Run() {
        a.baseApp.Run()
    }

    func (a *app) Close() {
        a.baseApp.Close()
    }


Running app
Then run this in main.

    func main() {
        app, err := setupApp()
        ...
        app.Run()
        app.Close()
    }


Add on start and on shutdown
If you want to add a function that lives as long as the app, use the AddStartingOnRunF method.

    app.AddStartingOnRunF(func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
        defer wg.Done() // always call wg.Done before returning
        for {
            // if a fatal error occurs, you can signal the app to stop
            if err != nil {
                ch <- syscall.SIGINT
            }
        }
    })
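
The loop above is only an outline. A fuller sketch of such a function, which also returns when the context is cancelled, could look like the following. The startFunc type copies the callback signature shown above; the helper watch, the polling interval, and the assumption that the caller does wg.Add(1) before starting the function are illustrative and not confirmed by the library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"syscall"
	"time"
)

// startFunc matches the callback signature shown in the text above.
type startFunc func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup)

// watch builds a callback that polls check until ctx is cancelled.
// On the first error it asks the app to stop by sending SIGINT, then returns.
func watch(interval time.Duration, check func() error) startFunc {
	return func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := check(); err != nil {
					// Do not block forever if nobody is receiving.
					select {
					case ch <- syscall.SIGINT:
					case <-ctx.Done():
					}
					return
				}
			}
		}
	}
}

// demo plays the role of the app: it starts the callback and waits for the signal.
func demo() os.Signal {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stop := make(chan os.Signal, 1)
	var wg sync.WaitGroup
	wg.Add(1) // assumption: the caller accounts for the goroutine
	go watch(10*time.Millisecond, func() error { return errors.New("fatal") })(ctx, stop, &wg)
	sig := <-stop
	wg.Wait()
	return sig
}

func main() {
	fmt.Println("received:", demo())
}
```

Using a ticker with a select on ctx.Done() avoids the busy loop and lets the function exit promptly during shutdown.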


If you need to stop some execution on shutdown, use AddShutDownF. Watch ctx.Done(), because the context passed to the function carries a timeout.

    app.AddShutDownF(func(ctx context.Context) {
        <-ctx.Done()
        // stop smth gracefully
    })
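
One common way to respect that timeout is to wait for in-flight work to finish, but give up once the context expires. The sketch below shows the pattern with the standard library only; drain and runDemo are hypothetical helpers, and how the library builds the timeout context is not shown in the source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drain waits for done to be closed, but gives up when ctx expires.
// It reports whether the work finished before the deadline.
func drain(ctx context.Context, done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

// runDemo simulates work that takes workMs milliseconds and a shutdown
// context that times out after timeoutMs milliseconds.
func runDemo(workMs, timeoutMs int) bool {
	done := make(chan struct{})
	go func() {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		close(done)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return drain(ctx, done)
}

func main() {
	fmt.Println("fast work drained:", runDemo(1, 1000))
	fmt.Println("slow work drained:", runDemo(1000, 10))
}
```

Inside a shutdown function, the same select would use the ctx argument it receives instead of creating its own context.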



loader
The loader package provides wrappers around loaders that add retrying and metrics.
It contains the Single and Batch runners, which implement the Runner interface.
The Single runner provides a callback function that accepts only one incoming event.
The Batch runner provides a callback function that accepts a slice of incoming events;
the batch size can be set using the REDIS_CONSUMER_READ_BATCH_COUNT env variable.
Note: the current implementation of the redis streams message queue always passes only one event in the slice.
To create a Single or Batch runner you should implement Processor or BatchProcessor.
The skipped return parameter means, for the OnErr methods, that runners won't count this as an error,
and, for the BeforeProcess method, that the skipped event counter in metrics will be incremented.
If you want to skip an event during the Process method, you can return metrics.ErrSkipEvent.
In the OnErr method you can also use the context to get access to the span and set additional info.

    span := trace.FromContext(ctx)
    if errors.Cause(err) == ErrEntityExists {
        span.SetStatus(trace.Status{
            Code:    int32(code.Code_ALREADY_EXISTS),
            Message: err.Error(),
        })
        return true // skipped: not counted as an error
    }
    span.SetStatus(trace.Status{
        Code:    int32(code.Code_UNKNOWN),
        Message: err.Error(),
    })
    return false
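
To make the skip semantics more concrete, here is a small standalone model of how a runner might classify results. Everything in it is hypothetical: errSkipEvent and errEntityExists stand in for metrics.ErrSkipEvent and ErrEntityExists, and the counters type with its ignored field is invented for this sketch, so it should not be read as the library's actual bookkeeping.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for metrics.ErrSkipEvent and ErrEntityExists from the text above.
var (
	errSkipEvent    = errors.New("skip event")
	errEntityExists = errors.New("entity exists")
	errBoom         = errors.New("boom")
)

// counters is a hypothetical model of the metrics a runner keeps.
type counters struct {
	processed int
	skipped   int
	ignored   int // errors that OnErr marked as skipped
	failed    int
}

// onErr reports whether an error should not be counted as a failure.
func onErr(err error) (skipped bool) {
	return errors.Is(err, errEntityExists)
}

// record updates the counters for the result of one Process call.
func (c *counters) record(err error) {
	switch {
	case err == nil:
		c.processed++
	case errors.Is(err, errSkipEvent):
		c.skipped++
	case onErr(err):
		c.ignored++
	default:
		c.failed++
	}
}

func main() {
	var c counters
	for _, err := range []error{nil, errSkipEvent, errEntityExists, errBoom} {
		c.record(err)
	}
	fmt.Printf("%+v\n", c)
}
```

The sketch uses errors.Is from the standard library, while the fragment above uses errors.Cause from a third-party errors package; both aim to match an underlying sentinel error.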




retrier
The retry package provides the ability to retry functions.
By default, it just passes errors forward and doesn't retry, but you can specify the errors
on which the retrier retries execution.
So if your Process function returns an error that was specified as a retry error in the Retrier,
the call is retried according to the backoff setup.
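
The behaviour described above, retrying only on listed errors and following a backoff, can be sketched without the library as follows. The retrier type here, its fields, and the doubling delay are assumptions made for illustration; they are not the retry package's actual types or its backoff formula.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// retrier retries a function only for the errors listed in retryOn.
type retrier struct {
	maxAttempts int
	minDelay    time.Duration
	maxDelay    time.Duration
	retryOn     []error
}

// shouldRetry reports whether err matches one of the configured retry errors.
func (r retrier) shouldRetry(err error) bool {
	for _, target := range r.retryOn {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

// do calls fn until it succeeds, returns a non-retryable error, or
// maxAttempts is reached. The delay doubles after each failed attempt
// and is capped at maxDelay.
func (r retrier) do(fn func() error) (attempts int, err error) {
	delay := r.minDelay
	for attempts = 1; ; attempts++ {
		err = fn()
		if err == nil || !r.shouldRetry(err) || attempts >= r.maxAttempts {
			return attempts, err
		}
		time.Sleep(delay)
		delay *= 2
		if delay > r.maxDelay {
			delay = r.maxDelay
		}
	}
}

// newDemoRetrier returns a retrier with small delays suitable for a demo.
func newDemoRetrier() retrier {
	return retrier{
		maxAttempts: 3,
		minDelay:    time.Millisecond,
		maxDelay:    4 * time.Millisecond,
		retryOn:     []error{errTemporary},
	}
}

func main() {
	r := newDemoRetrier()
	calls := 0
	attempts, err := r.do(func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```

With an empty retryOn list the sketch never retries, which matches the default behaviour described above of passing errors forward.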