Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- go func() {
- spOutputs := &mgo.Collection{}
- spend := &mgo.Collection{}
- switch networtkID {
- case currencies.Main:
- spOutputs = spendableOutputs
- spend = spentOutputs
- case currencies.Test:
- spOutputs = spendableOutputsTest
- spend = spentOutputsTest
- default:
- log.Errorf("setGRPCHandlers: wrong networkID:")
- }
- stream, err := cli.ResyncAddress(context.Background(), &pb.Empty{})
- if err != nil {
- log.Errorf("setGRPCHandlers: cli.EventGetAllMempool: %s", err.Error())
- }
- for {
- rTxs, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Errorf("initGrpcClient: cli.NewTx:stream.Recv: %s", err.Error())
- }
- // tx history
- for _, gTx := range rTxs.Txs {
- tx := generatedTxDataToStore(gTx)
- setExchangeRates(&tx, gTx.Resync, tx.MempoolTime)
- setUserID(&tx)
- user := store.User{}
- // set wallet index and address index in input
- for i := 0; i < len(tx.WalletsInput); i++ {
- sel := bson.M{"wallets.addresses.address": tx.WalletsInput[i].Address.Address}
- err := usersData.Find(sel).One(&user)
- if err == mgo.ErrNotFound {
- continue
- } else if err != nil && err != mgo.ErrNotFound {
- log.Errorf("initGrpcClient: cli.On newIncomingTx: %s", err)
- }
- for _, wallet := range user.Wallets {
- for _, addr := range wallet.Adresses {
- if addr.Address == tx.WalletsInput[i].Address.Address {
- tx.WalletsInput[i].WalletIndex = wallet.WalletIndex
- tx.WalletsInput[i].Address.AddressIndex = addr.AddressIndex
- }
- }
- }
- }
- // set wallet index and address index in output
- for i := 0; i < len(tx.WalletsOutput); i++ {
- sel := bson.M{"wallets.addresses.address": tx.WalletsOutput[i].Address.Address}
- err := usersData.Find(sel).One(&user)
- if err == mgo.ErrNotFound {
- continue
- } else if err != nil && err != mgo.ErrNotFound {
- log.Errorf("initGrpcClient: cli.On newIncomingTx: %s", err)
- }
- for _, wallet := range user.Wallets {
- for _, addr := range wallet.Adresses {
- if addr.Address == tx.WalletsOutput[i].Address.Address {
- tx.WalletsOutput[i].WalletIndex = wallet.WalletIndex
- tx.WalletsOutput[i].Address.AddressIndex = addr.AddressIndex
- }
- }
- }
- }
- err = saveMultyTransaction(tx, networtkID, gTx.Resync)
- if err != nil {
- log.Errorf("initGrpcClient: saveMultyTransaction: %s", err)
- }
- updateWalletAndAddressDate(tx, networtkID)
- }
- // sp outs
- for _, gSpOut := range rTxs.SpOuts {
- query := bson.M{"userid": gSpOut.UserID, "txid": gSpOut.TxID, "address": gSpOut.Address}
- err = spend.Find(query).One(nil)
- if err == mgo.ErrNotFound {
- user := store.User{}
- sel := bson.M{"wallets.addresses.address": gSpOut.Address}
- err = usersData.Find(sel).One(&user)
- if err != nil && err != mgo.ErrNotFound {
- log.Errorf("SetWsHandlers: cli.On newIncomingTx: %s", err)
- return
- }
- spOut := generatedSpOutsToStore(gSpOut)
- log.Infof("Add spendable output : %v", gSpOut.String())
- exRates, err := GetLatestExchangeRate()
- if err != nil {
- log.Errorf("initGrpcClient: GetLatestExchangeRate: %s", err.Error())
- }
- spOut.StockExchangeRate = exRates
- query := bson.M{"userid": spOut.UserID, "txid": spOut.TxID, "address": spOut.Address}
- err = spOutputs.Find(query).One(nil)
- if err == mgo.ErrNotFound {
- //insertion
- err := spOutputs.Insert(spOut)
- if err != nil {
- log.Errorf("Create spOutputs:txsData.Insert: %s", err.Error())
- }
- continue
- }
- if err != nil && err != mgo.ErrNotFound {
- log.Errorf("Create spOutputs:spOutputs.Find %s", err.Error())
- continue
- }
- update := bson.M{
- "$set": bson.M{
- "txstatus": spOut.TxStatus,
- },
- }
- err = spOutputs.Update(query, update)
- if err != nil {
- log.Errorf("CreateSpendableOutputs:spendableOutputs.Update: %s", err.Error())
- }
- }
- }
- // del sp outs
- for _, del := range rTxs.SpOutDelete {
- i := 0
- for {
- //insert to spend collection
- err := spend.Insert(del)
- if err != nil {
- log.Errorf("DeleteSpendableOutputs:spend.Insert: %s", err)
- }
- query := bson.M{"userid": del.UserID, "txid": del.TxID, "address": del.Address}
- log.Infof("-------- query delete %v\n", query)
- err = spOutputs.Remove(query)
- if err != nil {
- log.Errorf("DeleteSpendableOutputs:spendableOutputs.Remove: %s", err.Error())
- } else {
- log.Infof("delete success √: %v", query)
- break
- }
- i++
- if i == 10 {
- break
- }
- time.Sleep(time.Second * 3)
- }
- log.Debugf("DeleteSpendableOutputs:spendableOutputs.Remove: %s", err)
- }
- if len(rTxs.Txs) > 0 {
- resyncM.Lock()
- re := *resync
- delete(re, rTxs.Txs[0].TxAddress[0])
- // re[rTxs.SpOuts[0].Address] = false
- *resync = re
- resyncM.Unlock()
- }
- }
- }()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement