Advertisement
Guest User

Untitled

a guest
May 23rd, 2019
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.73 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "context"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "os"
  9. "strings"
  10. "time"
  11.  
  12. "cloud.google.com/go/spanner"
  13. "go.opencensus.io/trace"
  14. "google.golang.org/api/option"
  15. gtransport "google.golang.org/api/transport/grpc"
  16. sppb "google.golang.org/genproto/googleapis/spanner/v1"
  17. )
  18.  
  19. type customExporter struct{}
  20.  
  21. // Compile time assertion that the exporter implements trace.Exporter
  22. var _ trace.Exporter = (*customExporter)(nil)
  23.  
  24. func (cse *customExporter) ExportSpan(sd *trace.SpanData) {
  25. // only print logs for session wait
  26. for _, annotation := range sd.Annotations {
  27. if strings.HasPrefix(annotation.Message, "Waiting for") {
  28. log.Println(annotation.Message)
  29. }
  30. }
  31. }
  32.  
  33. func main() {
  34. var project string
  35. var instance string
  36. var database string
  37.  
  38. flag.StringVar(&project, "project", "", "")
  39. flag.StringVar(&instance, "instance", "", "")
  40. flag.StringVar(&database, "database", "", "")
  41. flag.Parse()
  42.  
  43. if project == "" || instance == "" || database == "" {
  44. flag.Usage()
  45. os.Exit(1)
  46. }
  47.  
  48. trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
  49. trace.RegisterExporter(new(customExporter))
  50. ctx := context.Background()
  51.  
  52. sessionLabelKey := "client_ts"
  53. sessionLabelValue := fmt.Sprintf("%d", time.Now().Unix())
  54. sessionFilter := fmt.Sprintf("labels.%s:%s\n", sessionLabelKey, sessionLabelValue)
  55.  
  56. dbPath := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, database)
  57. client, err := spanner.NewClientWithConfig(ctx, dbPath, spanner.ClientConfig{
  58. SessionPoolConfig: spanner.SessionPoolConfig{
  59. MinOpened: 5,
  60. MaxOpened: 30,
  61. MaxIdle: 10,
  62. MaxBurst: 30,
  63. },
  64. SessionLabels: map[string]string{
  65. sessionLabelKey: sessionLabelValue,
  66. },
  67. })
  68. if err != nil {
  69. log.Fatal(err)
  70. }
  71. defer client.Close()
  72.  
  73. // do transactions after 10 sec
  74. time.AfterFunc(time.Second*10, func() {
  75. log.Printf("Run 100 transactions in parallel\n")
  76. for i := 0; i < 100; i++ {
  77. go func() {
  78. client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
  79. time.Sleep(time.Second * 1)
  80. return nil
  81. })
  82. }()
  83. }
  84. })
  85.  
  86. // check sessions periodically
  87. for {
  88. count, err := getCurrentSessionCount(ctx, dbPath, sessionFilter)
  89. if err != nil {
  90. log.Fatal(err)
  91. }
  92. log.Printf("Number of sessions: %d\n", count)
  93. }
  94. }
  95.  
  96. func getCurrentSessionCount(ctx context.Context, dbPath string, filter string) (int, error) {
  97. allOpts := []option.ClientOption{
  98. option.WithEndpoint("spanner.googleapis.com:443"),
  99. }
  100. conn, err := gtransport.Dial(ctx, allOpts...)
  101. if err != nil {
  102. return 0, err
  103. }
  104. rpcClient := sppb.NewSpannerClient(conn)
  105.  
  106. resp, err := rpcClient.ListSessions(ctx, &sppb.ListSessionsRequest{
  107. Database: dbPath,
  108. Filter: filter,
  109. })
  110. if err != nil {
  111. return 0, err
  112. }
  113.  
  114. return len(resp.Sessions), nil
  115. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement