Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff -Nurp src/bitcoinrpc.cpp src2/bitcoinrpc.cpp
- --- src/bitcoinrpc.cpp 2012-04-27 05:20:53.091660464 -0700
- +++ src2/bitcoinrpc.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -3,6 +3,16 @@
- // Distributed under the MIT/X11 software license, see the accompanying
- // file license.txt or http://www.opensource.org/licenses/mit-license.php.
- +// CAUTION: This is not the offical bitcoinrpc.cpp from the official
- +// bitcoin distribution. It has been modified by me
- +// <davidjoelschwartz@gmail.com> to support multi-threaded RPC.
- +// This is quick and dirty code, it may not work for you. No warranties
- +// are expressed or implied. I made a best effort to improve the RPC
- +// performance. This notification is for blame, not for credit and
- +// may be removed if this change, or one similar, is accepted into the
- +// main distribution. If this has helped you, please donate to:
- +// 1H3STBxuzEHZQQD4hkjVE22TWTazcZzeBw
- +
- #include "headers.h"
- #include "db.h"
- #include "net.h"
- @@ -20,6 +30,7 @@
- #include <boost/filesystem/fstream.hpp>
- typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SSLStream;
- #endif
- +#define BOOST_SPIRIT_THREADSAFE
- #include "json/json_spirit_reader_template.h"
- #include "json/json_spirit_writer_template.h"
- #include "json/json_spirit_utils.h"
- @@ -46,6 +57,8 @@ static CCriticalSection cs_nWalletUnlock
- extern Value dumpprivkey(const Array& params, bool fHelp);
- extern Value importprivkey(const Array& params, bool fHelp);
- +void ThreadRPCServer3(void* parg);
- +
- Object JSONRPCError(int code, const string& message)
- {
- Object error;
- @@ -230,6 +243,15 @@ Value stop(const Array& params, bool fHe
- #endif
- }
- +Value sendpollpidsignal(const Array& params, bool fHelp)
- +{
- + if (fHelp)
- + throw runtime_error(
- + "sendpollpidsignal\n"
- + "Returns number of successful signals sent to polling pids.");
- +
- + return SendPollPidSignal(SIGUSR1);
- +}
- Value getblockcount(const Array& params, bool fHelp)
- {
- @@ -2045,6 +2067,7 @@ pair<string, rpcfn_type> pCallTable[] =
- make_pair("listaccounts", &listaccounts),
- make_pair("settxfee", &settxfee),
- make_pair("getmemorypool", &getmemorypool),
- + make_pair("sendpollpidsignal", &sendpollpidsignal),
- make_pair("listsinceblock", &listsinceblock),
- make_pair("dumpprivkey", &dumpprivkey),
- make_pair("importprivkey", &importprivkey)
- @@ -2118,7 +2141,7 @@ string rfc1123Time()
- return string(buffer);
- }
- -static string HTTPReply(int nStatus, const string& strMsg)
- +static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
- {
- if (nStatus == 401)
- return strprintf("HTTP/1.0 401 Authorization Required\r\n"
- @@ -2147,7 +2170,7 @@ static string HTTPReply(int nStatus, con
- return strprintf(
- "HTTP/1.1 %d %s\r\n"
- "Date: %s\r\n"
- - "Connection: close\r\n"
- + "Connection: %s\r\n"
- "Content-Length: %d\r\n"
- "Content-Type: application/json\r\n"
- "Server: bitcoin-json-rpc/%s\r\n"
- @@ -2156,12 +2179,13 @@ static string HTTPReply(int nStatus, con
- nStatus,
- cStatus,
- rfc1123Time().c_str(),
- + keepalive ? "keep-alive" : "close",
- strMsg.size(),
- FormatFullVersion().c_str(),
- strMsg.c_str());
- }
- -int ReadHTTPStatus(std::basic_istream<char>& stream)
- +int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
- {
- string str;
- getline(stream, str);
- @@ -2169,6 +2193,10 @@ int ReadHTTPStatus(std::basic_istream<ch
- boost::split(vWords, str, boost::is_any_of(" "));
- if (vWords.size() < 2)
- return 500;
- + proto = 0;
- + const char *ver = strstr(str.c_str(), "HTTP/1.");
- + if (ver != NULL)
- + proto = atoi(ver+7);
- return atoi(vWords[1].c_str());
- }
- @@ -2203,7 +2231,8 @@ int ReadHTTP(std::basic_istream<char>& s
- strMessageRet = "";
- // Read status
- - int nStatus = ReadHTTPStatus(stream);
- + int nProto;
- + int nStatus = ReadHTTPStatus(stream, nProto);
- // Read header
- int nLen = ReadHTTPHeader(stream, mapHeadersRet);
- @@ -2218,6 +2247,16 @@ int ReadHTTP(std::basic_istream<char>& s
- strMessageRet = string(vch.begin(), vch.end());
- }
- + string sConHdr=mapHeadersRet["connection"];
- +
- + if ( (sConHdr != "close") && (sConHdr != "keep-alive") )
- + {
- + if(nProto >= 1)
- + mapHeadersRet["connection"]="keep-alive";
- + else
- + mapHeadersRet["connection"]="close";
- + }
- +
- return nStatus;
- }
- @@ -2270,7 +2309,7 @@ void ErrorReply(std::ostream& stream, co
- if (code == -32600) nStatus = 400;
- else if (code == -32601) nStatus = 404;
- string strReply = JSONRPCReply(Value::null, objError, id);
- - stream << HTTPReply(nStatus, strReply) << std::flush;
- + stream << HTTPReply(nStatus, strReply, false) << std::flush;
- }
- bool ClientAllowed(const string& strAddress)
- @@ -2338,6 +2377,28 @@ private:
- };
- #endif
- +class AcceptedConnection
- +{
- + public:
- +#ifdef USE_SSL
- + SSLStream sslStream;
- + SSLIOStreamDevice d;
- + iostreams::stream<SSLIOStreamDevice> stream;
- +#else
- + ip::tcp::iostream stream;
- +#endif
- +
- + ip::tcp::endpoint peer;
- +
- +#ifdef USE_SSL
- + AcceptedConnection(asio::io_service &io_service, ssl::context &context,
- + bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
- + stream(d) { ; }
- +#else
- + AcceptedConnection(void) { ; }
- +#endif
- +};
- +
- void ThreadRPCServer(void* parg)
- {
- IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
- @@ -2361,8 +2422,8 @@ void ThreadRPCServer2(void* parg)
- {
- printf("ThreadRPCServer started\n");
- - strRPCUserColonPass = mapArgs["-rpcuser"] + ":" + mapArgs["-rpcpassword"];
- - if (mapArgs["-rpcpassword"] == "")
- + strRPCUserColonPass = strRPCUser + ":" + strRPCPass;
- + if (strRPCPass == "")
- {
- unsigned char rand_pwd[32];
- RAND_bytes(rand_pwd, 32);
- @@ -2423,62 +2484,78 @@ void ThreadRPCServer2(void* parg)
- {
- // Accept connection
- #ifdef USE_SSL
- - SSLStream sslStream(io_service, context);
- - SSLIOStreamDevice d(sslStream, fUseSSL);
- - iostreams::stream<SSLIOStreamDevice> stream(d);
- + AcceptedConnection *conn=new AcceptedConnection(io_service, context, fUseSSL);
- #else
- - ip::tcp::iostream stream;
- + AcceptedConnection *conn=new AcceptedConnection();
- #endif
- - ip::tcp::endpoint peer;
- vnThreadsRunning[THREAD_RPCSERVER]--;
- #ifdef USE_SSL
- - acceptor.accept(sslStream.lowest_layer(), peer);
- + acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
- #else
- - acceptor.accept(*stream.rdbuf(), peer);
- + acceptor.accept(*conn->stream.rdbuf(), conn->peer);
- #endif
- - vnThreadsRunning[4]++;
- + vnThreadsRunning[THREAD_RPCSERVER]++;
- +
- if (fShutdown)
- + {
- + delete conn;
- return;
- + }
- // Restrict callers by IP
- - if (!ClientAllowed(peer.address().to_string()))
- + if (!ClientAllowed(conn->peer.address().to_string()))
- {
- // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
- if (!fUseSSL)
- - stream << HTTPReply(403, "") << std::flush;
- - continue;
- + conn->stream << HTTPReply(403, "", false) << std::flush;
- + delete conn;
- + }
- + else
- + CreateThread(ThreadRPCServer3, (void *) conn);
- + }
- +}
- +void ThreadRPCServer3(void* parg)
- +{
- + IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
- + vnThreadsRunning[THREAD_RPCSERVER0]++;
- + AcceptedConnection *conn=(AcceptedConnection *) parg;
- +
- + bool fRun = true;
- + loop
- + {
- + if (fShutdown || !fRun)
- + {
- + conn->stream.close();
- + delete conn;
- + vnThreadsRunning[THREAD_RPCSERVER0]--;
- + return;
- }
- map<string, string> mapHeaders;
- string strRequest;
- -
- - boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
- - if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
- - { // Timed out:
- - acceptor.cancel();
- - printf("ThreadRPCServer ReadHTTP timeout\n");
- - continue;
- - }
- + ReadHTTP(conn->stream, mapHeaders, strRequest);
- // Check authorization
- if (mapHeaders.count("authorization") == 0)
- {
- - stream << HTTPReply(401, "") << std::flush;
- - continue;
- + conn->stream << HTTPReply(401, "", false) << std::flush;
- + break;
- }
- if (!HTTPAuthorized(mapHeaders))
- {
- - printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str());
- + printf("ThreadRPCServer incorrect password attempt from %s\n",conn->peer.address().to_string().c_str());
- /* Deter brute-forcing short passwords.
- If this results in a DOS the user really
- shouldn't have their RPC port exposed.*/
- if (mapArgs["-rpcpassword"].size() < 20)
- Sleep(250);
- - stream << HTTPReply(401, "") << std::flush;
- - continue;
- + conn->stream << HTTPReply(401, "", false) << std::flush;
- + break;
- }
- + if (mapHeaders["connection"] == "close")
- + fRun = false;
- Value id = Value::null;
- try
- @@ -2532,22 +2609,32 @@ void ThreadRPCServer2(void* parg)
- // Send reply
- string strReply = JSONRPCReply(result, Value::null, id);
- - stream << HTTPReply(200, strReply) << std::flush;
- + conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
- }
- catch (std::exception& e)
- {
- - ErrorReply(stream, JSONRPCError(-1, e.what()), id);
- + ErrorReply(conn->stream, JSONRPCError(-1, e.what()), id);
- + fRun = false;
- + }
- + catch (Object& e)
- + {
- + ErrorReply(conn->stream, e, id);
- + fRun = false;
- }
- }
- catch (Object& objError)
- {
- - ErrorReply(stream, objError, id);
- + ErrorReply(conn->stream, objError, id);
- + break;
- }
- catch (std::exception& e)
- {
- - ErrorReply(stream, JSONRPCError(-32700, e.what()), id);
- + ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
- + break;
- }
- }
- + delete conn;
- + vnThreadsRunning[THREAD_RPCSERVER0]--;
- }
- @@ -2555,7 +2642,7 @@ void ThreadRPCServer2(void* parg)
- Object CallRPC(const string& strMethod, const Array& params)
- {
- - if (mapArgs["-rpcuser"] == "" && mapArgs["-rpcpassword"] == "")
- + if (strRPCUser == "" && strRPCPass == "")
- throw runtime_error(strprintf(
- _("You must set rpcpassword=<password> in the configuration file:\n%s\n"
- "If the file does not exist, create it with owner-readable-only file permissions."),
- @@ -2583,7 +2670,7 @@ Object CallRPC(const string& strMethod,
- // HTTP basic authentication
- - string strUserPass64 = EncodeBase64(mapArgs["-rpcuser"] + ":" + mapArgs["-rpcpassword"]);
- + string strUserPass64 = EncodeBase64(strRPCUser + ":" + strRPCPass);
- map<string, string> mapRequestHeaders;
- mapRequestHeaders["Authorization"] = string("Basic ") + strUserPass64;
- diff -Nurp src/init.cpp src2/init.cpp
- --- src/init.cpp 2012-04-27 05:20:53.091660464 -0700
- +++ src2/init.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -195,6 +195,7 @@ bool AppInit2(int argc, char* argv[])
- " -bantime=<n> \t " + _("Number of seconds to keep misbehaving peers from reconnecting (default: 86400)") + "\n" +
- " -maxreceivebuffer=<n>\t " + _("Maximum per-connection receive buffer, <n>*1000 bytes (default: 10000)") + "\n" +
- " -maxsendbuffer=<n>\t " + _("Maximum per-connection send buffer, <n>*1000 bytes (default: 10000)") + "\n" +
- + " -hub=<n> \t " + _("Enable hub mode (1-4)\n") +
- #ifdef USE_UPNP
- #if USE_UPNP
- " -upnp \t " + _("Use Universal Plug and Play to map the listening port (default: 1)") + "\n" +
- @@ -221,6 +222,7 @@ bool AppInit2(int argc, char* argv[])
- " -rpcport=<port> \t\t " + _("Listen for JSON-RPC connections on <port> (default: 8332)") + "\n" +
- " -rpcallowip=<ip> \t\t " + _("Allow JSON-RPC connections from specified IP address") + "\n" +
- " -rpcconnect=<ip> \t " + _("Send commands to node running on <ip> (default: 127.0.0.1)") + "\n" +
- + " -pollpidfile=<f> \t " + _("Support long polling\n") +
- " -blocknotify=<cmd> " + _("Execute command when the best block changes (%s in cmd is replaced by block hash)") + "\n" +
- " -upgradewallet \t " + _("Upgrade wallet to latest format") + "\n" +
- " -keypool=<n> \t " + _("Set key pool size to <n> (default: 100)") + "\n" +
- @@ -270,6 +272,9 @@ bool AppInit2(int argc, char* argv[])
- else
- fServer = GetBoolArg("-server");
- + strRPCUser = mapArgs["-rpcuser"];
- + strRPCPass = mapArgs["-rpcpassword"];
- +
- /* force fServer when running without GUI */
- #if !defined(QT_GUI)
- fServer = true;
- diff -Nurp src/main.cpp src2/main.cpp
- --- src/main.cpp 2012-04-27 05:20:53.091660464 -0700
- +++ src2/main.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -1484,6 +1484,7 @@ bool CBlock::SetBestChainInner(CTxDB& tx
- bool CBlock::SetBestChain(CTxDB& txdb, CBlockIndex* pindexNew)
- {
- uint256 hash = GetHash();
- + bool lp = false;
- txdb.TxnBegin();
- if (pindexGenesisBlock == NULL && hash == hashGenesisBlock)
- @@ -1545,6 +1546,7 @@ bool CBlock::SetBestChain(CTxDB& txdb, C
- bool fIsInitialDownload = IsInitialBlockDownload();
- if (!fIsInitialDownload)
- {
- + lp = true;
- const CBlockLocator locator(pindexNew);
- ::SetBestChain(locator);
- }
- @@ -1558,6 +1560,12 @@ bool CBlock::SetBestChain(CTxDB& txdb, C
- nTransactionsUpdated++;
- printf("SetBestChain: new best=%s height=%d work=%s\n", hashBestChain.ToString().substr(0,20).c_str(), nBestHeight, bnBestChainWork.ToString().c_str());
- + if(lp)
- + {
- + // Support long polling
- + SendPollPidSignal(SIGUSR1);
- + }
- +
- std::string strCmd = GetArg("-blocknotify", "");
- if (!fIsInitialDownload && !strCmd.empty())
- @@ -2013,6 +2021,31 @@ void PrintBlockTree()
- }
- +int SendPollPidSignal(int signal)
- +{
- + string lp_pid;
- + char s[64];
- + int ret = 0;
- +
- + strcpy(s, "-pollpidfile");
- + for(int i=1; i<=2; ++i) {
- + if(i>1) snprintf(s+12, sizeof(s)-12, "%u", i);
- +
- + lp_pid = mapArgs[s];
- + if(lp_pid != "")
- + {
- + FILE *pidFile = fopen(lp_pid.c_str(), "r");
- + if(pidFile!=NULL)
- + {
- + int pid=0;
- + if ((fscanf(pidFile, "%d", &pid) == 1) && (pid > 1))
- + if(-1!=kill((pid_t) pid, signal)) ++ret;
- + fclose(pidFile);
- + }
- + }
- + }
- + return ret;
- +}
- @@ -3457,7 +3490,7 @@ void static BitcoinMiner(CWallet *pwalle
- {
- nLogTime = GetTime();
- printf("%s ", DateTimeStrFormat("%x %H:%M", GetTime()).c_str());
- - printf("hashmeter %3d CPUs %6.0f khash/s\n", vnThreadsRunning[THREAD_MINER], dHashesPerSec/1000.0);
- + printf("hashmeter %3d CPUs %6.0f khash/s\n", (long)vnThreadsRunning[THREAD_MINER], dHashesPerSec/1000.0);
- }
- }
- }
- @@ -3512,7 +3545,7 @@ void static ThreadBitcoinMiner(void* par
- nHPSTimerStart = 0;
- if (vnThreadsRunning[THREAD_MINER] == 0)
- dHashesPerSec = 0;
- - printf("ThreadBitcoinMiner exiting, %d threads remaining\n", vnThreadsRunning[THREAD_MINER]);
- + printf("ThreadBitcoinMiner exiting, %d threads remaining\n", (long)vnThreadsRunning[THREAD_MINER]);
- }
- diff -Nurp src/main.h src2/main.h
- --- src/main.h 2012-04-27 05:20:53.091660464 -0700
- +++ src2/main.h 2012-04-27 06:44:35.193529044 -0700
- @@ -108,6 +108,7 @@ unsigned int ComputeMinWork(unsigned int
- int GetNumBlocksOfPeers();
- bool IsInitialBlockDownload();
- std::string GetWarnings(std::string strFor);
- +int SendPollPidSignal(int signal);
- diff -Nurp src/netbase.cpp src2/netbase.cpp
- --- src/netbase.cpp 2012-04-27 05:20:53.091660464 -0700
- +++ src2/netbase.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -521,7 +521,7 @@ std::vector<unsigned char> CNetAddr::Get
- std::vector<unsigned char> vchRet;
- int nClass = 0; // 0=IPv6, 1=IPv4, 254=local, 255=unroutable
- int nStartByte = 0;
- - int nBits = 16;
- + int nBits = HubModes[nHubMode][HM_IP4_BITS];
- // all local addresses belong to the same group
- if (IsLocal())
- @@ -559,10 +559,10 @@ std::vector<unsigned char> CNetAddr::Get
- }
- // for he.net, use /36 groups
- else if (GetByte(15) == 0x20 && GetByte(14) == 0x11 && GetByte(13) == 0x04 && GetByte(12) == 0x70)
- - nBits = 36;
- + nBits = HubModes[nHubMode][HM_IP6_BITS]+4;
- // for the rest of the IPv6 network, use /32 groups
- else
- - nBits = 32;
- + nBits = HubModes[nHubMode][HM_IP6_BITS];
- vchRet.push_back(nClass);
- while (nBits >= 8)
- diff -Nurp src/netbase.h src2/netbase.h
- --- src/netbase.h 2012-04-27 05:20:53.091660464 -0700
- +++ src2/netbase.h 2012-04-27 06:44:35.193529044 -0700
- @@ -29,6 +29,16 @@
- extern int nConnectTimeout;
- +enum {
- + HM_MAX_OUTBOUND=0,
- + HM_MAX_TOTAL,
- + HM_IP4_BITS,
- + HM_IP6_BITS,
- + HM_MULTITHREAD,
- +};
- +extern int nHubMode;
- +extern const unsigned HubModes[5][5];
- +
- /** IP address (IPv6, or IPv4 using mapped IPv6 range (::FFFF:0:0/96)) */
- class CNetAddr
- diff -Nurp src/net.cpp src2/net.cpp
- --- src/net.cpp 2012-04-27 05:20:53.091660464 -0700
- +++ src2/net.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -22,9 +22,38 @@
- #include <miniupnpc/upnperrors.h>
- #endif
- +// CAUTION: This is not the offical net.cpp from the official
- +// bitcoin distribution. It has been modified by me
- +// <DavidJoelSchwartz@gmail.com> to support 'hub' modes.
- +// This is quick and dirty code, it may not work for you. No warranties
- +// are expressed or implied. I made a best effort to improve the RPC
- +// performance. This notification is for blame, not for credit and
- +// may be removed if this change, or one similar, is accepted into the
- +// main distribution. If this has helped you, please donate to:
- +// 1H3STBxuzEHZQQD4hkjVE22TWTazcZzeBw
- +
- using namespace std;
- using namespace boost;
- +int nHubMode = 0;
- +const unsigned HubModes[5][5]=
- +{ // outbound connections, total connections, IP mask, multithreaded connect
- + { 8, 125, 16, 32, 0 }, // Normal mode
- + { 16, 200, 16, 32, 0 }, // Small hub mode
- + { 32, 384, 24, 36, 1 }, // Medium hub mode
- + { 32, 640, 24, 36, 1 }, // Large hub mode
- + { 32, 1536, 32, 40, 1 } // Largest hub mode
- +};
- +
- +#if defined(FD_SETSIZE) && (FD_SETSIZE<1600)
- +#if FD_SETSIZE<1024
- +#warning This build will not be able to run at high hub levels
- +#else
- +#warning This build will not be able to run at hub mode 4
- +#endif
- +#endif
- +
- +
- static const int MAX_OUTBOUND_CONNECTIONS = 8;
- void ThreadMessageHandler2(void* parg);
- @@ -49,7 +78,7 @@ uint64 nLocalServices = (fClient ? 0 : N
- CAddress addrLocalHost(CService("0.0.0.0", 0), nLocalServices);
- static CNode* pnodeLocalHost = NULL;
- uint64 nLocalHostNonce = 0;
- -array<int, THREAD_MAX> vnThreadsRunning;
- +array<atomic_long, THREAD_MAX> vnThreadsRunning;
- static SOCKET hListenSocket = INVALID_SOCKET;
- CAddrMan addrman;
- @@ -64,6 +93,11 @@ map<CInv, int64> mapAlreadyAskedFor;
- set<CNetAddr> setservAddNodeAddresses;
- CCriticalSection cs_setservAddNodeAddresses;
- +// Work Notification
- +boost::mutex mWorkNotification;
- +boost::condition_variable cvWorkNotification;
- +bool fWorkFound;
- +
- unsigned short GetListenPort()
- @@ -727,7 +761,7 @@ void ThreadSocketHandler2(void* parg)
- if (WSAGetLastError() != WSAEWOULDBLOCK)
- printf("socket error accept failed: %d\n", WSAGetLastError());
- }
- - else if (nInbound >= GetArg("-maxconnections", 125) - MAX_OUTBOUND_CONNECTIONS)
- + else if (nInbound >= GetArg("-maxconnections", HubModes[nHubMode][HM_MAX_TOTAL]) - HubModes[nHubMode][HM_MAX_OUTBOUND])
- {
- CRITICAL_BLOCK(cs_setservAddNodeAddresses)
- if (!setservAddNodeAddresses.count(addr))
- @@ -759,10 +793,18 @@ void ThreadSocketHandler2(void* parg)
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
- pnode->AddRef();
- }
- + bool fGotData = false;
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
- {
- if (fShutdown)
- + {
- + CRITICAL_BLOCK(cs_vNodes)
- + {
- + BOOST_FOREACH(CNode* pnode, vNodesCopy)
- + pnode->Release();
- + }
- return;
- + }
- //
- // Receive
- @@ -790,6 +832,8 @@ void ThreadSocketHandler2(void* parg)
- vRecv.resize(nPos + nBytes);
- memcpy(&vRecv[nPos], pchBuf, nBytes);
- pnode->nLastRecv = GetTime();
- + pnode->fNeedProcess = true;
- + fGotData = true;
- }
- else if (nBytes == 0)
- {
- @@ -850,6 +894,14 @@ void ThreadSocketHandler2(void* parg)
- }
- }
- + if (fGotData)
- + {
- + mWorkNotification.lock();
- + fWorkFound = true;
- + mWorkNotification.unlock();
- + cvWorkNotification.notify_one();
- + }
- +
- //
- // Inactivity checking
- //
- @@ -880,7 +932,7 @@ void ThreadSocketHandler2(void* parg)
- pnode->Release();
- }
- - Sleep(10);
- + Sleep(2);
- }
- }
- @@ -1117,6 +1169,16 @@ void ThreadDNSAddressSeed2(void* parg)
- +void ONCThread(void *parg)
- +{ // Open a network connection in a thread
- + IMPLEMENT_RANDOMIZE_STACK(ONCThread(parg));
- + CAddress *addr=(CAddress *) parg;
- + vnThreadsRunning[THREAD_CONNECT]++;
- + OpenNetworkConnection(*addr);
- + vnThreadsRunning[THREAD_CONNECT]--;
- + delete addr;
- +}
- +
- unsigned int pnSeed[] =
- {
- @@ -1254,6 +1316,31 @@ void ThreadOpenConnections2(void* parg)
- {
- printf("ThreadOpenConnections started\n");
- + // Determine and validate configured hub mode
- + string sHubMode=GetArg("-hub", "0");
- + int nHM=atoi(sHubMode);
- + if ((nHM >= 0) && (nHM <= 4))
- + nHubMode = nHM;
- + if( ( sizeof(fd_set) * 8 ) < ( HubModes[nHubMode][HM_MAX_TOTAL] + 32 ) )
- + {
- + fShutdown = true;
- + printf("*** Unable to support requested hub mode due to compilation flags\n");
- + CreateThread(Shutdown, NULL);
- + return;
- + }
- +
- +#ifdef RLIMIT_NOFILE
- + struct rlimit srLimit;
- + if ( (getrlimit(RLIMIT_NOFILE, &srLimit) == 0) &&
- + (srLimit.rlim_cur < ( HubModes[nHubMode][HM_MAX_TOTAL] + 32 ) ) )
- + {
- + fShutdown = true;
- + printf("*** Unable to support requested hub mode due to resource limit\n");
- + CreateThread(Shutdown, NULL);
- + return;
- + }
- +#endif
- +
- // Connect to specific addresses
- if (mapArgs.count("-connect"))
- {
- @@ -1280,9 +1367,6 @@ void ThreadOpenConnections2(void* parg)
- {
- int nOutbound = 0;
- - vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- - Sleep(500);
- - vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
- @@ -1294,22 +1378,27 @@ void ThreadOpenConnections2(void* parg)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (!pnode->fInbound)
- nOutbound++;
- - int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
- - nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
- + int nMaxOutboundConnections = HubModes[nHubMode][HM_MAX_OUTBOUND];
- + nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", nMaxOutboundConnections));
- + int nSleepTime = 5000;
- if (nOutbound < nMaxOutboundConnections)
- - break;
- + nSleepTime = 500;
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- - Sleep(2000);
- + Sleep(nSleepTime);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
- + if (nOutbound < nMaxOutboundConnections)
- + break;
- }
- +
- + int64 nNow = GetTime();
- bool fAddSeeds = false;
- // Add seed nodes if IRC isn't working
- bool fTOR = (fUseProxy && addrProxy.GetPort() == 9050);
- - if (addrman.size()==0 && (GetTime() - nStart > 60 || fTOR) && !fTestNet)
- + if (addrman.size()==0 && (nNow - nStart > 60 || fTOR) && !fTestNet)
- {
- std::vector<CAddress> vAdd;
- for (int i = 0; i < ARRAYLEN(pnSeed); i++)
- @@ -1322,7 +1411,7 @@ void ThreadOpenConnections2(void* parg)
- struct in_addr ip;
- memcpy(&ip, &pnSeed[i], sizeof(ip));
- CAddress addr(CService(ip, GetDefaultPort()));
- - addr.nTime = GetTime()-GetRand(nOneWeek)-nOneWeek;
- + addr.nTime = nNow-GetRand(nOneWeek)-nOneWeek;
- vAdd.push_back(addr);
- }
- addrman.Add(vAdd, CNetAddr("127.0.0.1"));
- @@ -1334,7 +1423,7 @@ void ThreadOpenConnections2(void* parg)
- CAddress addrConnect;
- int64 nBest = std::numeric_limits<int64>::min();
- - // Only connect to one address per a.b.?.? range.
- + // Only connect to one address per IP block
- // Do this here so we don't have to critsect vNodes inside mapAddresses critsect.
- set<vector<unsigned char> > setConnected;
- CRITICAL_BLOCK(cs_vNodes)
- @@ -1368,7 +1457,16 @@ void ThreadOpenConnections2(void* parg)
- }
- if (addrConnect.IsValid())
- - OpenNetworkConnection(addrConnect);
- + {
- + if (HubModes[nHubMode][HM_MULTITHREAD]==0)
- + {
- + vnThreadsRunning[THREAD_CONNECT]++;
- + OpenNetworkConnection(addrConnect);
- + vnThreadsRunning[THREAD_CONNECT]--;
- + }
- + else
- + CreateThread(ONCThread, new CAddress(addrConnect));
- + }
- }
- }
- @@ -1453,9 +1551,9 @@ bool OpenNetworkConnection(const CAddres
- FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect))
- return false;
- - vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- + vnThreadsRunning[THREAD_CONNECT]--;
- CNode* pnode = ConnectNode(addrConnect);
- - vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- + vnThreadsRunning[THREAD_CONNECT]++;
- if (fShutdown)
- return false;
- if (!pnode)
- @@ -1505,6 +1603,10 @@ void ThreadMessageHandler2(void* parg)
- pnode->AddRef();
- }
- + mWorkNotification.lock();
- + fWorkFound = false;
- + mWorkNotification.unlock();
- +
- // Poll the connected nodes for messages
- CNode* pnodeTrickle = NULL;
- if (!vNodesCopy.empty())
- @@ -1513,7 +1615,14 @@ void ThreadMessageHandler2(void* parg)
- {
- // Receive messages
- TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
- - ProcessMessages(pnode);
- + {
- + if (pnode->fNeedProcess)
- + {
- + pnode->fNeedProcess = false;
- + ProcessMessages(pnode);
- + }
- + }
- +
- if (fShutdown)
- return;
- @@ -1534,7 +1643,12 @@ void ThreadMessageHandler2(void* parg)
- // Reduce vnThreadsRunning so StopNode has permission to exit while
- // we're sleeping, but we must always check fShutdown after doing this.
- vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
- - Sleep(100);
- + { // CAUTION: Raising the delay will slow connection accept
- + boost::posix_time::time_duration wait_duration = boost::posix_time::millisec(250);
- + boost::unique_lock<boost::mutex> lock(mWorkNotification);
- + if(!fWorkFound)
- + cvWorkNotification.timed_wait(lock, wait_duration);
- + }
- if (fRequestShutdown)
- Shutdown(NULL);
- vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
- @@ -1768,6 +1882,8 @@ bool StopNode()
- if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
- if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
- if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n");
- + if (vnThreadsRunning[THREAD_CONNECT] > 0) printf("ThreadConnect still running\n");
- + if (vnThreadsRunning[THREAD_RPCSERVER0] > 0) printf("ThreadRPCServer0 still running\n");
- if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
- if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
- if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
- diff -Nurp src/net.h src2/net.h
- --- src/net.h 2012-04-27 05:20:53.091660464 -0700
- +++ src2/net.h 2012-04-27 06:44:35.193529044 -0700
- @@ -8,6 +8,7 @@
- #include <deque>
- #include <boost/array.hpp>
- #include <boost/foreach.hpp>
- +#include <boost/detail/atomic_count.hpp>
- #include <openssl/rand.h>
- #ifndef WIN32
- @@ -77,6 +78,8 @@ enum threadId
- THREAD_MESSAGEHANDLER,
- THREAD_MINER,
- THREAD_RPCSERVER,
- + THREAD_CONNECT,
- + THREAD_RPCSERVER0,
- THREAD_UPNP,
- THREAD_DNSSEED,
- THREAD_ADDEDCONNECTIONS,
- @@ -90,7 +93,18 @@ extern bool fAllowDNS;
- extern uint64 nLocalServices;
- extern CAddress addrLocalHost;
- extern uint64 nLocalHostNonce;
- -extern boost::array<int, THREAD_MAX> vnThreadsRunning;
- +class atomic_long {
- + boost::detail::atomic_count val;
- +
- +public:
- + atomic_long(): val(0) {}
- + void operator++() { ++val; }
- + long operator--() { return --val; }
- + void operator++(int i) { ++val; }
- + void operator--(int i) { --val; }
- + operator long() const { return val; }
- +};
- +extern boost::array<atomic_long, THREAD_MAX> vnThreadsRunning;
- extern CAddrMan addrman;
- extern std::vector<CNode*> vNodes;
- @@ -130,6 +144,7 @@ public:
- bool fNetworkNode;
- bool fSuccessfullyConnected;
- bool fDisconnect;
- + bool fNeedProcess;
- protected:
- int nRefCount;
- @@ -185,6 +200,7 @@ public:
- fNetworkNode = false;
- fSuccessfullyConnected = false;
- fDisconnect = false;
- + fNeedProcess = false;
- nRefCount = 0;
- nReleaseTime = 0;
- hashContinue = 0;
- diff -Nurp src/util.cpp src2/util.cpp
- --- src/util.cpp 2012-04-27 05:20:53.103215469 -0700
- +++ src2/util.cpp 2012-04-27 06:44:35.193529044 -0700
- @@ -27,7 +27,7 @@ bool fShutdown = false;
- bool fDaemon = false;
- bool fServer = false;
- bool fCommandLine = false;
- -string strMiscWarning;
- +string strMiscWarning, strRPCUser, strRPCPass;
- bool fTestNet = false;
- bool fNoListen = false;
- bool fLogTimestamps = false;
- diff -Nurp src/util.h src2/util.h
- --- src/util.h 2012-04-27 05:20:53.103215469 -0700
- +++ src2/util.h 2012-04-27 06:44:35.193529044 -0700
- @@ -119,7 +119,7 @@ extern bool fShutdown;
- extern bool fDaemon;
- extern bool fServer;
- extern bool fCommandLine;
- -extern std::string strMiscWarning;
- +extern std::string strMiscWarning, strRPCUser, strRPCPass;
- extern bool fTestNet;
- extern bool fNoListen;
- extern bool fLogTimestamps;
- @@ -658,7 +658,7 @@ inline void SetThreadPriority(int nPrior
- inline void ExitThread(size_t nExitCode)
- {
- - pthread_exit((void*)nExitCode);
- + pthread_exit((void*)(uintptr_t)nExitCode);
- }
- #endif
Add Comment
Please, Sign In to add comment