Advertisement
Guest User

Untitled

a guest
Mar 17th, 2017
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.05 KB | None | 0 0
  1. package main
  2.  
  3. import "log"
  4.  
  5. type Message interface{}
  6.  
  7. type Syncer interface {
  8.     Sync() error
  9. }
  10.  
  11. type EntityId interface{}
  12.  
  13. type Entity interface {
  14.     Id() EntityId
  15.     Consume(Message) (Syncer, bool)
  16. }
  17.  
  18. type Downstream chan Message
  19.  
  20. type StopChan chan struct{}
  21.  
  22. type EntityFactory func(message Message) Entity
  23.  
  24. type EntityRunner struct {
  25.     entity     Entity
  26.     downstream Downstream
  27. }
  28.  
  29. type Core struct {
  30.     sources  map[Downstream]StopChan
  31.     mux      Downstream
  32.     entities map[EntityId]EntityRunner
  33.     logger   *log.Logger
  34.  
  35.     upstreamHandlerCount int
  36. }
  37.  
  38. func (c *Core) WithSources(srcList ...Downstream) *Core {
  39.     for _, src := range srcList {
  40.         stopChan := make(StopChan)
  41.         c.sources[src] = stopChan
  42.         go func(st StopChan, source Downstream) {
  43.             for {
  44.                 select {
  45.                 case msg := <-source:
  46.                     c.mux <- msg
  47.                 case <-st:
  48.                     return
  49.                 }
  50.             }
  51.         }(stopChan, src)
  52.     }
  53.     return c
  54. }
  55.  
  56. func (c *Core) RemoveSource(src Downstream) *Core {
  57.     if s, ok := c.sources[src]; ok {
  58.         close(s)
  59.         delete(c.sources, src)
  60.     }
  61.     return c
  62. }
  63.  
  64. func (core *Core) Run(stopChan StopChan, efactory EntityFactory) {
  65.     eStop := make(StopChan)
  66.     upstream := make(chan Syncer)
  67.  
  68.     for i := 0; i < core.upstreamHandlerCount; i++ {
  69.         go func() {
  70.             for {
  71.                 select {
  72.                 case upstreamMsg := <-upstream:
  73.                     upstreamMsg.Sync()
  74.                 case <-eStop:
  75.                     return
  76.                 }
  77.             }
  78.         }()
  79.     }
  80.  
  81.     for {
  82.         select {
  83.         case msg := <-core.mux:
  84.             entity := efactory(msg)
  85.             id := entity.Id()
  86.             if entityRunner, ok := core.entities[id]; ok {
  87.                 entityRunner.downstream <- msg
  88.             } else {
  89.                 entityDownstream := make(Downstream)
  90.                 core.entities[id] = EntityRunner{entity, entityDownstream}
  91.                 go func() {
  92.                     for {
  93.                         select {
  94.                         case entityMsg := <-entityDownstream:
  95.                             if res, ok := entity.Consume(entityMsg); ok {
  96.                                 upstream <- res
  97.                             }
  98.                         case <-eStop:
  99.                             return
  100.                         }
  101.                     }
  102.                 }()
  103.                 entityDownstream <- msg
  104.             }
  105.         case <-stopChan:
  106.             close(eStop)
  107.             core.entities = make(map[EntityId]EntityRunner)
  108.             return
  109.         }
  110.     }
  111. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement