Advertisement
Guest User

incron multicast patch

a guest
Feb 3rd, 2012
166
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.68 KB | None | 0 0
  1. diff -ru --exclude=.svn incron-svn/appargs.cpp incron/appargs.cpp
  2. --- incron-svn/appargs.cpp 2012-02-03 11:34:52.255380477 +0100
  3. +++ incron/appargs.cpp 2012-02-03 11:45:30.327058000 +0100
  4. @@ -21,6 +21,7 @@
  5.  
  6.  
  7. #include <cstring>
  8. +#include <stdio.h>
  9.  
  10. #include "strtok.h"
  11.  
  12. diff -ru --exclude=.svn incron-svn/icd-main.cpp incron/icd-main.cpp
  13. --- incron-svn/icd-main.cpp 2012-02-03 11:34:52.254394984 +0100
  14. +++ incron/icd-main.cpp 2012-02-03 11:42:16.702707000 +0100
  15. @@ -24,6 +24,14 @@
  16. #include <sys/poll.h>
  17. #include <sys/stat.h>
  18. #include <cstring>
  19. +#ifdef WITH_MULTICAST_SUPPORT
  20. +# include <stdio.h>
  21. +# include <pthread.h>
  22. +# include <netinet/in.h>
  23. +# include <arpa/inet.h>
  24. +# include <stdlib.h>
  25. +# include <stdarg.h>
  26. +#endif
  27.  
  28. #include "inotify-cxx.h"
  29. #include "appinst.h"
  30. @@ -79,6 +87,244 @@
  31. /// Daemonize true/false
  32. bool g_daemon = true;
  33.  
  34. +/// Does it run as a master Node ?
  35. +volatile bool g_fMaster = true;
  36. +
  37. +#ifdef WITH_MULTICAST_SUPPORT
  38. +
  39. +/// mutex/cond/bool used to wait for network nodes before watching FS
  40. +pthread_mutex_t multiple_instance_mutex = PTHREAD_MUTEX_INITIALIZER;
  41. +pthread_cond_t multiple_instance_cond = PTHREAD_COND_INITIALIZER;
  42. +volatile bool g_threadRunning = false;
  43. +
  44. +/// used to exchange ID accross the network
  45. +typedef struct id_exchange {
  46. + uint32_t _magic; /// magic number to check application
  47. + uint32_t id; /// the id
  48. + uint32_t _mark; /// a random mark to identify sender
  49. +} id_exchange_t;
  50. +
  51. +#define _MAGIC (htonl(0xdeadbeef)) /// a random (but not too much) number
  52. +
  53. +/// a little helper function which print a string on console
  54. +/**
  55. + * Display a formatted string on stderr, if daemon mode is false
  56. + * \param[in] fmt formatted string, arguments follow
  57. + */
  58. +static void _debug(const char * fmt, ...) {
  59. +#ifdef DEBUG
  60. + if (!g_daemon) {
  61. + va_list vl;
  62. + va_start(vl, fmt);
  63. + vfprintf(stderr, fmt, vl);
  64. + va_end(vl);
  65. + fprintf(stderr, "\n");
  66. + }
  67. +#endif
  68. +}
  69. +
  70. +/// wait network read event on socket
  71. +/**
  72. + * \param[in] sock the socket on which we should wait for event
  73. + * \param[in] timeout how long ?
  74. + * \retval 0 timeout
  75. + * \retval 1 network read event handled
  76. + * \retval -1 network read error (other than EINTR)
  77. + */
  78. +int wait_network(int sock, double timeout = 1.0) {
  79. + fd_set read_fd;
  80. + FD_ZERO(&read_fd);
  81. + FD_SET(sock, &read_fd);
  82. + // one second timeout
  83. + struct timeval tv = { timeout, 0.0};
  84. + int max_fd = sock +1;
  85. + // select or timeout
  86. + int r;
  87. +
  88. + do {
  89. + r = select(max_fd, &read_fd, NULL, NULL, &tv);
  90. + } while (r == -1 && errno == EINTR);
  91. + return r;
  92. +}
  93. +
  94. +/// read network configuration as configured in file
  95. +int readNetworkConfiguration(struct sockaddr_in * addr_dest) {
  96. + if (!addr_dest)
  97. + return -1;
  98. + std::string cfgPeer;
  99. + std::string cfgPort;
  100. + bool r1 = false, r2 = false;
  101. + r1 = IncronCfg::GetValue("peer", cfgPeer);
  102. + r2 = IncronCfg::GetValue("listen_port", cfgPort);
  103. + memset(addr_dest, '\0', sizeof(*addr_dest));
  104. + addr_dest->sin_family = AF_INET;
  105. + addr_dest->sin_addr.s_addr = inet_addr(cfgPeer.c_str());
  106. + unsigned long uPort = strtoul(cfgPort.c_str(), NULL, 10);
  107. + addr_dest->sin_port = htons(uPort);
  108. + return 0;
  109. +}
  110. +
  111. +/// bind socket to the specified port
  112. +/** \param[in] port a NETWORK BYTE order port number
  113. + * \retval sock the socket bound to the port
  114. + * \retval -1 on error
  115. + */
  116. +int bindSocket(uint16_t port) {
  117. + sockaddr_in bind_addr;
  118. + unsigned char one = 1;
  119. +
  120. + int sock = socket(AF_INET, SOCK_DGRAM, 0);
  121. + if (sock == -1) {
  122. + syslog(LOG_CRIT, "Failed to create socket %m");
  123. + return sock;
  124. + }
  125. + setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&one,sizeof(one));
  126. +
  127. + memset(&bind_addr, '\0', sizeof(sockaddr_in));
  128. + bind_addr.sin_family = AF_INET;
  129. + bind_addr.sin_port = port; /* already in net order */
  130. + if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr))) {
  131. + syslog(LOG_CRIT, "Failed to bind socket %m");
  132. + close(sock);
  133. + sock = -1;
  134. + }
  135. + return sock;
  136. +}
  137. +/// Handle socket server
  138. +/** The thread routing which handle network event and master/slave mode
  139. + * \param[in] arg unused
  140. + */
  141. +void * networkServer(void * arg) {
  142. + srand((unsigned int)time(NULL)); // init random numbers, used later
  143. +
  144. + int hSocket = -1;
  145. +
  146. + // network initializations
  147. + sockaddr_in addr_dest;
  148. + readNetworkConfiguration(&addr_dest);
  149. +
  150. + id_exchange_t send_exchange = { _MAGIC, 0, rand() };
  151. + id_exchange_t recv_exchange = { 0, 0 };
  152. +
  153. + hSocket = bindSocket(addr_dest.sin_port);
  154. + if (hSocket == -1)
  155. + exit(1);
  156. +
  157. + if (IN_MULTICAST(addr_dest.sin_addr.s_addr)) {
  158. + unsigned char one = 1;
  159. + struct ip_mreq mreq;
  160. + memset(&mreq, '\0', sizeof(mreq));
  161. + mreq.imr_multiaddr = addr_dest.sin_addr;
  162. + if (setsockopt(hSocket, IPPROTO_IP, IP_MULTICAST_TTL, (char *)&one, sizeof(one)))
  163. + _debug("failed to set TTL");
  164. + if (setsockopt(hSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)))
  165. + _debug("failed to join multicast");
  166. + }
  167. +#define OTHER_ADDR_INIT(X) \
  168. + struct sockaddr_in X = { 0 }; \
  169. + (X).sin_family = AF_INET; \
  170. + socklen_t X ## _len = sizeof(X); \
  171. + (X).sin_addr.s_addr = htonl(INADDR_ANY)
  172. +
  173. + {
  174. + int r = wait_network(hSocket, 2.0);
  175. + if (r == 1) { // got something to read
  176. + OTHER_ADDR_INIT(other_addr);
  177. + r = recvfrom(hSocket, &recv_exchange, sizeof(recv_exchange), 0, (struct sockaddr *)&other_addr, &other_addr_len);
  178. + if (r == sizeof(recv_exchange) && recv_exchange._magic == _MAGIC) {
  179. + send_exchange.id = htonl(ntohl(recv_exchange.id)+10);
  180. + g_fMaster = false;
  181. + _debug("multiple nodes - slave mode");
  182. + } else {
  183. + _debug("bad node packet received");
  184. + syslog(LOG_CRIT, "bad packet received. Check network applications");
  185. + close(hSocket);
  186. + exit (1);
  187. + }
  188. + } else {
  189. + // no other nodes
  190. + send_exchange.id = htonl(10);
  191. + g_fMaster = true;
  192. + _debug("single node - master mode");
  193. + }
  194. + }
  195. +
  196. + // make the main thread running
  197. + pthread_mutex_lock(&multiple_instance_mutex);
  198. + g_threadRunning = true;
  199. + pthread_cond_broadcast(&multiple_instance_cond);
  200. + pthread_mutex_unlock(&multiple_instance_mutex);
  201. +
  202. + while (!g_fFinish) {
  203. + // master is sending keepalive each seconds
  204. + // slave is waiting for 2 seconds
  205. + int r = wait_network(hSocket, (g_fMaster ? 1.0 : 2.0));
  206. + if (r == 1) { // got something to read
  207. + OTHER_ADDR_INIT(other_addr);
  208. + r = recvfrom(hSocket, &recv_exchange, sizeof(recv_exchange), 0,
  209. + (struct sockaddr *)&other_addr, &other_addr_len);
  210. + if (other_addr.sin_addr.s_addr != 0) {
  211. +#ifdef DEBUG
  212. + {
  213. +#ifndef MAX
  214. +# define MAX(a,b) ((a) > (b) ? (a) : (b))
  215. +#endif
  216. + char addr[MAX(INET_ADDRSTRLEN,INET6_ADDRSTRLEN)] = { '\0' };
  217. + inet_ntop(other_addr.sin_family, &(other_addr.sin_addr),
  218. + addr, MAX(INET_ADDRSTRLEN,INET6_ADDRSTRLEN));
  219. + _debug("packet from %s", addr);
  220. + }
  221. +#endif
  222. + if (r == sizeof(recv_exchange) &&
  223. + recv_exchange._magic == _MAGIC &&
  224. + recv_exchange._mark != send_exchange._mark) {
  225. + /*
  226. + * the lowest ID is the master, unless if 0
  227. + * if the received ID (the "other") is lowest than mine,
  228. + * I am no more the master
  229. + * if the received ID is higher, I am still the master
  230. + * if both are equal, a new master is elected. I have to regenerate a new ID higher.
  231. + */
  232. + if (ntohl(recv_exchange.id) > ntohl(send_exchange.id)) {
  233. + _debug("got id %u, higher than mine %u: no changes (%s)",
  234. + ntohl(recv_exchange.id), ntohl(send_exchange.id),
  235. + (g_fMaster ? "master" : "slave"));
  236. + } else if (ntohl(recv_exchange.id) < ntohl(send_exchange.id)) {
  237. + _debug("got lower id %u, lower than mine %u: not master",
  238. + ntohl(recv_exchange.id), ntohl(send_exchange.id));
  239. + g_fMaster = false;
  240. + } else if (recv_exchange.id == send_exchange.id) {
  241. + _debug("Now a slave node (conflict)");
  242. + g_fMaster = false;
  243. + // generate a new id
  244. + send_exchange.id = rand();
  245. + sendto(hSocket, &send_exchange, sizeof(send_exchange), 0, (struct sockaddr *)&addr_dest, sizeof(addr_dest));
  246. + }
  247. + } else {
  248. + if (r == (signed int)sizeof(recv_exchange)) {
  249. + _debug ("-my pkt? - recv.magic = %x / .id = %u", ntohl(recv_exchange._magic), ntohl(recv_exchange.id));
  250. + }
  251. + // ignore other packets
  252. + }
  253. + } else {
  254. + _debug ("got a packet from 0.0.0.0 ?");
  255. + }
  256. + } else if (r == 0) {
  257. + // timeout
  258. + _debug("net timeout, sending keepalive, now master");
  259. + sendto(hSocket, &send_exchange, sizeof(send_exchange), 0, (struct sockaddr *)&addr_dest, sizeof(addr_dest));
  260. + g_fMaster = true;
  261. + } else if (r == -1) {
  262. + g_fMaster = false;
  263. + close(hSocket);
  264. + }
  265. + }
  266. + close(hSocket);
  267. + return NULL;
  268. +}
  269. +
  270. +#endif /* WITH_MULTICAST_SUPPORT */
  271. +
  272. /// Handles a signal.
  273. /**
  274. * For SIGTERM and SIGINT it sets the program finish variable.
  275. @@ -367,7 +613,7 @@
  276. if (access(sysBase.c_str(), R_OK) != 0) {
  277. syslog(LOG_CRIT, "cannot read directory for system tables (%s): (%i) %s", sysBase.c_str(), errno, strerror(errno));
  278. if (!g_daemon)
  279. - fprintf(stderr, "cannot read directory for system tables (%s): (%i) %s", sysBase.c_str(), errno, strerror(errno));
  280. + fprintf(stderr, "cannot read directory for system tables (%s): (%i) %s\n", sysBase.c_str(), errno, strerror(errno));
  281. ret = 1;
  282. goto error;
  283. }
  284. @@ -378,7 +624,7 @@
  285. if (access(userBase.c_str(), R_OK) != 0) {
  286. syslog(LOG_CRIT, "cannot read directory for user tables (%s): (%i) %s", userBase.c_str(), errno, strerror(errno));
  287. if (!g_daemon)
  288. - fprintf(stderr, "cannot read directory for user tables (%s): (%i) %s", userBase.c_str(), errno, strerror(errno));
  289. + fprintf(stderr, "cannot read directory for user tables (%s): (%i) %s\n", userBase.c_str(), errno, strerror(errno));
  290. ret = 1;
  291. goto error;
  292. }
  293. @@ -408,7 +654,44 @@
  294. goto error;
  295. }
  296.  
  297. - prepare_pipe();
  298. + // IPG-NB ///////////////////////////////////////////////
  299. + /* create socket server and socket thread
  300. + * to synchronize between inotify instances
  301. + */
  302. + std::string cfgPeer;
  303. + std::string cfgPort;
  304. + bool r1 = false, r2 = false;
  305. + r1 = IncronCfg::GetValue("peer", cfgPeer);
  306. + r2 = IncronCfg::GetValue("listen_port", cfgPort);
  307. + if (r1 && r2) {
  308. + syslog(LOG_CRIT, "starting in multiple instances mode (%s:%s)", cfgPeer.c_str(), cfgPort.c_str());
  309. + if (!g_daemon)
  310. + fprintf(stderr, "starting in multiple instances mode (%s:%s)\n", cfgPeer.c_str(), cfgPort.c_str());
  311. + // disable running as a master node by default
  312. + g_fMaster = false;
  313. + pthread_mutex_lock(&multiple_instance_mutex);
  314. + pthread_t serverThread;
  315. + if (pthread_create (&serverThread, NULL, networkServer, NULL)) {
  316. + syslog(LOG_CRIT, "failed to start in multiple instances mode");
  317. + if (!g_daemon)
  318. + fprintf(stderr, "failed to start multiple instances mode\n");
  319. + ret = 1;
  320. + goto error;
  321. + }
  322. + // server thread started
  323. + while (!g_threadRunning) {
  324. + pthread_cond_wait(&multiple_instance_cond, &multiple_instance_mutex);
  325. + }
  326. + pthread_mutex_unlock(&multiple_instance_mutex);
  327. + if (!g_daemon)
  328. + fprintf(stderr, "starting monitoring\n");
  329. + } else {
  330. + g_threadRunning = true;
  331. + }
  332. +
  333. + // IPG-NB END ////////////////////////////////////////////
  334. +
  335. + prepare_pipe();
  336.  
  337. Inotify in;
  338. in.SetNonBlock(true);
  339. @@ -443,7 +726,7 @@
  340.  
  341. int res = poll(ed.GetPollData(), ed.GetSize(), -1);
  342.  
  343. - if (res > 0) {
  344. + if (res > 0 && g_fMaster) {
  345. if (ed.ProcessEvents())
  346. UserTable::FinishDone();
  347. }
  348. diff -ru --exclude=.svn incron-svn/ict-main.cpp incron/ict-main.cpp
  349. --- incron-svn/ict-main.cpp 2012-02-03 11:34:52.257525597 +0100
  350. +++ incron/ict-main.cpp 2012-01-31 17:21:02.372338000 +0100
  351. @@ -49,7 +49,7 @@
  352.  
  353. /// incrontab description string
  354. #define INCRONTAB_DESCRIPTION "incrontab - inotify cron table manipulator\n" \
  355. - "(c) Lukas Jelinek, 2006, 2007, 208"
  356. + "(c) Lukas Jelinek, 2006, 2007, 2008"
  357.  
  358. /// incrontab help string
  359. #define INCRONTAB_HELP INCRONTAB_DESCRIPTION "\n\n" \
  360. diff -ru --exclude=.svn incron-svn/incroncfg.cpp incron/incroncfg.cpp
  361. --- incron-svn/incroncfg.cpp 2012-02-03 11:34:52.257525597 +0100
  362. +++ incron/incroncfg.cpp 2012-02-03 11:44:52.175537000 +0100
  363. @@ -41,6 +41,8 @@
  364. m_defaults.insert(CFG_MAP::value_type("lockfile_dir", "/var/run"));
  365. m_defaults.insert(CFG_MAP::value_type("lockfile_name", "incrond"));
  366. m_defaults.insert(CFG_MAP::value_type("editor", ""));
  367. + //m_defaults.insert(CFG_MAP::value_type("peer", "224.0.0.1"));
  368. + //m_defaults.insert(CFG_MAP::value_type("listen_port", "65001"));
  369. }
  370.  
  371. void IncronCfg::Load(const std::string& rPath)
  372. @@ -181,7 +183,7 @@
  373.  
  374. bool IncronCfg::IsComment(const char* s)
  375. {
  376. - char* sx = strchr(s, '#');
  377. + const char* sx = strchr(s, '#');
  378. if (sx == NULL)
  379. return false;
  380.  
  381. diff -ru --exclude=.svn incron-svn/incron.conf.example incron/incron.conf.example
  382. --- incron-svn/incron.conf.example 2012-02-03 11:34:52.258380615 +0100
  383. +++ incron/incron.conf.example 2012-01-20 23:58:47.464186298 +0100
  384. @@ -70,3 +70,15 @@
  385. # Example:
  386. # editor = nano
  387.  
  388. +# Parameter: peer
  389. +# Meaning: peer host watching the same filesystem / incrontable
  390. +# Description: Set this to an IP or FQDN of the incrond daemon running on
  391. +# another host, but watching same FS
  392. +# This is usefull if you need incrond running on 2 servers
  393. +# watching files on the same clusterFS
  394. +# Default: 127.0.0.1
  395. +#
  396. +# Example:
  397. +# peer = 192.168.1.2
  398. +
  399. +# listen_port = 65134
  400. diff -ru --exclude=.svn incron-svn/inotify-cxx.cpp incron/inotify-cxx.cpp
  401. --- incron-svn/inotify-cxx.cpp 2012-02-03 11:34:52.254394984 +0100
  402. +++ incron/inotify-cxx.cpp 2012-01-20 18:24:17.913326646 +0100
  403. @@ -23,6 +23,7 @@
  404. #include <errno.h>
  405. #include <unistd.h>
  406. #include <fcntl.h>
  407. +#include <stdio.h>
  408.  
  409. #include "inotify-cxx.h"
  410.  
  411. diff -ru --exclude=.svn incron-svn/Makefile incron/Makefile
  412. --- incron-svn/Makefile 2012-02-03 11:34:52.255380477 +0100
  413. +++ incron/Makefile 2012-02-03 11:48:59.914177655 +0100
  414. @@ -16,9 +16,11 @@
  415. DEBUG = -g0
  416. WARNINGS = -Wall
  417. CXXAUX = -pipe
  418. +OPTS = -DWITH_MULTICAST_SUPPORT=1
  419. +THREADS = -pthread
  420.  
  421. -CXXFLAGS = $(OPTIMIZE) $(DEBUG) $(WARNINGS) $(CXXAUX)
  422. -LDFLAGS = $(WARNINGS)
  423. +CXXFLAGS = $(OPTIMIZE) $(DEBUG) $(WARNINGS) $(CXXAUX) $(OPTS) $(THREADS)
  424. +LDFLAGS = $(WARNINGS) $(THREADS)
  425.  
  426. PROGRAMS = incrond incrontab
  427. diff -ru --exclude=.svn incron-svn/usertable.cpp incron/usertable.cpp
  428. --- incron-svn/usertable.cpp 2012-02-03 11:34:52.258380615 +0100
  429. +++ incron/usertable.cpp 2012-01-20 18:24:43.893078652 +0100
  430. @@ -27,6 +27,7 @@
  431. #include <stdlib.h>
  432. #include <sys/stat.h>
  433. #include <cstring>
  434. +#include <stdio.h>
  435.  
  436. #include "usertable.h"
  437. #include "incroncfg.h"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement