Advertisement
Guest User

Untitled

a guest
Mar 17th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 1.96 KB | None | 0 0
  1. package main
  2.  
  3. import "log"
  4.  
  5. type Message interface{}
  6.  
  7. type Syncer interface {
  8.     Sync() error
  9. }
  10.  
  11. type EntityId interface{}
  12.  
  13. type Entity interface {
  14.     Id() EntityId
  15.     Consume(Message) (Syncer, bool)
  16. }
  17.  
  18. type Downstream chan Message
  19.  
  20. type StopChan chan struct{}
  21.  
  22. type EntityFactory func(message Message) Entity
  23.  
  24. type EntityRunner struct {
  25.     entity     Entity
  26.     downstream Downstream
  27. }
  28.  
  29. type Core struct {
  30.     sources  map[Downstream]StopChan
  31.     mux      Downstream
  32.     entities map[EntityId]EntityRunner
  33.     logger   *log.Logger
  34.  
  35.     upstreamHandlerCount int
  36. }
  37.  
  38. func (c *Core) WithSource(src Downstream) *Core {
  39.     stopChan := make(StopChan)
  40.     c.sources[src] = stopChan
  41.     go func() {
  42.         for {
  43.             select {
  44.             case msg := <-src:
  45.                 c.mux <- msg
  46.             case <-stopChan:
  47.                 return
  48.             }
  49.         }
  50.     }()
  51.     return c
  52. }
  53.  
  54. func (c *Core) RemoveSource(src Downstream) *Core {
  55.     if s, ok := c.sources[src]; ok {
  56.         close(s)
  57.         delete(c.sources, src)
  58.     }
  59.     return c
  60. }
  61.  
  62. func (core *Core) Run(stopChan StopChan, efactory EntityFactory) {
  63.     eStop := make(StopChan)
  64.     upstream := make(chan Syncer)
  65.  
  66.     for i := 0; i < core.upstreamHandlerCount; i++ {
  67.         go func() {
  68.             for {
  69.                 select {
  70.                 case upstreamMsg := <-upstream:
  71.                     upstreamMsg.Sync()
  72.                 case <-eStop:
  73.                     return
  74.                 }
  75.             }
  76.         }()
  77.     }
  78.  
  79.     for {
  80.         select {
  81.         case msg := <-core.mux:
  82.             entity := efactory(msg)
  83.             id := entity.Id()
  84.             if entityRunner, ok := core.entities[id]; ok {
  85.                 entityRunner.downstream <- msg
  86.             } else {
  87.                 entityDownstream := make(Downstream)
  88.                 core.entities[id] = EntityRunner{entity, entityDownstream}
  89.                 go func() {
  90.                     for {
  91.                         select {
  92.                         case entityMsg := <-entityDownstream:
  93.                             if res, ok := entity.Consume(entityMsg); ok {
  94.                                 upstream <- res
  95.                             }
  96.                         case <-eStop:
  97.                             return
  98.                         }
  99.                     }
  100.                 }()
  101.                 entityDownstream <- msg
  102.             }
  103.         case <-stopChan:
  104.             close(eStop)
  105.             core.entities = make(map[EntityId]EntityRunner)
  106.             return
  107.         }
  108.     }
  109. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement