Advertisement
Guest User

Untitled

a guest
Mar 17th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.49 KB | None | 0 0
  1. package main
  2.  
  3. import "log"
  4.  
  5. type Message interface{}
  6.  
  7. type Syncer interface {
  8.     Sync() error
  9. }
  10.  
  11. type EntityId interface{}
  12.  
  13. type Entity interface {
  14.     Id() EntityId
  15.     Consume(Message) (Syncer, bool)
  16. }
  17.  
  18. type Downstream chan Message
  19.  
  20. type StopChan chan struct{}
  21.  
  22. type EntityFactory func(message Message) Entity
  23.  
  24. type EntityRunner struct {
  25.     entity     Entity
  26.     downstream Downstream
  27. }
  28.  
  29. type Core struct {
  30.     sources  map[Downstream]StopChan
  31.     mux      Downstream
  32.     entities map[EntityId]EntityRunner
  33.     logger   *log.Logger
  34.  
  35.     upstreamHandlerCount int
  36.  
  37.     stopChan StopChan
  38. }
  39.  
  40. func NewCore(logger *log.Logger, upstreamCount int) *Core {
  41.     return &Core{
  42.         sources:              make(map[Downstream]StopChan),
  43.         mux:                  make(Downstream),
  44.         entities:             make(map[EntityId]EntityRunner),
  45.         logger:               logger,
  46.         upstreamHandlerCount: upstreamCount,
  47.         stopChan:             make(StopChan),
  48.     }
  49. }
  50.  
  51. func (c *Core) WithSources(srcList ...Downstream) *Core {
  52.     for _, src := range srcList {
  53.         stopChan := make(StopChan)
  54.         c.sources[src] = stopChan
  55.         go func(st StopChan, source Downstream) {
  56.             for {
  57.                 select {
  58.                 case msg := <-source:
  59.                     c.mux <- msg
  60.                 case <-st:
  61.                     return
  62.                 }
  63.             }
  64.         }(stopChan, src)
  65.     }
  66.     return c
  67. }
  68.  
  69. func (c *Core) RemoveSource(src Downstream) *Core {
  70.     if s, ok := c.sources[src]; ok {
  71.         close(s)
  72.         delete(c.sources, src)
  73.     }
  74.     return c
  75. }
  76.  
  77. func (core *Core) Stop() {
  78.     close(core.stopChan)
  79. }
  80. func (core *Core) Run(efactory EntityFactory) {
  81.     go func() {
  82.         upstream := make(chan Syncer)
  83.  
  84.         for i := 0; i < core.upstreamHandlerCount; i++ {
  85.             go func() {
  86.                 for {
  87.                     select {
  88.                     case upstreamMsg := <-upstream:
  89.                         upstreamMsg.Sync()
  90.                     case <-core.stopChan:
  91.                         return
  92.                     }
  93.                 }
  94.             }()
  95.         }
  96.  
  97.         for {
  98.             select {
  99.             case msg := <-core.mux:
  100.                 entity := efactory(msg)
  101.                 id := entity.Id()
  102.                 if entityRunner, ok := core.entities[id]; ok {
  103.                     entityRunner.downstream <- msg
  104.                 } else {
  105.                     entityDownstream := make(Downstream)
  106.                     core.entities[id] = EntityRunner{entity, entityDownstream}
  107.                     go func() {
  108.                         for {
  109.                             select {
  110.                             case entityMsg := <-entityDownstream:
  111.                                 if res, ok := entity.Consume(entityMsg); ok {
  112.                                     upstream <- res
  113.                                 }
  114.                             case <-core.stopChan:
  115.                                 return
  116.                             }
  117.                         }
  118.                     }()
  119.                     entityDownstream <- msg
  120.                 }
  121.             case <-core.stopChan:
  122.                 core.entities = make(map[EntityId]EntityRunner)
  123.                 return
  124.             }
  125.         }
  126.     }()
  127. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement