Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import "log"
// Vocabulary shared by Core and the components plugged into it.
type (
	// Message is an opaque payload; any value may travel through the core.
	Message interface{}

	// Syncer is the result an Entity may hand back after consuming a
	// message. Run passes it to a worker that calls Sync.
	Syncer interface {
		Sync() error
	}

	// EntityId identifies an Entity. It is used as a map key by Core, so
	// the dynamic type stored in it must be comparable.
	EntityId interface{}

	// Entity is a message consumer addressed by its Id.
	Entity interface {
		// Id returns the key under which the entity is registered.
		Id() EntityId
		// Consume handles one message. When the boolean result is true,
		// the returned Syncer is forwarded for syncing.
		Consume(Message) (Syncer, bool)
	}

	// Downstream is a channel carrying messages.
	Downstream chan Message

	// StopChan is a signalling channel; it is closed to request a stop.
	StopChan chan struct{}

	// EntityFactory builds the Entity that a message is addressed to.
	EntityFactory func(message Message) Entity
)
- type EntityRunner struct {
- entity Entity
- downstream Downstream
- }
- type Core struct {
- sources map[Downstream]StopChan
- mux Downstream
- entities map[EntityId]EntityRunner
- logger *log.Logger
- upstreamHandlerCount int
- stopChan StopChan
- }
- func NewCore(logger *log.Logger, upstreamCount int) *Core {
- return &Core{
- sources: make(map[Downstream]StopChan),
- mux: make(Downstream),
- entities: make(map[EntityId]EntityRunner),
- logger: logger,
- upstreamHandlerCount: upstreamCount,
- stopChan: make(StopChan),
- }
- }
- func (c *Core) WithSources(srcList ...Downstream) *Core {
- for _, src := range srcList {
- stopChan := make(StopChan)
- c.sources[src] = stopChan
- go func(st StopChan, source Downstream) {
- for {
- select {
- case msg := <-source:
- c.mux <- msg
- case <-st:
- return
- }
- }
- }(stopChan, src)
- }
- return c
- }
- func (c *Core) RemoveSource(src Downstream) *Core {
- if s, ok := c.sources[src]; ok {
- close(s)
- delete(c.sources, src)
- }
- return c
- }
- func (core *Core) Stop() {
- close(core.stopChan)
- }
- func (core *Core) Run(efactory EntityFactory) {
- go func() {
- upstream := make(chan Syncer)
- for i := 0; i < core.upstreamHandlerCount; i++ {
- go func() {
- for {
- select {
- case upstreamMsg := <-upstream:
- upstreamMsg.Sync()
- case <-core.stopChan:
- return
- }
- }
- }()
- }
- for {
- select {
- case msg := <-core.mux:
- entity := efactory(msg)
- id := entity.Id()
- if entityRunner, ok := core.entities[id]; ok {
- entityRunner.downstream <- msg
- } else {
- entityDownstream := make(Downstream)
- core.entities[id] = EntityRunner{entity, entityDownstream}
- go func() {
- for {
- select {
- case entityMsg := <-entityDownstream:
- if res, ok := entity.Consume(entityMsg); ok {
- upstream <- res
- }
- case <-core.stopChan:
- return
- }
- }
- }()
- entityDownstream <- msg
- }
- case <-core.stopChan:
- core.entities = make(map[EntityId]EntityRunner)
- return
- }
- }
- }()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement