- ENV variables
- Common
- CONSUMER_BACKEND - required. Selects the consumer backend: redis or kafka. Depending on the value, the variables with the prefix REDIS_ or KAFKA_ become required.
- IS_DEBUG - optional, default: false. When set to true, debug logging is enabled.
- SERVER_PORT - required. Port the HTTP server listens on, example: "8080"
- JAEGER_URI - required. Jaeger agent URI, default: ":6831"
- ENV_TAG - optional. Environment name, default: "development"
- Redis ENV variables
- REDIS_CONSUMER_HOST - required. Address of the Redis instance, example: ":6379"
- REDIS_CONSUMER_STREAM_NAME - required. Name of the stream to read from, example: "transformed-events"
- REDIS_CONSUMER_GROUP_NAME - required. Consumer group name used on the stream, example: "creatio"
- REDIS_CONSUMER_NAME - required. Consumer name used within the group, example: "creatio-loader"
- REDIS_CONSUMER_READ_BATCH_COUNT - required. Maximum number of messages read in one Redis XRead call; any positive number.
- REDIS_CONSUMER_BLOCK_TIMEOUT_MS - required. Block timeout for a Redis read, in milliseconds.
- Kafka ENV variables
- KAFKA_CONSUMER_BROKERS - required
- KAFKA_CONSUMER_GROUP_ID - required
- KAFKA_CONSUMER_TOPIC - required
- KAFKA_CONSUMER_CONNECT_TIMEOUT_SEC - required
- KAFKA_CONSUMER_TLS_ENABLED
- KAFKA_CONSUMER_SSL_BUNDLE_FILE_PATH - required if KAFKA_CONSUMER_TLS_ENABLED is true
- KAFKA_CONSUMER_SASL_USER
- KAFKA_CONSUMER_SASL_PASSWORD
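- The conditional requirements above can be sketched as a small startup check. This is only an illustration, not the library's own validation: the helper missingVars, the grouping of names into slices, and the literal values "redis", "kafka" and "true" are assumptions; the variable names come from the lists above.

```go
package main

import (
	"fmt"
	"os"
)

// Names are taken from the lists above; how the library itself groups
// and validates them is an assumption made for this sketch.
var commonVars = []string{"CONSUMER_BACKEND", "SERVER_PORT", "JAEGER_URI"}

var redisVars = []string{
	"REDIS_CONSUMER_HOST",
	"REDIS_CONSUMER_STREAM_NAME",
	"REDIS_CONSUMER_GROUP_NAME",
	"REDIS_CONSUMER_NAME",
	"REDIS_CONSUMER_READ_BATCH_COUNT",
	"REDIS_CONSUMER_BLOCK_TIMEOUT_MS",
}

var kafkaVars = []string{
	"KAFKA_CONSUMER_BROKERS",
	"KAFKA_CONSUMER_GROUP_ID",
	"KAFKA_CONSUMER_TOPIC",
	"KAFKA_CONSUMER_CONNECT_TIMEOUT_SEC",
}

// missingVars (hypothetical helper) returns the required variables that are
// unset or empty. lookup has the same shape as os.LookupEnv so it can be faked.
func missingVars(lookup func(string) (string, bool)) ([]string, error) {
	backend, _ := lookup("CONSUMER_BACKEND")
	required := append([]string{}, commonVars...)
	switch backend {
	case "redis":
		required = append(required, redisVars...)
	case "kafka":
		required = append(required, kafkaVars...)
		// The bundle path is only required when TLS is enabled.
		if v, _ := lookup("KAFKA_CONSUMER_TLS_ENABLED"); v == "true" {
			required = append(required, "KAFKA_CONSUMER_SSL_BUNDLE_FILE_PATH")
		}
	default:
		return nil, fmt.Errorf("CONSUMER_BACKEND must be redis or kafka, got %q", backend)
	}
	var missing []string
	for _, name := range required {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing, nil
}

func main() {
	missing, err := missingVars(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("missing variables:", missing)
}
```

- Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the check easy to exercise with a plain map.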
- Basic example
- This example creates the application's main runner with a retrier and a single runner.
- Usually you want to create your own type that wraps configapp.App.
- type app struct {
-     baseApp *configapp.App
- }
- func setupApp() (*app, error) {
-     serviceName := "service_name" // snake case, as required by prometheus
-     baseApp := configapp.NewBaseApp(serviceName, appVersion)
-     err := baseApp.Init(func(cfg configapp.Config, logger *zap.Logger, consumer app.UserListStreamConsumerInterface) (loader.Runner, error) {
-         retrier := retry.NewRetrier(retry.NewBackoff(retry.DefaultMaxAttempt, retry.DefaultMaxTime, retry.DefaultMinTime))
-         // map each error to a metrics label
-         metricsLoader := metrics.NewLoader(serviceName, func(err error) string {
-             switch err {
-             case nil:
-                 return "no error"
-             default:
-                 return "bad_response"
-             }
-         })
-         // configapp.App accepts any loader.Runner: use SingleRunner, BatchRunner, or your own implementation.
-         // ownLoader is your own loader implementation (not shown here).
-         return loader.NewSingleRunner(ownLoader, consumer, retrier, metricsLoader), nil
-     })
-     if err != nil {
-         return nil, err
-     }
-     return &app{baseApp: baseApp}, nil
- }
- func (a *app) Run() {
-     a.baseApp.Run()
- }
- func (a *app) Close() {
-     a.baseApp.Close()
- }
- Running app
- Then run it in main.
- func main() {
-     app, err := setupApp()
-     ...
-     app.Run()
-     app.Close()
- }
- Add on start and on shutdown
- To run a function for the lifetime of the app, register it with the AddStartingOnRunF method.
- app.AddStartingOnRunF(func(ctx context.Context, ch chan<- os.Signal, wg *sync.WaitGroup) {
-     defer wg.Done() // always call wg.Done
-     for {
-         // if a fatal error occurs, signal the app to stop
-         if err != nil {
-             ch <- syscall.SIGINT
-             return
-         }
-     }
- })
- To stop something on shutdown, register a function with AddShutDownF. The context passed to it carries a timeout, so check ctx.Done.
- app.AddShutDownF(func(ctx context.Context) {
-     <-ctx.Done()
-     // stop something gracefully
- })
- loader
- The loader package provides wrappers around loaders that add retrying and metrics.
- It contains the Single and Batch runners, both implementations of the Runner interface.
- The Single runner invokes a callback that accepts one incoming event.
- The Batch runner invokes a callback that accepts a slice of incoming events,
- whose size can be set with the REDIS_CONSUMER_READ_BATCH_COUNT env variable.
- Note: the current Redis streams message queue implementation always passes only one event in the slice.
- To create a Single or Batch runner, implement Processor or BatchProcessor, respectively.
- The skipped return parameter means two things: for the OnErr methods, the runner does not count the event as an error,
- and for the BeforeProcess method, the runner increments the skipped event counter in metrics.
- To skip an event inside the Process method, return metrics.ErrSkipEvent.
- In the OnErr method you can also use the context to get access to the span and set additional info:
- span := trace.FromContext(ctx)
- if errors.Cause(err) == ErrEntityExists {
-     span.SetStatus(trace.Status{
-         Code:    int32(code.Code_ALREADY_EXISTS),
-         Message: err.Error(),
-     })
-     return true
- }
- span.SetStatus(trace.Status{
-     Code:    int32(code.Code_UNKNOWN),
-     Message: err.Error(),
- })
- retrier
- The retry package provides the ability to retry functions.
- By default, it passes errors through without retrying, but you can specify
- the errors on which the retrier retries execution.
- So if your Process function returns an error that was specified as a retry error in the Retrier,
- the call is retried according to the backoff setup.