Advertisement
Guest User

Untitled

a guest
Dec 19th, 2014
176
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.38 KB | None | 0 0
  1. #include <pthread.h>
  2. #include <stdio.h>
  3. #include <unistd.h>
  4. #include <vector>
  5. #include <string>
  6. #include <iostream>
  7.  
  8. pthread_mutex_t demoMutex = PTHREAD_MUTEX_INITIALIZER;
  9. pthread_cond_t conditionVariable = PTHREAD_COND_INITIALIZER;
  10. unsigned int condition = 0;
  11.  
  12. struct serverInfo
  13. {
  14. unsigned int serverId;
  15. pthread_t threadId;
  16. std :: vector <std :: string> queue;
  17. };
  18. std :: vector <serverInfo> serverInfoVector;
  19.  
  20. void * printHello (void* threadId)
  21. {
  22. pthread_t *my_tid = (pthread_t *)threadId;
  23.  
  24. pthread_mutex_lock (&demoMutex);
  25.  
  26. while (condition == 0)
  27. pthread_cond_wait (&conditionVariable, &demoMutex);
  28.  
  29. unsigned int i = 0;
  30. char found = false;
  31.  
  32. if (serverInfoVector.size () > 0)
  33. {
  34. while ((i < serverInfoVector.size ()) && (found == false))
  35. {
  36. if (pthread_equal (pthread_self(), serverInfoVector [i].threadId))
  37. {
  38. found = true;
  39. break;
  40. }
  41. else
  42. i++;
  43. }
  44. }
  45.  
  46. while ((found == true) && (!serverInfoVector [i].queue.empty ()))
  47. {
  48. std :: cout << "nThread: " << pthread_self () << ", poped from queue: " << serverInfoVector [i].queue.front () << "n";
  49. serverInfoVector [i].queue.pop_back ();
  50. }
  51.  
  52. pthread_mutex_unlock (&demoMutex);
  53. pthread_exit (NULL);
  54. }
  55.  
  56. void checkServerExists (unsigned int serverNumber, std :: string message)
  57. {
  58. unsigned int i = 0;
  59. char found = false;
  60.  
  61. pthread_mutex_lock (&demoMutex);
  62.  
  63. if (serverInfoVector.size () > 0)
  64. {
  65. while ((i < serverInfoVector.size ()) && (found == false))
  66. {
  67. if (serverNumber == serverInfoVector [i].serverId)
  68. {
  69. found = true;
  70. break;
  71. }
  72. else
  73. i++;
  74. }
  75. }
  76.  
  77. if (found == false)
  78. {
  79. // This server doesn't exist, so create a thread for it, create a queue for it, push the message in the corresponding queue.
  80. // Push the server number in the serverNumberArray.
  81.  
  82. // Create a thread for it.
  83. pthread_t newThread;
  84. int returnValue;
  85. if ((returnValue = pthread_create (&newThread,
  86. NULL,
  87. printHello,
  88. (void*) &newThread)) != 0)
  89. {
  90. printf ("nerror: pthread_create failed with error number %d", returnValue);
  91. }
  92. printf ("nIn checkServerExists ()`: thread id %ldn", newThread);
  93.  
  94. // Push the message in its queue.
  95. serverInfo obj;
  96. obj.serverId = serverNumber;
  97. obj.threadId = newThread;
  98. obj.queue.push_back (message);
  99. serverInfoVector.push_back (obj);
  100.  
  101. condition++;
  102. pthread_cond_signal (&conditionVariable);
  103. pthread_mutex_unlock (&demoMutex);
  104. }
  105. else
  106. {
  107. // This server exists, so lookup its thread and queue, push the message in the corresponding queue.
  108. printf ("nIn else ()`: thread id %ldn", serverInfoVector [i].threadId);
  109. serverInfoVector [i].queue.push_back (message);
  110.  
  111. condition++;
  112. pthread_cond_signal (&conditionVariable);
  113. pthread_mutex_unlock (&demoMutex);
  114. }
  115. }
  116.  
  117. int main ()
  118. {
  119. checkServerExists (1, "anisha");
  120. checkServerExists (2, "kaul");
  121. checkServerExists (1, "sanjeev");
  122. checkServerExists (2, "sharma");
  123.  
  124. for (unsigned int i = 0; i < serverInfoVector.size (); i++)
  125. pthread_join (serverInfoVector [i].threadId, NULL);
  126.  
  127. return 0;
  128. }
  129.  
  130. extern "C" void* printHello (void* threadId);
  131.  
  132. while (condition == 0)
  133. pthread_cond_wait (&conditionVariable, &demoMutex);
  134.  
  135. while (condition == 0)
  136. {
  137. // Note I always enclose the statement in {} what happens if pthread_cond_wait()
  138. // had been a macro? You can never trust all third party libraries so it
  139. // is better to be safe than struggle to find it with debugger.
  140. pthread_cond_wait (&conditionVariable, &demoMutex);
  141. }
  142. // Once you know it is your decrement.
  143. --condition;
  144.  
  145. void * printHello (void* threadId)
  146. {
  147. void* result = NULL;
  148. try
  149. {
  150. result = // Your code here
  151. }
  152. catch(std::exception const& e) // Optional
  153. {
  154. // Log Exception
  155. std::cerr << "Exception: " << e.what() << "n";
  156. }
  157. catch(...) // MUST have this one.
  158. {
  159. // Log Exception
  160. std::cerr << "Exception: Unknown(...)n";
  161. }
  162. return result;
  163. }
  164.  
  165. class MutexLocker
  166. {
  167. pthread_mutex_t& mutex;
  168. MutextLocker(pthread_mutex_t& mutex)
  169. : mutex(mutex)
  170. {
  171. pthread_mutex_lock(&mutex);
  172. }
  173. ~MutexLocker()
  174. {
  175. pthread_mutex_unlock(&mutex);
  176. }
  177. };
  178.  
  179. struct serverInfo
  180. {
  181. unsigned int serverId;
  182. pthread_t threadId;
  183.  
  184. // Make the queue a pointer
  185. // So even when this object is copied the queue is unaffected.
  186. // May want to use a smart pointer or something (needs slightly more thought).
  187. std::vector<std::string>* queue;
  188. };
  189.  
  190. std::auto_ptr<std::vector<std::string>> queue = new std::vector<std::string>();
  191. // Pass a pointer to the queue to the thread.
  192. // Now the thread does not need to know or care about serverInfoVector
  193. // Which is good because this is being mutated by other people.
  194. pthread_create(&newThread, NULL, printHello, queue.get()); // Add error code
  195.  
  196. if (/* Everything OK */)
  197. {
  198. serverInfoVector.push_back(serverInfo(id, newThread, queue.release());
  199. }
  200.  
  201. struct serverInfo
  202. {
  203. unsigned int serverId;
  204. pthread_t threadId;
  205. std :: vector <std :: string> queue;
  206.  
  207. // Add this:
  208. serverInfo(unsigned int serverId, pthread_t threadId, std::string const& message)
  209. : serverId(serverId),
  210. , threadId(threadId)
  211. {
  212. queue.push_back(message);
  213. }
  214. };
  215.  
  216. std :: vector <serverInfo> serverInfoVector;
  217.  
  218. unsigned int i = 0;
  219.  
  220. if (serverInfoVector.size () > 0)
  221. {
  222.  
  223. if (serverInfoVector.size () > 0)
  224. {
  225. while ((i < serverInfoVector.size ()) && (found == false))
  226. {
  227. if (serverNumber == serverInfoVector [i].serverId)
  228. {
  229. found = true;
  230. break;
  231. }
  232. else
  233. i++;
  234. }
  235. }
  236.  
  237. // Push the message in its queue.
  238. serverInfo obj;
  239. obj.serverId = serverNumber;
  240. obj.threadId = newThread;
  241. obj.queue.push_back (message);
  242. serverInfoVector.push_back (obj);
  243.  
  244. serverInfoVector.push_back(serverInfo(serverNumber, newThread, message));
  245.  
  246. {
  247. // STUFF
  248.  
  249. condition++;
  250. pthread_cond_signal (&conditionVariable);
  251. pthread_mutex_unlock (&demoMutex);
  252. }
  253. else
  254. {
  255. // STUFF
  256.  
  257. condition++;
  258. pthread_cond_signal (&conditionVariable);
  259. pthread_mutex_unlock (&demoMutex);
  260. }
  261.  
  262. checkServerExists (1, "anisha");
  263. checkServerExists (2, "kaul");
  264. checkServerExists (1, "sanjeev");
  265. checkServerExists (2, "sharma");
  266.  
  267. MultiThreadServer server;
  268. server.AddJobToThreadWithID (1, "anisha");
  269. server.AddJobToThreadWithID (2, "kaul");
  270. server.AddJobToThreadWithID (1, "sanjeev");
  271. server.AddJobToThreadWithID (2, "sharma");
  272.  
  273. for (unsigned int i = 0; i < serverInfoVector.size (); i++)
  274. pthread_join (serverInfoVector [i].threadId, NULL);
  275.  
  276. return 0;
  277.  
  278. void myFunc()
  279. {
  280. pthread_mutex_lock(mutex)
  281.  
  282. // WORK
  283.  
  284. pthread_mutex_unlock(mutex);
  285. }
  286.  
  287. void myFunc()
  288. {
  289. MutexLocker lock(mutex)
  290.  
  291. // WORK
  292. }
  293.  
  294. for (unsigned int i = 0; i < serverInfoVector.size (); i++)
  295. pthread_join (serverInfoVector [i].threadId, NULL);
  296.  
  297. #include <string>
  298. #include <map>
  299. #include <list>
  300. #include <iostream>
  301.  
  302. #include <pthread.h>
  303.  
  304. class MutextLocker
  305. {
  306. pthread_mutex_t& mutex;
  307. public:
  308. MutextLocker(pthread_mutex_t& mutex): mutex(mutex)
  309. {
  310. pthread_mutex_lock(&mutex);
  311. }
  312. ~MutextLocker()
  313. {
  314. pthread_mutex_unlock(&mutex);
  315. }
  316. };
  317.  
  318. class QueInfo
  319. {
  320. public:
  321. QueInfo()
  322. : noMoreWork(false)
  323. , threadStarted(false)
  324. {
  325. if (pthread_mutex_init(&mutex, NULL) != 0)
  326. { throw int(1);
  327. }
  328. if (pthread_cond_init(&cond, NULL) != 0)
  329. {
  330. pthread_mutex_destroy(&mutex);
  331. throw int(2);
  332. }
  333. }
  334. ~QueInfo()
  335. {
  336. pthread_cond_destroy(&cond);
  337. pthread_mutex_destroy(&mutex);
  338. }
  339. bool getWorkItem(std::string& item)
  340. {
  341. MutextLocker lock(mutex);
  342.  
  343. while ((queue.size() == 0) && (!noMoreWork))
  344. {
  345. pthread_cond_wait (&cond, &mutex);
  346. }
  347. bool result = false;
  348. if (queue.size() != 0)
  349. {
  350. item = queue.front();
  351. queue.pop_front();
  352. result = true;
  353. }
  354. return result;
  355. }
  356. void addMessage(std::string const& item)
  357. {
  358. MutextLocker lock(mutex);
  359. queue.push_back(item);
  360. pthread_cond_signal(&cond);
  361. }
  362. void finishedAdding()
  363. {
  364. MutextLocker lock(mutex);
  365. noMoreWork = true;
  366. pthread_cond_signal(&cond);
  367. }
  368.  
  369. // These two are accessed by ServerInfo but
  370. // never by the thread. This means we do not need
  371. // to lock on their use. But I am being lazy here
  372. // leaving them as public members.
  373. pthread_t threadId; // Being lazy here
  374. bool threadStarted; // these two are for use by ServerInfo
  375. private:
  376. pthread_mutex_t mutex;
  377. pthread_cond_t cond;
  378. bool noMoreWork;
  379. std::list<std::string> queue;
  380. };
  381.  
  382. class ServerInfo
  383. {
  384. public:
  385. ~ServerInfo()
  386. {
  387. for (Cont::iterator loop = queue.begin(); loop != queue.end(); ++loop)
  388. {
  389. loop->second.finishedAdding();
  390. void* result;
  391. pthread_join(loop->second.threadId, &result);
  392. }
  393. }
  394.  
  395. void checkServerExists(unsigned int serverNumber, std::string const& message);
  396.  
  397. private:
  398. typedef std::map<unsigned int, QueInfo> Cont;
  399. Cont queue;
  400. };
  401.  
  402. void* printHello(void* data)
  403. {
  404. QueInfo* myQueue = reinterpret_cast<QueInfo*>(data);
  405.  
  406. std::string workItem;
  407. while(myQueue->getWorkItem(workItem))
  408. {
  409. std::cout << "nThread: " << pthread_self () << ", poped from queue: " << workItem << "n";
  410. }
  411. return NULL; // Return NULL on exit.
  412. }
  413.  
  414. void ServerInfo::checkServerExists(unsigned int serverNumber, std::string const& message)
  415. {
  416. QueInfo& item = queue[serverNumber]; // If it does not exist it is inserted.
  417. item.addMessage(message);
  418.  
  419. if (!item.threadStarted)
  420. {
  421. int returnValue;
  422. if ((returnValue = pthread_create (&item.threadId,
  423. NULL,
  424. printHello,
  425. reinterpret_cast<void*>(&item))) != 0)
  426. {
  427. std::cout << "nerror: pthread_create failed with error number "<< returnValue;
  428. queue.erase(serverNumber);
  429. }
  430. else
  431. {
  432. item.threadStarted = true;
  433. }
  434. std::cout << "nIn checkServerExists ()`: thread id " << item.threadId << "n";
  435. }
  436. else
  437. {
  438. std::cout << "nIn else ()`: thread id " << item.threadId << "n";
  439. }
  440. }
  441.  
  442. int main ()
  443. {
  444. ServerInfo server;
  445. server.checkServerExists (1, "anisha");
  446. server.checkServerExists (2, "kaul");
  447. server.checkServerExists (1, "sanjeev");
  448. server.checkServerExists (2, "sharma");
  449.  
  450. // Note ServerInfo destructor will wait for all the threads.
  451. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement