Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "context"
- "flag"
- "fmt"
- "log"
- "os"
- "strings"
- "time"
- "cloud.google.com/go/spanner"
- "go.opencensus.io/trace"
- "google.golang.org/api/option"
- gtransport "google.golang.org/api/transport/grpc"
- sppb "google.golang.org/genproto/googleapis/spanner/v1"
- )
- type customExporter struct{}
- // Compile time assertion that the exporter implements trace.Exporter
- var _ trace.Exporter = (*customExporter)(nil)
- func (cse *customExporter) ExportSpan(sd *trace.SpanData) {
- // only print logs for session wait
- for _, annotation := range sd.Annotations {
- if strings.HasPrefix(annotation.Message, "Waiting for") {
- log.Println(annotation.Message)
- }
- }
- }
- func main() {
- var project string
- var instance string
- var database string
- flag.StringVar(&project, "project", "", "")
- flag.StringVar(&instance, "instance", "", "")
- flag.StringVar(&database, "database", "", "")
- flag.Parse()
- if project == "" || instance == "" || database == "" {
- flag.Usage()
- os.Exit(1)
- }
- trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
- trace.RegisterExporter(new(customExporter))
- ctx := context.Background()
- sessionLabelKey := "client_ts"
- sessionLabelValue := fmt.Sprintf("%d", time.Now().Unix())
- sessionFilter := fmt.Sprintf("labels.%s:%s\n", sessionLabelKey, sessionLabelValue)
- dbPath := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, database)
- client, err := spanner.NewClientWithConfig(ctx, dbPath, spanner.ClientConfig{
- SessionPoolConfig: spanner.SessionPoolConfig{
- MinOpened: 5,
- MaxOpened: 30,
- MaxIdle: 10,
- MaxBurst: 30,
- },
- SessionLabels: map[string]string{
- sessionLabelKey: sessionLabelValue,
- },
- })
- if err != nil {
- log.Fatal(err)
- }
- defer client.Close()
- // do transactions after 10 sec
- time.AfterFunc(time.Second*10, func() {
- log.Printf("Run 100 transactions in parallel\n")
- for i := 0; i < 100; i++ {
- go func() {
- client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
- time.Sleep(time.Second * 1)
- return nil
- })
- }()
- }
- })
- // check sessions periodically
- for {
- count, err := getCurrentSessionCount(ctx, dbPath, sessionFilter)
- if err != nil {
- log.Fatal(err)
- }
- log.Printf("Number of sessions: %d\n", count)
- }
- }
- func getCurrentSessionCount(ctx context.Context, dbPath string, filter string) (int, error) {
- allOpts := []option.ClientOption{
- option.WithEndpoint("spanner.googleapis.com:443"),
- }
- conn, err := gtransport.Dial(ctx, allOpts...)
- if err != nil {
- return 0, err
- }
- rpcClient := sppb.NewSpannerClient(conn)
- resp, err := rpcClient.ListSessions(ctx, &sppb.ListSessionsRequest{
- Database: dbPath,
- Filter: filter,
- })
- if err != nil {
- return 0, err
- }
- return len(resp.Sessions), nil
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement