Advertisement
Guest User

Untitled

a guest
Nov 14th, 2018
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.35 KB | None | 0 0
  1. package payouts
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "math/big"
  7. "os"
  8. "strconv"
  9. "time"
  10. "sync"
  11.  
  12. "github.com/ethereum/go-ethereum/common/hexutil"
  13.  
  14. "github.com/TeamEGEM/open-egem-pool/rpc"
  15. "github.com/TeamEGEM/open-egem-pool/storage"
  16. "github.com/TeamEGEM/open-egem-pool/util"
  17. )
  18.  
  19. const txCheckInterval = 5 * time.Second
  20.  
  21. type PayoutsConfig struct {
  22. Enabled bool `json:"enabled"`
  23. RequirePeers int64 `json:"requirePeers"`
  24. Interval string `json:"interval"`
  25. Daemon string `json:"daemon"`
  26. Timeout string `json:"timeout"`
  27. Address string `json:"address"`
  28. Gas string `json:"gas"`
  29. GasPrice string `json:"gasPrice"`
  30. AutoGas bool `json:"autoGas"`
  31. // In Shannon
  32. Threshold int64 `json:"threshold"`
  33. BgSave bool `json:"bgsave"`
  34. ConcurrentTx int `json:"concurrentTx"`
  35. }
  36.  
  37. func (self PayoutsConfig) GasHex() string {
  38. x := util.String2Big(self.Gas)
  39. return hexutil.EncodeBig(x)
  40. }
  41.  
  42. func (self PayoutsConfig) GasPriceHex() string {
  43. x := util.String2Big(self.GasPrice)
  44. return hexutil.EncodeBig(x)
  45. }
  46.  
  47. type PayoutsProcessor struct {
  48. config *PayoutsConfig
  49. backend *storage.RedisClient
  50. rpc *rpc.RPCClient
  51. halt bool
  52. lastFail error
  53. }
  54.  
  55. func NewPayoutsProcessor(cfg *PayoutsConfig, backend *storage.RedisClient) *PayoutsProcessor {
  56. u := &PayoutsProcessor{config: cfg, backend: backend}
  57. u.rpc = rpc.NewRPCClient("PayoutsProcessor", cfg.Daemon, cfg.Timeout)
  58. return u
  59. }
  60.  
  61. func (u *PayoutsProcessor) Start() {
  62. log.Println("Starting payouts")
  63.  
  64. if u.mustResolvePayout() {
  65. log.Println("Running with env RESOLVE_PAYOUT=1, now trying to resolve locked payouts")
  66. u.resolvePayouts()
  67. log.Println("Now you have to restart payouts module with RESOLVE_PAYOUT=0 for normal run")
  68. return
  69. }
  70.  
  71. intv := util.MustParseDuration(u.config.Interval)
  72. timer := time.NewTimer(intv)
  73. log.Printf("Set payouts interval to %v", intv)
  74.  
  75. payments := u.backend.GetPendingPayments()
  76. if len(payments) > 0 {
  77. log.Printf("Previous payout failed, you have to resolve it. List of failed payments:\n %v",
  78. formatPendingPayments(payments))
  79. return
  80. }
  81.  
  82. locked, err := u.backend.IsPayoutsLocked()
  83. if err != nil {
  84. log.Println("Unable to start payouts:", err)
  85. return
  86. }
  87. if locked {
  88. log.Println("Unable to start payouts because they are locked")
  89. return
  90. }
  91.  
  92. // Immediately process payouts after start
  93. u.process()
  94. timer.Reset(intv)
  95.  
  96. go func() {
  97. for {
  98. select {
  99. case <-timer.C:
  100. u.process()
  101. timer.Reset(intv)
  102. }
  103. }
  104. }()
  105. }
  106.  
  107. func (u *PayoutsProcessor) process() {
  108. if u.halt {
  109. log.Println("Payments suspended due to last critical error:", u.lastFail)
  110. os.Exit(1)
  111. return
  112. }
  113. mustPay := 0
  114. minersPaid := 0
  115. totalAmount := big.NewInt(0)
  116. payees, err := u.backend.GetPayees()
  117. if err != nil {
  118. log.Println("Error while retrieving payees from backend:", err)
  119. return
  120. }
  121.  
  122. waitingCount := 0
  123. var wg sync.WaitGroup
  124.  
  125. for _, login := range payees {
  126. amount, _ := u.backend.GetBalance(login)
  127. amountInShannon := big.NewInt(amount)
  128.  
  129. // Shannon^2 = Wei
  130. amountInWei := new(big.Int).Mul(amountInShannon, util.Shannon)
  131.  
  132. if !u.reachedThreshold(amountInShannon) {
  133. continue
  134. }
  135. mustPay++
  136.  
  137. // Require active peers before processing
  138. if !u.checkPeers() {
  139. break
  140. }
  141. // Require unlocked account
  142. if !u.isUnlockedAccount() {
  143. break
  144. }
  145.  
  146. // Check if we have enough funds
  147. poolBalance, err := u.rpc.GetBalance(u.config.Address)
  148. if err != nil {
  149. u.halt = true
  150. u.lastFail = err
  151. break
  152. }
  153. if poolBalance.Cmp(amountInWei) < 0 {
  154. err := fmt.Errorf("Not enough balance for payment, need %s Wei, pool has %s Wei",
  155. amountInWei.String(), poolBalance.String())
  156. u.halt = true
  157. u.lastFail = err
  158. break
  159. }
  160.  
  161. // Lock payments for current payout
  162. err = u.backend.LockPayouts(login, amount)
  163. if err != nil {
  164. log.Printf("Failed to lock payment for %s: %v", login, err)
  165. u.halt = true
  166. u.lastFail = err
  167. break
  168. }
  169. log.Printf("Locked payment for %s, %v Shannon", login, amount)
  170.  
  171. // Debit miner's balance and update stats
  172. err = u.backend.UpdateBalance(login, amount)
  173. if err != nil {
  174. log.Printf("Failed to update balance for %s, %v Shannon: %v", login, amount, err)
  175. u.halt = true
  176. u.lastFail = err
  177. break
  178. }
  179.  
  180. value := hexutil.EncodeBig(amountInWei)
  181. txHash, err := u.rpc.SendTransaction(u.config.Address, login, u.config.GasHex(), u.config.GasPriceHex(), value, u.config.AutoGas)
  182. if err != nil {
  183. log.Printf("Failed to send payment to %s, %v Shannon: %v. Check outgoing tx for %s in block explorer and docs/PAYOUTS.md",
  184. login, amount, err, login)
  185. u.halt = true
  186. u.lastFail = err
  187. break
  188. }
  189.  
  190. // Log transaction hash
  191. err = u.backend.WritePayment(login, txHash, amount)
  192. if err != nil {
  193. log.Printf("Failed to log payment data for %s, %v Shannon, tx: %s: %v", login, amount, txHash, err)
  194. u.halt = true
  195. u.lastFail = err
  196. break
  197. }
  198.  
  199. minersPaid++
  200. totalAmount.Add(totalAmount, big.NewInt(amount))
  201. log.Printf("Paid %v Shannon to %v, TxHash: %v", amount, login, txHash)
  202.  
  203. wg.Add(1)
  204. waitingCount++
  205. go func(txHash string, login string, wg *sync.WaitGroup) {
  206. // Wait for TX confirmation before further payouts
  207. for {
  208. log.Printf("Waiting for tx confirmation: %v", txHash)
  209. time.Sleep(txCheckInterval)
  210. receipt, err := u.rpc.GetTxReceipt(txHash)
  211. if err != nil {
  212. log.Printf("Failed to get tx receipt for %v: %v", txHash, err)
  213. continue
  214. }
  215. // Tx has been mined
  216. if receipt != nil && receipt.Confirmed() {
  217. if receipt.Successful() {
  218. log.Printf("Payout tx successful for %s: %s", login, txHash)
  219. } else {
  220. log.Printf("Payout tx failed for %s: %s. Address contract throws on incoming tx.", login, txHash)
  221. }
  222. break
  223. }
  224. }
  225. wg.Done()
  226. }(txHash, login, &wg)
  227.  
  228. if waitingCount > u.config.ConcurrentTx {
  229. wg.Wait()
  230. waitingCount = 0
  231. }
  232. }
  233.  
  234. wg.Wait()
  235. waitingCount = 0
  236.  
  237. if mustPay > 0 {
  238. log.Printf("Paid total %v Shannon to %v of %v payees", totalAmount, minersPaid, mustPay)
  239. } else {
  240. log.Println("No payees that have reached payout threshold")
  241. }
  242.  
  243. // Save redis state to disk
  244. if minersPaid > 0 && u.config.BgSave {
  245. u.bgSave()
  246. }
  247. }
  248.  
  249. func (self PayoutsProcessor) isUnlockedAccount() bool {
  250. _, err := self.rpc.Sign(self.config.Address, "0x0")
  251. if err != nil {
  252. log.Println("Unable to process payouts:", err)
  253. return false
  254. }
  255. return true
  256. }
  257.  
  258. func (self PayoutsProcessor) checkPeers() bool {
  259. n, err := self.rpc.GetPeerCount()
  260. if err != nil {
  261. log.Println("Unable to start payouts, failed to retrieve number of peers from node:", err)
  262. return false
  263. }
  264. if n < self.config.RequirePeers {
  265. log.Println("Unable to start payouts, number of peers on a node is less than required", self.config.RequirePeers)
  266. return false
  267. }
  268. return true
  269. }
  270.  
  271. func (self PayoutsProcessor) reachedThreshold(amount *big.Int) bool {
  272. return big.NewInt(self.config.Threshold).Cmp(amount) < 0
  273. }
  274.  
  275. func formatPendingPayments(list []*storage.PendingPayment) string {
  276. var s string
  277. for _, v := range list {
  278. s += fmt.Sprintf("\tAddress: %s, Amount: %v Shannon, %v\n", v.Address, v.Amount, time.Unix(v.Timestamp, 0))
  279. }
  280. return s
  281. }
  282.  
  283. func (self PayoutsProcessor) bgSave() {
  284. result, err := self.backend.BgSave()
  285. if err != nil {
  286. log.Println("Failed to perform BGSAVE on backend:", err)
  287. return
  288. }
  289. log.Println("Saving backend state to disk:", result)
  290. }
  291.  
  292. func (self PayoutsProcessor) resolvePayouts() {
  293. payments := self.backend.GetPendingPayments()
  294.  
  295. if len(payments) > 0 {
  296. log.Printf("Will credit back following balances:\n%s", formatPendingPayments(payments))
  297.  
  298. for _, v := range payments {
  299. err := self.backend.RollbackBalance(v.Address, v.Amount)
  300. if err != nil {
  301. log.Printf("Failed to credit %v Shannon back to %s, error is: %v", v.Amount, v.Address, err)
  302. return
  303. }
  304. log.Printf("Credited %v Shannon back to %s", v.Amount, v.Address)
  305. }
  306. err := self.backend.UnlockPayouts()
  307. if err != nil {
  308. log.Println("Failed to unlock payouts:", err)
  309. return
  310. }
  311. } else {
  312. log.Println("No pending payments to resolve")
  313. }
  314.  
  315. if self.config.BgSave {
  316. self.bgSave()
  317. }
  318. log.Println("Payouts unlocked")
  319. }
  320.  
  321. func (self PayoutsProcessor) mustResolvePayout() bool {
  322. v, _ := strconv.ParseBool(os.Getenv("RESOLVE_PAYOUT"))
  323. return v
  324. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement