Guest User

test-serf.go

a guest
Apr 26th, 2018
42
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.52 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     "fmt"
  5.     "log"
  6.     "os"
  7.     "sync"
  8.  
  9.     "github.com/hashicorp/memberlist"
  10.     "github.com/hashicorp/serf/serf"
  11. )
  12.  
  13. const (
  14.     UDPBufferSize = 2000
  15.     baseAddress   = "127.0.0.1"
  16. )
  17.  
  18. var (
  19.     nodeAddress string
  20.     NODE_ID     = os.Getenv("NODE_ID")
  21.     knownNodes  = []string{fmt.Sprintf("%s:%s", baseAddress, NODE_ID)}
  22.     broadcasts  *memberlist.TransmitLimitedQueue
  23. )
  24.  
  25. type broadcast struct {
  26.     msg    []byte
  27.     notify chan<- struct{}
  28. }
  29.  
  30. type delegate struct{}
  31.  
  32. type update struct {
  33.     Action string // add, del
  34.     Data   map[string]string
  35. }
  36.  
  37. func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
  38.     return false
  39. }
  40.  
  41. func (b *broadcast) Message() []byte {
  42.     return b.msg
  43. }
  44.  
  45. func (b *broadcast) Finished() {
  46.     if b.notify != nil {
  47.         close(b.notify)
  48.     }
  49. }
  50.  
  51. func (d *delegate) NodeMeta(limit int) []byte {
  52.     return []byte{}
  53. }
  54.  
  55. func (d *delegate) NotifyMsg(b []byte) {
  56.     if len(b) == 0 {
  57.         return
  58.     }
  59.     fmt.Println("Received message")
  60. }
  61.  
  62. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
  63.     return broadcasts.GetBroadcasts(overhead, limit)
  64. }
  65.  
  66. func (d *delegate) LocalState(join bool) []byte {
  67.     fmt.Println("push pulllll")
  68.     return []byte{}
  69. }
  70.  
  71. func (d *delegate) MergeRemoteState(buf []byte, join bool) {
  72. }
  73.  
  74. func configServer() error {
  75.     hostname, _ := os.Hostname()
  76.     memberlistConfig := memberlist.DefaultLocalConfig()
  77.     memberlistConfig.Name = nodeAddress
  78.     memberlistConfig.BindAddr = baseAddress
  79.     if _, err := fmt.Sscanf(NODE_ID, "%d", &memberlistConfig.BindPort); err != nil {
  80.         return err
  81.     }
  82.     memberlistConfig.PushPullInterval = 0
  83.     memberlistConfig.UDPBufferSize = UDPBufferSize
  84.     memberlistConfig.Delegate = &delegate{}
  85.     memberlistConfig.Name = nodeAddress
  86.  
  87.     serfConfig := serf.DefaultConfig()
  88.     serfConfig.NodeName = nodeAddress
  89.     // serfConfig.EventCh = serfEvents
  90.     serfConfig.MemberlistConfig = memberlistConfig
  91.  
  92.     m, err := serf.Create(serfConfig)
  93.  
  94.     if err != nil {
  95.         return err
  96.     }
  97.     if nodeAddress != knownNodes[0] {
  98.         _, err := m.Join(knownNodes, false)
  99.         if err != nil {
  100.             return err
  101.         }
  102.     }
  103.     broadcasts = &memberlist.TransmitLimitedQueue{
  104.         NumNodes: func() int {
  105.             return m.NumNodes()
  106.         },
  107.         RetransmitMult: 1,
  108.     }
  109.     node := m.LocalMember()
  110.     fmt.Printf("Local member %s:%d\n", node.Addr, node.Port)
  111.     return nil
  112. }
  113.  
  114. func main() {
  115.     var err error
  116.     var wg sync.WaitGroup
  117.  
  118.     nodeAddress = fmt.Sprintf("%s:%s", baseAddress, NODE_ID)
  119.  
  120.     wg.Add(1)
  121.     go func() {
  122.         err = configServer()
  123.         if err != nil {
  124.             log.Panic(err)
  125.         }
  126.         wg.Done()
  127.     }()
  128.     wg.Wait()
  129.  
  130. }
Advertisement
Add Comment
Please, Sign In to add comment