Advertisement
Guest User

spacey.cmwA.cc

a guest
Sep 16th, 2023
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.93 KB | None | 0 0
  1. #include "credentials.hh"
  2. #include <cmwBuffer.hh>
  3.  
  4. #include <cassert>
  5. #include <cstdio>
  6. #include <ctime>
  7. #include <deque>
  8. #include <liburing.h>
  9. #include <linux/sctp.h>
  10. #include <poll.h>
  11. #include <signal.h>
  12. #include <syslog.h>
  13.  
  14. using namespace ::cmw;
  15.  
  /// Socket address of a front-tier peer together with the length of that
  /// address.  main() hands both members to frntBuf.getPacket() and later to
  /// frntBuf.send().
  struct Socky {
    ::sockaddr_in6 addr;                      // not initialized here
    ::socklen_t len = sizeof(::sockaddr_in6); // starts out as the size of addr
  };
  20.  
  21. struct cmwRequest {
  22. Socky const frnt;
  23.  
  24. private:
  25. static inline ::int32_t prevTime;
  26. ::int32_t const bday;
  27. MarshallingInt const acctNbr;
  28. FixedString120 path;
  29. char *mdlFile;
  30. FileWrapper fl;
  31.  
  32. static bool marshalFile(char const *name, auto &buf) {
  33. struct ::stat sb;
  34. if (::stat(name, &sb) < 0)
  35. raise("stat", name, errno);
  36. if (sb.st_mtime <= prevTime)
  37. return false;
  38.  
  39. receive(buf,
  40. {'.' == name[0] || name[0] == '/' ? ::std::strrchr(name, '/') + 1
  41. : name,
  42. {"\0", 1}});
  43. buf.receiveFile(name, sb.st_size);
  44. return true;
  45. }
  46.  
  47. public:
  48. cmwRequest(Socky const &ft, auto &buf)
  49. : frnt{ft}, bday(::std::time(nullptr)), acctNbr{buf}, path{buf} {
  50. if (path.bytesAvailable() < 3)
  51. raise("No room for file suffix");
  52. mdlFile = ::std::strrchr(path(), '/');
  53. if (!mdlFile)
  54. raise("cmwRequest didn't find /");
  55. *mdlFile = 0;
  56. setDirectory(path());
  57. *mdlFile = '/';
  58. char last[60];
  59. ::std::snprintf(last, sizeof last, ".%s.last", ++mdlFile);
  60. fl = FileWrapper{last, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP};
  61. switch (::pread(fl(), &prevTime, sizeof prevTime, 0)) {
  62. case 0:
  63. prevTime = 0;
  64. break;
  65. case -1:
  66. raise("pread", errno);
  67. }
  68. }
  69.  
  70. void marshal(auto &buf) const {
  71. acctNbr.marshal(buf);
  72. if (auto ind = buf.reserveBytes(1);
  73. !buf.receiveAt(ind, marshalFile(mdlFile, buf)))
  74. receive(buf, {mdlFile, ::std::strlen(mdlFile) + 1});
  75.  
  76. ::int8_t updatedFiles = 0;
  77. auto const idx = buf.reserveBytes(sizeof updatedFiles);
  78. FileBuffer f{mdlFile, O_RDONLY};
  79. while (auto tok = f.getline().data()) {
  80. if (!::std::strncmp(tok, "//", 2) || !::std::strncmp(tok, "define", 6))
  81. continue;
  82. if (!::std::strncmp(tok, "--", 2))
  83. break;
  84. if (marshalFile(tok, buf))
  85. ++updatedFiles;
  86. }
  87. buf.receiveAt(idx, updatedFiles);
  88. }
  89.  
  90. auto getFileName() {
  91. Write(fl(), &bday, sizeof bday);
  92. return path.append(".hh");
  93. }
  94. };
  95. #include "cmwA.mdl.hh"
  96.  
  97. void bail(char const *fmt, auto... t) {
  98. ::syslog(LOG_ERR, fmt, t...);
  99. exitFailure();
  100. }
  101.  
  102. void checkField(char const *fld, ::std::string_view actl) {
  103. if (actl != fld)
  104. bail("Expected %s", fld);
  105. }
  106.  
  107. BufferCompressed<SameFormat, ::std::int32_t, 1101000> cmwBuf;
  108.  
  109. void login(cmwCredentials const &cred, bool signUp = false) {
  110. static SockaddrWrapper const sa("127.0.0.1", 56789);
  111. signUp ? ::back::marshal<::messageID::signup>(cmwBuf, cred)
  112. : ::back::marshal<::messageID::login>(cmwBuf, cred, cmwBuf.getSize());
  113. cmwBuf.sock_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_SCTP);
  114. while (0 != ::connect(cmwBuf.sock_, (sockaddr *)&sa, sizeof(sa))) {
  115. ::std::printf("connect %d\n", errno);
  116. ::sleep(30);
  117. }
  118.  
  119. while (!cmwBuf.flush())
  120. ;
  121. ::sctp_paddrparams pad{};
  122. pad.spp_address.ss_family = AF_INET;
  123. pad.spp_hbinterval = 240000;
  124. pad.spp_flags = SPP_HB_ENABLE;
  125. if (::setsockopt(cmwBuf.sock_, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &pad,
  126. sizeof pad) == -1)
  127. bail("setsockopt %d", errno);
  128. while (!cmwBuf.gotPacket())
  129. ;
  130. if (!giveBool(cmwBuf))
  131. bail("Login:%s", cmwBuf.giveStringView().data());
  132. }
  133.  
  // user_data tag that ioUring::reed() puts on receive requests.
  // ioUring::writ() uses ~reedTag, and main() treats user_data 0 as the
  // multishot poll.
  constexpr ::uint64_t reedTag = 1;
  135. class ioUring {
  136. ::io_uring rng;
  137. ::io_uring_cqe *cq = nullptr;
  138.  
  139. auto getSqe() {
  140. if (auto e = ::io_uring_get_sqe(&rng); e)
  141. return e;
  142. raise("getSqe");
  143. }
  144.  
  145. public:
  146. void multishot(int sock) {
  147. ::io_uring_prep_poll_multishot(getSqe(), sock, POLLIN);
  148. }
  149.  
  150. ioUring(int sock) {
  151. ::io_uring_params ps{};
  152. ps.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;
  153. if (int const rc = ::io_uring_queue_init_params(16, &rng, &ps); rc < 0)
  154. raise("ioUring", rc);
  155. multishot(sock);
  156. reed();
  157. }
  158.  
  159. auto submit() {
  160. if (cq)
  161. ::io_uring_cq_advance(&rng, 1);
  162. a:
  163. if (int rc =
  164. ::io_uring_submit_and_wait_timeout(&rng, &cq, 1, nullptr, nullptr);
  165. rc < 0) {
  166. if (-EINTR == rc)
  167. goto a;
  168. raise("waitCqe", rc);
  169. }
  170. return *cq;
  171. }
  172.  
  173. void reed() {
  174. auto e = getSqe();
  175. auto sp = cmwBuf.getDuo();
  176. ::io_uring_prep_recv(e, cmwBuf.sock_, sp.data(), sp.size(), 0);
  177. ::io_uring_sqe_set_data64(e, reedTag);
  178. e->ioprio = IORING_RECVSEND_POLL_FIRST;
  179. }
  180.  
  181. void writ() {
  182. auto e = getSqe();
  183. ::io_uring_prep_send(e, cmwBuf.sock_, cmwBuf.compBuf, cmwBuf.compIndex, 0);
  184. ::io_uring_sqe_set_data64(e, ~reedTag);
  185. }
  186. };
  187.  
  188. BufferStack<SameFormat> frntBuf;
  189.  
  190. void toFront(Socky const &s, auto... t) {
  191. frntBuf.reset();
  192. ::front::marshal<udpPacketMax>(frntBuf, {t...});
  193. frntBuf.send((::sockaddr *)&s.addr, s.len);
  194. }
  195.  
  196. int main(int ac, char **av) try {
  197. ::openlog(av[0], LOG_PID | LOG_NDELAY, LOG_USER);
  198. if (ac < 2 || ac > 3)
  199. bail("Usage: cmwA config-file [-signup]");
  200. FileBuffer cfg{av[1], O_RDONLY};
  201. checkField("AmbassadorID", cfg.getline(' '));
  202. cmwCredentials cred(cfg.getline());
  203. checkField("Password", cfg.getline(' '));
  204. cred.password = cfg.getline();
  205. ::signal(SIGPIPE, SIG_IGN);
  206. login(cred, ac == 3);
  207. if (ac == 3) {
  208. ::std::printf("Signup was successful\n");
  209. ::std::exit(0);
  210. }
  211.  
  212. checkField("UDP-port-number", cfg.getline(' '));
  213. ioUring ring{frntBuf.sock_ = udpServer(cfg.getline().data())};
  214. ::std::deque<cmwRequest> pendingRequests;
  215.  
  216. for (;;) {
  217. auto const cq = ring.submit();
  218. if (cq.res <= 0) {
  219. if (-EPIPE == cq.res || 0 == cq.res) {
  220. ::syslog(LOG_ERR, "Back tier vanished");
  221. frntBuf.reset();
  222. ::front::marshal<udpPacketMax>(frntBuf, {"Back tier vanished"});
  223. for (auto &r : pendingRequests) {
  224. frntBuf.send((::sockaddr *)&r.frnt.addr, r.frnt.len);
  225. }
  226. pendingRequests.clear();
  227. cmwBuf.compressedReset();
  228. login(cred);
  229. ring.reed();
  230. continue;
  231. }
  232. bail("op failed: %d", cq.res);
  233. }
  234.  
  235. if (0 == cq.user_data) {
  236. if (!(cq.flags & IORING_CQE_F_MORE)) {
  237. ::syslog(LOG_ERR, "Multishot");
  238. ring.multishot(frntBuf.sock_);
  239. }
  240. Socky frnt;
  241. bool gotAddr = false;
  242. cmwRequest *req = nullptr;
  243. try {
  244. gotAddr = frntBuf.getPacket((::sockaddr *)&frnt.addr, &frnt.len);
  245. req = &pendingRequests.emplace_back(frnt, frntBuf);
  246. ::back::marshal<::messageID::generate, 700000>(cmwBuf, *req);
  247. cmwBuf.compress();
  248. ring.writ();
  249. } catch (::std::exception &e) {
  250. ::syslog(LOG_ERR, "Accept request:%s", e.what());
  251. if (gotAddr)
  252. toFront(frnt, e.what());
  253. if (req)
  254. pendingRequests.pop_back();
  255. }
  256. continue;
  257. }
  258.  
  259. if (cq.user_data & reedTag) {
  260. try {
  261. if (cmwBuf.gotIt(cq.res)) {
  262. do {
  263. assert(!pendingRequests.empty());
  264. auto &req = pendingRequests.front();
  265. if (giveBool(cmwBuf)) {
  266. cmwBuf.giveFile(req.getFileName());
  267. toFront(req.frnt);
  268. } else
  269. toFront(req.frnt, "CMW:", cmwBuf.giveStringView());
  270. pendingRequests.pop_front();
  271. } while (cmwBuf.nextMessage());
  272. }
  273. } catch (::std::exception &e) {
  274. ::syslog(LOG_ERR, "Reply from CMW %s", e.what());
  275. assert(!pendingRequests.empty());
  276. toFront(pendingRequests.front().frnt, e.what());
  277. pendingRequests.pop_front();
  278. }
  279. ring.reed();
  280. } else if (!cmwBuf.all(cq.res))
  281. ring.writ();
  282. }
  283. } catch (::std::exception &e) {
  284. bail("Oops:%s", e.what());
  285. }
  286.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement