Advertisement
Guest User

Untitled

a guest
Mar 17th, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.65 KB | None | 0 0
  1. // +build !solution
  2.  
  3. package multipaxos
  4.  
  5. import (
  6. "container/list"
  7. "time"
  8.  
  9. "github.com/uis-dat520-s18/glabs/grouplab1/detector"
  10. )
  11.  
  12. // Proposer represents a proposer as defined by the Multi-Paxos algorithm.
  13. type Proposer struct {
  14. id int
  15. quorum int
  16. n int
  17.  
  18. crnd Round
  19. adu SlotID
  20. nextSlot SlotID
  21.  
  22. promises []*Promise
  23. promiseCount int
  24.  
  25. /* Added */
  26. min SlotID
  27. max SlotID
  28. slotsOut []PromiseSlot
  29.  
  30. phaseOneDone bool
  31. phaseOneProgressTicker *time.Ticker
  32.  
  33. acceptsOut *list.List
  34. requestsIn *list.List
  35.  
  36. ld detector.LeaderDetector
  37. leader int
  38.  
  39. prepareOut chan<- Prepare
  40. acceptOut chan<- Accept
  41. promiseIn chan Promise
  42. cvalIn chan Value
  43.  
  44. incDcd chan struct{}
  45. stop chan struct{}
  46. }
  47.  
  48. // NewProposer returns a new Multi-Paxos proposer. It takes the following
  49. // arguments:
  50. //
  51. // id: The id of the node running this instance of a Paxos proposer.
  52. //
  53. // nrOfNodes: The total number of Paxos nodes.
  54. //
  55. // adu: all-decided-up-to. The initial id of the highest _consecutive_ slot
  56. // that has been decided. Should normally be set to -1 initially, but for
  57. // testing purposes it is passed in the constructor.
  58. //
  59. // ld: A leader detector implementing the detector.LeaderDetector interface.
  60. //
  61. // prepareOut: A send only channel used to send prepares to other nodes.
  62. //
  63. // The proposer's internal crnd field should initially be set to the value of
  64. // its id.
  65. func NewProposer(id, nrOfNodes, adu int, ld detector.LeaderDetector, prepareOut chan<- Prepare, acceptOut chan<- Accept) *Proposer {
  66. return &Proposer{
  67. id: id,
  68. quorum: (nrOfNodes / 2) + 1,
  69. n: nrOfNodes,
  70.  
  71. crnd: Round(id),
  72. adu: SlotID(adu),
  73. nextSlot: 0,
  74.  
  75. promises: make([]*Promise, nrOfNodes),
  76.  
  77. phaseOneProgressTicker: time.NewTicker(time.Second),
  78.  
  79. acceptsOut: list.New(),
  80. requestsIn: list.New(),
  81.  
  82. ld: ld,
  83. leader: ld.Leader(),
  84.  
  85. prepareOut: prepareOut,
  86. acceptOut: acceptOut,
  87. promiseIn: make(chan Promise, 8),
  88. cvalIn: make(chan Value, 8),
  89.  
  90. incDcd: make(chan struct{}),
  91. stop: make(chan struct{}),
  92. }
  93.  
  94. }
  95.  
  96. // Start starts p's main run loop as a separate goroutine.
  97. func (p *Proposer) Start() {
  98. trustMsgs := p.ld.Subscribe()
  99. go func() {
  100. for {
  101. select {
  102. case prm := <-p.promiseIn:
  103. accepts, output := p.handlePromise(prm)
  104. if !output {
  105. continue
  106. }
  107. p.nextSlot = p.adu + 1
  108. p.acceptsOut.Init()
  109. p.phaseOneDone = true
  110. for _, acc := range accepts {
  111. p.acceptsOut.PushBack(acc)
  112. }
  113. p.sendAccept()
  114. case cval := <-p.cvalIn:
  115. if p.id != p.leader {
  116. continue
  117. }
  118. p.requestsIn.PushBack(cval)
  119. if !p.phaseOneDone {
  120. continue
  121. }
  122. p.sendAccept()
  123. case <-p.incDcd:
  124. p.adu++
  125. if p.id != p.leader {
  126. continue
  127. }
  128. if !p.phaseOneDone {
  129. continue
  130. }
  131. p.sendAccept()
  132. case <-p.phaseOneProgressTicker.C:
  133. if p.id == p.leader && !p.phaseOneDone {
  134. p.startPhaseOne()
  135. }
  136. case leader := <-trustMsgs:
  137. p.leader = leader
  138. if leader == p.id {
  139. p.startPhaseOne()
  140. }
  141. case <-p.stop:
  142. return
  143. }
  144. }
  145. }()
  146. }
  147.  
  148. // Stop stops p's main run loop.
  149. func (p *Proposer) Stop() {
  150. p.stop <- struct{}{}
  151. }
  152.  
  153. // DeliverPromise delivers promise prm to proposer p.
  154. func (p *Proposer) DeliverPromise(prm Promise) {
  155. p.promiseIn <- prm
  156. }
  157.  
  158. // DeliverClientValue delivers client value cval from to proposer p.
  159. func (p *Proposer) DeliverClientValue(cval Value) {
  160. p.cvalIn <- cval
  161. }
  162.  
  163. // IncrementAllDecidedUpTo increments the Proposer's adu variable by one.
  164. func (p *Proposer) IncrementAllDecidedUpTo() {
  165. p.incDcd <- struct{}{}
  166. }
  167.  
  168. // Internal: handlePromise processes promise prm according to the Multi-Paxos
  169. // algorithm. If handling the promise results in proposer p emitting a
  170. // corresponding accept slice, then output will be true and accs contain the
  171. // accept messages. If handlePromise returns false as output, then accs will be
  172. // a nil slice. See the Lab 5 text for a more complete specification.
  173. func (p *Proposer) handlePromise(prm Promise) (accs []Accept, output bool) {
  174. // TODO(student)
  175. if prm.Rnd != p.crnd {
  176. return []Accept{
  177. {From: -1, Slot: -1, Rnd: -2},
  178. }, false
  179. }
  180.  
  181. // Avoids duplicates
  182. if p.promises[prm.From] != nil {
  183. return []Accept{
  184. {From: -1, Slot: -1, Rnd: -2},
  185. }, false
  186. }
  187. p.promises[prm.From] = &prm
  188. p.promiseCount++
  189.  
  190. /* Initialize slots out for new slots */
  191. if p.slotsOut == nil {
  192. p.initializeSlotsOut(prm.Slots)
  193. }
  194.  
  195. /* Handle new slot values */
  196. for _, slot := range prm.Slots {
  197. if slot.ID > p.max {
  198. p.slotsOutExpand(slot.ID)
  199. }
  200. /* Only write slot if vrnd is greater than previous vrnd */
  201. if slot.Vrnd >= p.slotsOut[slot.ID-p.min].Vrnd && slot.ID > p.adu {
  202. p.slotsOut[slot.ID-p.min] = slot
  203. }
  204. }
  205.  
  206. /* If we got quorum */
  207. if p.promiseCount >= p.quorum {
  208. if p.phaseOneProgressTicker != nil {
  209. p.phaseOneProgressTicker.Stop()
  210. }
  211.  
  212. accs := make([]Accept, 0)
  213. for idx, slot := range p.slotsOut {
  214. if slot == (PromiseSlot{}) {
  215. if len(accs) > 0 {
  216. accs = append(accs, Accept{From: p.id, Slot: p.min + SlotID(idx), Rnd: prm.Rnd, Val: Value{Noop: true}})
  217. }
  218. } else {
  219. accs = append(accs, Accept{From: p.id, Slot: p.min + SlotID(idx), Rnd: prm.Rnd, Val: slot.Vval})
  220. }
  221. }
  222. return accs, true
  223. }
  224.  
  225. return []Accept{
  226. {From: -1, Slot: -1, Rnd: -2},
  227. }, false
  228. }
  229.  
  230. func (p *Proposer) initializeSlotsOut(slots []PromiseSlot) {
  231. min := SlotID(-1)
  232. max := SlotID(-1)
  233. for _, s := range slots {
  234. if min == -1 || s.ID < min {
  235. min = s.ID
  236. }
  237. if max == -1 || s.ID > max {
  238. max = s.ID
  239. }
  240. }
  241. p.min = min
  242. p.max = max
  243. p.slotsOut = make([]PromiseSlot, 1+max-min)
  244. }
  245.  
  246. func (p *Proposer) slotsOutExpand(newMax SlotID) {
  247. p.slotsOut = append(p.slotsOut, make([]PromiseSlot, newMax-p.max)...)
  248. p.max = newMax
  249. }
  250.  
  251. // Internal: increaseCrnd increases proposer p's crnd field by the total number
  252. // of Paxos nodes.
  253. func (p *Proposer) increaseCrnd() {
  254. p.crnd = p.crnd + Round(p.n)
  255. }
  256.  
  257. // Internal: startPhaseOne resets all Phase One data, increases the Proposer's
  258. // crnd and sends a new Prepare with Slot as the current adu.
  259. func (p *Proposer) startPhaseOne() {
  260. p.phaseOneDone = false
  261. p.promises = make([]*Promise, p.n)
  262. p.slotsOut = nil
  263. p.increaseCrnd()
  264. p.prepareOut <- Prepare{From: p.id, Slot: p.adu, Crnd: p.crnd}
  265. }
  266.  
  267. // Internal: sendAccept sends an accept from either the accept out queue
  268. // (generated after Phase One) if not empty, or, it generates an accept using a
  269. // value from the client request queue if not empty. It does not send an accept
  270. // if the previous slot has not been decided yet.
  271. func (p *Proposer) sendAccept() {
  272. const alpha = 1
  273. if !(p.nextSlot <= p.adu+alpha) {
  274. // We must wait for the next slot to be decided before we can
  275. // send an accept.
  276. //
  277. // For Lab 6: Alpha has a value of one here. If you later
  278. // implement pipelining then alpha should be extracted to a
  279. // proposer variable (alpha) and this function should have an
  280. // outer for loop.
  281. return
  282. }
  283.  
  284. // Pri 1: If bounded by any accepts from Phase One -> send previously
  285. // generated accept and return.
  286. if p.acceptsOut.Len() > 0 {
  287. acc := p.acceptsOut.Front().Value.(Accept)
  288. p.acceptsOut.Remove(p.acceptsOut.Front())
  289. p.acceptOut <- acc
  290. p.nextSlot++
  291. return
  292. }
  293.  
  294. // Pri 2: If any client request in queue -> generate and send
  295. // accept.
  296. if p.requestsIn.Len() > 0 {
  297. cval := p.requestsIn.Front().Value.(Value)
  298. p.requestsIn.Remove(p.requestsIn.Front())
  299. acc := Accept{
  300. From: p.id,
  301. Slot: p.nextSlot,
  302. Rnd: p.crnd,
  303. Val: cval,
  304. }
  305. p.nextSlot++
  306. p.acceptOut <- acc
  307. }
  308. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement