Advertisement
Guest User

SearchAttributesWorkflow

a guest
Oct 17th, 2019
156
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 6.07 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "bytes"
  5.     "context"
  6.     "errors"
  7.     "fmt"
  8.     "go.uber.org/cadence/activity"
  9.     "time"
  10.  
  11.     "github.com/uber-common/cadence-samples/cmd/samples/common"
  12.     "go.uber.org/cadence/.gen/go/shared"
  13.     "go.uber.org/cadence/client"
  14.     "go.uber.org/cadence/workflow"
  15.     "go.uber.org/zap"
  16. )
  17.  
  18. /**
  19.  * This sample shows how to use search attributes. (Note this feature only work with ElasticSearch)
  20.  */
  21.  
  22. // ApplicationName is the task list for this sample
  23. const ApplicationName = "searchAttributesGroup"
  24.  
  25. // ClientKey is the key for lookup
  26. type ClientKey int
  27.  
  28. const (
  29.     // DomainName used for this sample
  30.     DomainName = "samples-domain"
  31.     // CadenceClientKey for retrieving cadence client from context
  32.     CadenceClientKey ClientKey = iota
  33. )
  34.  
  35. var (
  36.     // ErrCadenceClientNotFound when cadence client is not found on context
  37.     ErrCadenceClientNotFound = errors.New("failed to retrieve cadence client from context")
  38. )
  39.  
  40. // This is registration process where you register all your workflows
  41. // and activity function handlers.
  42. func init() {
  43.     workflow.Register(SearchAttributesWorkflow)
  44.     activity.Register(listExecutions)
  45.     activity.Register(TestActivity)
  46. }
  47.  
  48.  
  49. func TestActivity(ctx context.Context) (string, error) {
  50.     logger := activity.GetLogger(ctx)
  51.     logger.Info("Running test activity")
  52.     time.Sleep(time.Second * 1)
  53.     return "ok", nil
  54. }
  55.  
  56. // SearchAttributesWorkflow workflow decider
  57. func SearchAttributesWorkflow(ctx workflow.Context) error {
  58.     logger := workflow.GetLogger(ctx)
  59.     logger.Info("SearchAttributes workflow started")
  60.  
  61.     // get search attributes that provided when start workflow
  62.     //info := workflow.GetInfo(ctx)
  63.     //val := info.SearchAttributes.IndexedFields["CustomIntField"]
  64.     //var currentIntValue int
  65.     //err := client.NewValue(val).Get(&currentIntValue)
  66.     //if err != nil {
  67.     //  logger.Error("Get search attribute failed", zap.Error(err))
  68.     //  return err
  69.     //}
  70.     //logger.Info("Current Search Attributes: ", zap.String("CustomIntField", strconv.Itoa(currentIntValue)))
  71.  
  72.     // upsert search attributes
  73.     attributes := map[string]interface{}{
  74.         "CustomIntField":      2, // update CustomIntField from 1 to 2, then insert other fields
  75.         "CustomKeywordField":  "Update1",
  76.         "CustomBoolField":     true,
  77.         "CustomDoubleField":   3.14,
  78.         "CustomDatetimeField": time.Date(2019, 1, 1, 0, 0, 0, 0, time.Local),
  79.         "CustomStringField":   "String field is for text. When query, it will be tokenized for partial match. StringTypeField cannot be used in Order By",
  80.     }
  81.     workflow.UpsertSearchAttributes(ctx, attributes)
  82.  
  83.     // print current search attributes
  84.     info := workflow.GetInfo(ctx)
  85.     err := printSearchAttributes(info.SearchAttributes, logger)
  86.     if err != nil {
  87.         return err
  88.     }
  89.  
  90.     ao := workflow.ActivityOptions{
  91.         ScheduleToStartTimeout: 2 * time.Minute,
  92.         StartToCloseTimeout:    2 * time.Minute,
  93.         HeartbeatTimeout:       time.Second * 20,
  94.     }
  95.     ctx = workflow.WithActivityOptions(ctx, ao)
  96.  
  97.     var result interface{}
  98.     err = workflow.ExecuteActivity(ctx, TestActivity).Get(ctx, &result)
  99.     if err != nil {
  100.         return err
  101.     }
  102.  
  103.     // update search attributes again
  104.     attributes = map[string]interface{}{
  105.         "CustomKeywordField": "Update2",
  106.         "CustomBoolField":     true,
  107.     }
  108.     workflow.UpsertSearchAttributes(ctx, attributes)
  109.  
  110.     // print current search attributes
  111.     info = workflow.GetInfo(ctx)
  112.     err = printSearchAttributes(info.SearchAttributes, logger)
  113.     if err != nil {
  114.         return err
  115.     }
  116.  
  117.     // update search attributes again
  118.     attributes = map[string]interface{}{
  119.         "CustomKeywordField": "Update2",
  120.     }
  121.     workflow.UpsertSearchAttributes(ctx, attributes)
  122.  
  123.     // print current search attributes
  124.     info = workflow.GetInfo(ctx)
  125.     err = printSearchAttributes(info.SearchAttributes, logger)
  126.     if err != nil {
  127.         return err
  128.     }
  129.  
  130.     workflow.Sleep(ctx, 2*time.Second) // wait update reflected on ElasticSearch
  131.  
  132.     // list workflow
  133.     query := "CustomIntField=2 and CustomKeywordField='Update2' order by CustomDatetimeField DESC"
  134.     var listResults []*shared.WorkflowExecutionInfo
  135.     err = workflow.ExecuteActivity(ctx, listExecutions, query).Get(ctx, &listResults)
  136.     if err != nil {
  137.         logger.Error("Failed to list workflow executions.", zap.Error(err))
  138.         return err
  139.     }
  140.  
  141.     logger.Info("Workflow completed.", zap.String("Execution", listResults[0].String()))
  142.  
  143.     return nil
  144. }
  145.  
  146. func printSearchAttributes(searchAttributes *shared.SearchAttributes, logger *zap.Logger) error {
  147.     buf := new(bytes.Buffer)
  148.     for k, v := range searchAttributes.IndexedFields {
  149.         var currentVal interface{}
  150.         err := client.NewValue(v).Get(&currentVal)
  151.         if err != nil {
  152.             logger.Error(fmt.Sprintf("Get search attribute for key %s failed", k), zap.Error(err))
  153.             return err
  154.         }
  155.         fmt.Fprintf(buf, "%s=%v\n", k, currentVal)
  156.     }
  157.     logger.Info(fmt.Sprintf("Current Search Attributes: \n%s", buf.String()))
  158.     return nil
  159. }
  160.  
  161. func listExecutions(ctx context.Context, query string) ([]*shared.WorkflowExecutionInfo, error) {
  162.     logger := activity.GetLogger(ctx)
  163.     logger.Info("List executions.", zap.String("Query", query))
  164.  
  165.     cadenceClient, err := getCadenceClientFromContext(ctx)
  166.     if err != nil {
  167.         logger.Error("Error when get cadence client")
  168.         return nil, err
  169.     }
  170.  
  171.     var executions []*shared.WorkflowExecutionInfo
  172.     var nextPageToken []byte
  173.     for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
  174.         resp, err := cadenceClient.ListWorkflow(ctx, &shared.ListWorkflowExecutionsRequest{
  175.             Domain:        common.StringPtr(DomainName),
  176.             PageSize:      common.Int32Ptr(10),
  177.             NextPageToken: nextPageToken,
  178.             Query:         common.StringPtr(query),
  179.         })
  180.         if err != nil {
  181.             return nil, err
  182.         }
  183.  
  184.         for _, r := range resp.Executions {
  185.             executions = append(executions, r)
  186.         }
  187.  
  188.         nextPageToken = resp.NextPageToken
  189.         activity.RecordHeartbeat(ctx, nextPageToken)
  190.     }
  191.  
  192.     return executions, nil
  193. }
  194.  
  195. func getCadenceClientFromContext(ctx context.Context) (client.Client, error) {
  196.     logger := activity.GetLogger(ctx)
  197.     cadenceClient := ctx.Value(CadenceClientKey).(client.Client)
  198.     if cadenceClient == nil {
  199.         logger.Error("Could not retrieve cadence client from context.")
  200.         return nil, ErrCadenceClientNotFound
  201.     }
  202.  
  203.     return cadenceClient, nil
  204. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement