Advertisement
Guest User

Untitled

a guest
Mar 17th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.40 KB | None | 0 0
  1. package main
  2.  
  3. type Message interface{}
  4.  
  5. type Syncer interface {
  6.     Sync() error
  7. }
  8.  
  9. type EntityId interface{}
  10.  
  11. type Entity interface {
  12.     Id() EntityId
  13.     Consume(Message) (Syncer, bool)
  14. }
  15.  
  16. type Downstream chan Message
  17.  
  18. type StopChan chan struct{}
  19.  
  20. type EntityFactory func(message Message) Entity
  21.  
  22. type EntityRunner struct {
  23.     entity     Entity
  24.     downstream Downstream
  25. }
  26.  
  27. type Core struct {
  28.     sources  map[Downstream]StopChan
  29.     mux      Downstream
  30.     entities map[EntityId]EntityRunner
  31.  
  32.     upstreamHandlerCount int
  33.  
  34.     stopChan StopChan
  35. }
  36.  
  37. func NewCore(upstreamCount int) *Core {
  38.     return &Core{
  39.         sources:              make(map[Downstream]StopChan),
  40.         mux:                  make(Downstream),
  41.         entities:             make(map[EntityId]EntityRunner),
  42.         upstreamHandlerCount: upstreamCount,
  43.         stopChan:             make(StopChan),
  44.     }
  45. }
  46.  
  47. func (c *Core) WithSources(srcList ...Downstream) *Core {
  48.     for _, src := range srcList {
  49.         stopChan := make(StopChan)
  50.         c.sources[src] = stopChan
  51.         go func(st StopChan, source Downstream) {
  52.             for {
  53.                 select {
  54.                 case msg := <-source:
  55.                     c.mux <- msg
  56.                 case <-st:
  57.                     return
  58.                 }
  59.             }
  60.         }(stopChan, src)
  61.     }
  62.     return c
  63. }
  64.  
  65. func (c *Core) RemoveSource(src Downstream) *Core {
  66.     if s, ok := c.sources[src]; ok {
  67.         close(s)
  68.         delete(c.sources, src)
  69.     }
  70.     return c
  71. }
  72.  
  73. func (core *Core) Stop() {
  74.     close(core.stopChan)
  75. }
  76. func (core *Core) Run(efactory EntityFactory) {
  77.     go func() {
  78.         upstream := make(chan Syncer)
  79.  
  80.         for i := 0; i < core.upstreamHandlerCount; i++ {
  81.             go func() {
  82.                 for {
  83.                     select {
  84.                     case upstreamMsg := <-upstream:
  85.                         upstreamMsg.Sync()
  86.                     case <-core.stopChan:
  87.                         return
  88.                     }
  89.                 }
  90.             }()
  91.         }
  92.  
  93.         for {
  94.             select {
  95.             case msg := <-core.mux:
  96.                 entity := efactory(msg)
  97.                 id := entity.Id()
  98.                 if entityRunner, ok := core.entities[id]; ok {
  99.                     entityRunner.downstream <- msg
  100.                 } else {
  101.                     entityDownstream := make(Downstream)
  102.                     core.entities[id] = EntityRunner{entity, entityDownstream}
  103.                     go func() {
  104.                         for {
  105.                             select {
  106.                             case entityMsg := <-entityDownstream:
  107.                                 if res, ok := entity.Consume(entityMsg); ok {
  108.                                     upstream <- res
  109.                                 }
  110.                             case <-core.stopChan:
  111.                                 return
  112.                             }
  113.                         }
  114.                     }()
  115.                     entityDownstream <- msg
  116.                 }
  117.             case <-core.stopChan:
  118.                 core.entities = make(map[EntityId]EntityRunner)
  119.                 return
  120.             }
  121.         }
  122.     }()
  123. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement