Advertisement
Guest User

Untitled

a guest
Mar 25th, 2019
75
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 1.33 KB | None | 0 0
  1. package botmeans
  2.  
  3. import (
  4.     "time"
  5. )
  6.  
  7. //Executer is a executable operation
  8. type Executer interface {
  9.     Id() int64
  10.     Execute()
  11. }
  12.  
  13. //RunMachine creates the machine, which executes Executers in parallel, but Executers with the same id are executed serially
  14. func RunMachine(queueStream chan Executer, interval time.Duration) chan interface{} {
  15.     stopChan := make(chan interface{})
  16.     queueChanMap := make(map[int64]chan Executer)
  17.     handlerClosedChan := make(chan int64)
  18.  
  19.     handler := func(ch chan Executer, ID int64) {
  20.         defer func() {
  21.             if r := recover(); r != nil {
  22.                 handlerClosedChan <- ID
  23.             }
  24.         }()
  25.         exitSignaller := time.After(interval)
  26.         for {
  27.             select {
  28.             case queue := <-ch:
  29.                 queue.Execute()
  30.                 exitSignaller = time.After(interval)
  31.             case <-exitSignaller:
  32.                 handlerClosedChan <- ID
  33.                 return
  34.             }
  35.         }
  36.     }
  37.     go func() {
  38.         for {
  39.             select {
  40.             case queue := <-queueStream:
  41.                 if queue == nil {
  42.                     continue
  43.                 }
  44.                 ID := queue.Id()
  45.                 var queueChan chan Executer
  46.                 ok := false
  47.                 if queueChan, ok = queueChanMap[ID]; !ok {
  48.                     queueChan = make(chan Executer)
  49.                     queueChanMap[ID] = queueChan
  50.                     go handler(queueChan, ID)
  51.                 }
  52.                 queueChan <- queue
  53.             case id := <-handlerClosedChan:
  54.                 delete(queueChanMap, id)
  55.             case <-stopChan:
  56.                 return
  57.             }
  58.         }
  59.     }()
  60.     return stopChan
  61. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement