Advertisement
jkbgm

Untitled

May 8th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.03 KB | None | 0 0
  1. go func() {
  2. spOutputs := &mgo.Collection{}
  3. spend := &mgo.Collection{}
  4. switch networtkID {
  5. case currencies.Main:
  6. spOutputs = spendableOutputs
  7. spend = spentOutputs
  8. case currencies.Test:
  9. spOutputs = spendableOutputsTest
  10. spend = spentOutputsTest
  11. default:
  12. log.Errorf("setGRPCHandlers: wrong networkID:")
  13. }
  14.  
  15. stream, err := cli.ResyncAddress(context.Background(), &pb.Empty{})
  16. if err != nil {
  17. log.Errorf("setGRPCHandlers: cli.EventGetAllMempool: %s", err.Error())
  18. }
  19.  
  20. for {
  21. rTxs, err := stream.Recv()
  22. if err == io.EOF {
  23. break
  24. }
  25. if err != nil {
  26. log.Errorf("initGrpcClient: cli.NewTx:stream.Recv: %s", err.Error())
  27. }
  28.  
  29. // tx history
  30. for _, gTx := range rTxs.Txs {
  31. tx := generatedTxDataToStore(gTx)
  32. setExchangeRates(&tx, gTx.Resync, tx.MempoolTime)
  33. setUserID(&tx)
  34. user := store.User{}
  35. // set wallet index and address index in input
  36. for i := 0; i < len(tx.WalletsInput); i++ {
  37. sel := bson.M{"wallets.addresses.address": tx.WalletsInput[i].Address.Address}
  38. err := usersData.Find(sel).One(&user)
  39. if err == mgo.ErrNotFound {
  40. continue
  41. } else if err != nil && err != mgo.ErrNotFound {
  42. log.Errorf("initGrpcClient: cli.On newIncomingTx: %s", err)
  43. }
  44.  
  45. for _, wallet := range user.Wallets {
  46. for _, addr := range wallet.Adresses {
  47. if addr.Address == tx.WalletsInput[i].Address.Address {
  48. tx.WalletsInput[i].WalletIndex = wallet.WalletIndex
  49. tx.WalletsInput[i].Address.AddressIndex = addr.AddressIndex
  50. }
  51. }
  52. }
  53. }
  54.  
  55. // set wallet index and address index in output
  56. for i := 0; i < len(tx.WalletsOutput); i++ {
  57. sel := bson.M{"wallets.addresses.address": tx.WalletsOutput[i].Address.Address}
  58. err := usersData.Find(sel).One(&user)
  59. if err == mgo.ErrNotFound {
  60. continue
  61. } else if err != nil && err != mgo.ErrNotFound {
  62. log.Errorf("initGrpcClient: cli.On newIncomingTx: %s", err)
  63. }
  64.  
  65. for _, wallet := range user.Wallets {
  66. for _, addr := range wallet.Adresses {
  67. if addr.Address == tx.WalletsOutput[i].Address.Address {
  68. tx.WalletsOutput[i].WalletIndex = wallet.WalletIndex
  69. tx.WalletsOutput[i].Address.AddressIndex = addr.AddressIndex
  70. }
  71. }
  72. }
  73. }
  74. err = saveMultyTransaction(tx, networtkID, gTx.Resync)
  75. if err != nil {
  76. log.Errorf("initGrpcClient: saveMultyTransaction: %s", err)
  77. }
  78. updateWalletAndAddressDate(tx, networtkID)
  79. }
  80.  
  81. // sp outs
  82. for _, gSpOut := range rTxs.SpOuts {
  83. query := bson.M{"userid": gSpOut.UserID, "txid": gSpOut.TxID, "address": gSpOut.Address}
  84. err = spend.Find(query).One(nil)
  85.  
  86. if err == mgo.ErrNotFound {
  87. user := store.User{}
  88. sel := bson.M{"wallets.addresses.address": gSpOut.Address}
  89. err = usersData.Find(sel).One(&user)
  90. if err != nil && err != mgo.ErrNotFound {
  91. log.Errorf("SetWsHandlers: cli.On newIncomingTx: %s", err)
  92. return
  93. }
  94. spOut := generatedSpOutsToStore(gSpOut)
  95.  
  96. log.Infof("Add spendable output : %v", gSpOut.String())
  97.  
  98. exRates, err := GetLatestExchangeRate()
  99. if err != nil {
  100. log.Errorf("initGrpcClient: GetLatestExchangeRate: %s", err.Error())
  101. }
  102. spOut.StockExchangeRate = exRates
  103.  
  104. query := bson.M{"userid": spOut.UserID, "txid": spOut.TxID, "address": spOut.Address}
  105. err = spOutputs.Find(query).One(nil)
  106. if err == mgo.ErrNotFound {
  107. //insertion
  108. err := spOutputs.Insert(spOut)
  109. if err != nil {
  110. log.Errorf("Create spOutputs:txsData.Insert: %s", err.Error())
  111. }
  112. continue
  113. }
  114. if err != nil && err != mgo.ErrNotFound {
  115. log.Errorf("Create spOutputs:spOutputs.Find %s", err.Error())
  116. continue
  117. }
  118.  
  119. update := bson.M{
  120. "$set": bson.M{
  121. "txstatus": spOut.TxStatus,
  122. },
  123. }
  124. err = spOutputs.Update(query, update)
  125. if err != nil {
  126. log.Errorf("CreateSpendableOutputs:spendableOutputs.Update: %s", err.Error())
  127. }
  128. }
  129. }
  130.  
  131. // del sp outs
  132. for _, del := range rTxs.SpOutDelete {
  133. i := 0
  134. for {
  135. //insert to spend collection
  136. err := spend.Insert(del)
  137. if err != nil {
  138. log.Errorf("DeleteSpendableOutputs:spend.Insert: %s", err)
  139. }
  140.  
  141. query := bson.M{"userid": del.UserID, "txid": del.TxID, "address": del.Address}
  142. log.Infof("-------- query delete %v\n", query)
  143. err = spOutputs.Remove(query)
  144. if err != nil {
  145. log.Errorf("DeleteSpendableOutputs:spendableOutputs.Remove: %s", err.Error())
  146. } else {
  147. log.Infof("delete success √: %v", query)
  148. break
  149. }
  150. i++
  151. if i == 10 {
  152. break
  153. }
  154. time.Sleep(time.Second * 3)
  155. }
  156. log.Debugf("DeleteSpendableOutputs:spendableOutputs.Remove: %s", err)
  157.  
  158. }
  159.  
  160. if len(rTxs.Txs) > 0 {
  161.  
  162. resyncM.Lock()
  163. re := *resync
  164. delete(re, rTxs.Txs[0].TxAddress[0])
  165. // re[rTxs.SpOuts[0].Address] = false
  166. *resync = re
  167. resyncM.Unlock()
  168. }
  169.  
  170. }
  171.  
  172. }()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement