Guest User

Untitled

a guest
Mar 24th, 2018
358
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.14 KB | None | 0 0
  1. import moment = require('moment-timezone');
  2. import mysql = require('mysql');
  3.  
  4. import { CPUMetric, ICapture, IpcNode, IReplayIpcNodeDelegate, Logging, MemoryMetric,
  5. MetricsBackend, mycrtDbConfig, ReadMetric, ReplayDao, ReplayIpcNode, utils, WriteMetric } from '@lbt-mycrt/common';
  6. import { Subprocess } from '@lbt-mycrt/common/dist/capture-replay/subprocess';
  7. import { ByteToMegabyte, ChildProgramStatus, ChildProgramType, IChildProgram } from '@lbt-mycrt/common/dist/data';
  8. import { ICommand, IDbReference, IWorkload } from '@lbt-mycrt/common/dist/data';
  9. import { MetricsStorage } from '@lbt-mycrt/common/dist/metrics/metrics-storage';
  10. import { StorageBackend } from '@lbt-mycrt/common/dist/storage/backend';
  11. import { path as schema } from '@lbt-mycrt/common/dist/storage/backend-schema';
  12.  
  13. import { Moment } from 'moment';
  14. import { ReplayConfig } from './args';
  15. import { captureDao, replayDao } from './dao';
  16. import { settings } from './settings';
  17.  
  // Module-level logger shared by everything in this file; created by Logging.defaultLogger from __dirname.
  18. const logger = Logging.defaultLogger(__dirname);
  19.  
  /**
   * Subprocess that replays the workload recorded by a capture against a target database.
   *
   * `setup` loads the capture and its workload, every `loop` call schedules the commands whose
   * replay-relative delay falls inside the coming interval and then logs metrics, and `teardown`
   * builds the final metrics file and marks the replay as done.
   */
  export class Replay extends Subprocess implements IReplayIpcNodeDelegate {

      /** IPC endpoint for this replay; started in `setup`, stopped at the end of `teardown`. */
      private ipcNode: IpcNode;
      /** Capture whose workload is replayed; loaded in `setup`. */
      private capture: ICapture | null = null;
      private expectedEndTime?: Date;
      /** True until the first `loop` call has run `firstLoopInit`. */
      private firstLoop: boolean = true;
      /** Reference to the database the commands are replayed against (unless `config.mock` is set). */
      private dbRef: IDbReference;
      /** Connection options handed to `mysql.createConnection`. */
      private targetDb?: any;
      /** Workload read from storage in `getWorkload`. */
      private workload?: IWorkload;
      /** Start/end of the recorded workload, parsed from `workload.start` / `workload.end`. */
      private workloadStart?: Moment;
      private workloadEnd?: Moment;
      /** Start of this replay and the instant at which the replayed time window ends. */
      private replayStartTime?: Moment;
      private replayEndTime?: Moment;
      /** Storage key the workload was read from. */
      private workloadPath?: string;
      /** Index of the next workload command that has not been scheduled yet. */
      private workloadIndex: number = 0;

      /**
       * @param config  replay settings (id, capture id, interval, delays, mock flag)
       * @param storage storage backend forwarded to `Subprocess`
       * @param metrics metrics backend forwarded to `Subprocess`
       * @param db      reference to the database the workload is replayed against
       */
      constructor(public config: ReplayConfig, storage: StorageBackend, metrics: MetricsBackend, db: IDbReference) {
          super(storage, metrics);
          this.ipcNode = new ReplayIpcNode(this.id, logger, this);
          this.dbRef = db;
      }

      /** Id of this replay, taken from the configuration. */
      get id(): number {
          return this.config.id;
      }

      /** Human-readable identifier of this replay. */
      get nameId(): string {
          return `replay ${this.id}`;
      }

      /** Loop interval taken from the configuration (compared against millisecond delays). */
      get interval(): number {
          return this.config.interval;
      }

      /** Describes this replay as a generic child program. */
      public asIChildProgram(): IChildProgram {
          return {
              id: this.id,
              type: ChildProgramType.REPLAY,
              status: this.status,
              start: this.startTime || undefined,
          };
      }

      /**
       * Marks the replay as starting, starts the IPC node, loads the capture and its workload,
       * resolves the target database configuration and kicks off the first loop iteration.
       * Any failure is handed to `selfDestruct`.
       */
      protected async setup(): Promise<void> {
          try {

              logger.info(`Setting Replay ${this.id} status to 'starting'`);
              await replayDao.updateReplayStatus(this.id, ChildProgramStatus.STARTING);

              logger.info(`Replay ${this.id}: setup`);
              this.ipcNode.start();

              this.capture = await captureDao.getCapture(this.config.captureId);
              if (!this.capture) {
                  throw new Error(`Replay ${this.id}: capture ${this.config.captureId} could not be found`);
              }

              this.targetDb = this.config.mock ? mycrtDbConfig : {
                  database: this.dbRef.name,
                  host: this.dbRef.host,
                  password: this.dbRef.pass,
                  user: this.dbRef.user,
              };

              this.workload = await this.getWorkload();

              // Deliberately not awaited so setup does not block on the first iteration, but a
              // rejection must not be left unhandled: route it to the same failure path as setup.
              this.loop().catch((error) => this.selfDestruct(error));

          } catch (error) {
              this.selfDestruct(error);
          }
      }

      /**
       * One replay iteration: schedules every command that is due before the end of the coming
       * interval, logs metrics, and stops the replay once all commands have been scheduled and
       * the replay end time has passed.
       */
      protected async loop(): Promise<void> {

          logger.info('-----------==[ LOOP ]==-----------');
          if (this.firstLoop === true) {
              const initialized = await this.firstLoopInit();
              if (!initialized) {
                  // firstLoopInit already reported the failure; do not run queries for a failed replay.
                  return;
              }
          }

          let finished = true;

          logger.info(`-< Scheduling Commands >---------`);
          while (this.indexInInterval(this.workloadIndex)) {

              const currentIndex = this.workloadIndex;
              // An overdue command has a negative delay; run it as soon as possible.
              const delay = Math.max(0, this.getDelayForIndex(currentIndex));
              const currentQuery = this.workload!.commands[currentIndex];

              logger.info(` * Scheduling command ${currentIndex + 1} of ${this.workload!.commands.length}`);
              logger.info(` delay = ${delay}`);
              setTimeout(async () => {
                  try {
                      await this.processQuery(currentQuery);
                  } catch (error) {
                      logger.info(`Error while processing query with index ${currentIndex}: ${error}`);
                  }
              }, delay);
              logger.info(` scheduled!`);

              // don't let the subprocess end because we still need to run these queries.
              finished = false;
              this.workloadIndex += 1;
          }

          if (this.shouldWeContinue()) {
              finished = false;
          }

          if (finished) {
              // in this case, the last logMetrics call happens after the replay finished
              // so, we want to set its status to stopping
              logger.info(`Setting status to 'stopping'`);
              await replayDao.updateReplayStatus(this.id, ChildProgramStatus.STOPPING);
          }

          logger.info(`-< Logging Metrics >-------------`);
          await this.logMetrics();

          if (finished) {
              this.onStop();
          }
      }

      /**
       * After `config.filePrepDelay`, builds the final metrics file, marks the replay as done,
       * records its end time and stops the IPC node. The method itself returns as soon as the
       * timer has been armed.
       */
      protected async teardown(): Promise<void> {
          logger.info(`Replay ${this.id}: teardown`);

          logger.info(`Waiting for files to be prepared`);
          setTimeout(async () => {

              // Nothing awaits this callback, so errors have to be handled here.
              try {
                  logger.info(`Building final metrics file`);
                  const metricsStorage = new MetricsStorage(this.storage);
                  await metricsStorage.read(this.asIChildProgram(), false);

                  logger.info(`Setting status to done`);
                  await replayDao.updateReplayStatus(this.id, ChildProgramStatus.DONE);

                  logger.info(`Setting replay end time`);
                  await replayDao.updateReplayEndTime(this.id);
              } catch (error) {
                  logger.info(`Replay ${this.id}: error during teardown: ${error}`);
              }

              // Always stop the IPC node, even when the steps above failed.
              try {
                  logger.info(`Stopping IPC node`);
                  await this.ipcNode.stop();
              } catch (error) {
                  logger.info(`Replay ${this.id}: error while stopping IPC node: ${error}`);
              }

          }, this.config.filePrepDelay);
      }

      /** Stops the subprocess once the replay has finished. */
      protected async onStop(): Promise<any> {
          logger.info(`-< Stopping >------------------`);

          this.stop(false); // just once for now
      }

      /** Records the replay as failed. */
      protected async dontPanic(): Promise<void> {
          await replayDao.updateReplayStatus(this.id, ChildProgramStatus.FAILED);
      }

      /**
       * Initialises the replay clock on the first loop iteration and marks the replay as running.
       *
       * @returns `true` on success, `false` when initialisation failed (after `selfDestruct` was called)
       */
      private async firstLoopInit(): Promise<boolean> {
          try {
              this.firstLoop = false;
              this.startTime = new Date();
              this.replayStartTime = moment(this.startTime);
              // The replay lasts exactly as long as the recorded workload did. Computed before any
              // await so the time window is available even if a DAO call below fails.
              this.replayEndTime = this.replayStartTime.clone().add(this.workloadEnd!.diff(this.workloadStart));

              await replayDao.updateReplayStartTime(this.id);

              logger.info(`Replay ${this.id} startTime: ${this.replayStartTime.toJSON()}`);
              logger.info(`Replay ${this.id} endTime: ${this.replayEndTime.toJSON()}`);

              await replayDao.updateReplayStatus(this.id, ChildProgramStatus.RUNNING);
              return true;

          } catch (error) {
              this.selfDestruct(error);
              return false;
          }
      }

      /**
       * Reads the finished workload of the capture from storage and remembers its time window.
       *
       * @returns the workload
       * @throws Error when storage holds no workload under the expected key
       */
      private async getWorkload(): Promise<IWorkload> {

          logger.info(`Getting workload from storage`);
          this.workloadPath = schema.workload.getDoneKey({
              id: this.capture!.id,
              type: ChildProgramType.CAPTURE,
          });

          const workload: IWorkload = await this.storage.readJson<IWorkload>(this.workloadPath);
          if (!workload) {
              throw new Error(`Replay ${this.id}: no workload found at ${this.workloadPath}`);
          }

          logger.info(`Workload Retrieved`);
          this.workloadStart = moment(new Date(workload.start));
          this.workloadEnd = moment(new Date(workload.end));

          logger.info(`workloadStart: ${this.workloadStart.format()}`);
          logger.info(`workloadEnd: ${this.workloadEnd.format()}`);

          return workload;
      }

      /** True when `currentIndex` refers to an existing command that should be scheduled now. */
      private indexInInterval(currentIndex: number): boolean {
          return currentIndex < this.workload!.commands.length && this.queryInInterval(currentIndex);
      }

      /** True while commands remain unscheduled or the replay end time has not been reached. */
      private shouldWeContinue(): boolean {
          return this.workloadIndex < this.workload!.commands.length || this.replayEndTime!.diff(moment()) > 0;
      }

      /**
       * True when the command at `index` is due before the end of the coming interval.
       * Overdue commands (negative delay) count as due: rejecting them would leave
       * `workloadIndex` stuck on that command and the replay would never finish.
       */
      private queryInInterval(index: number): boolean {
          return this.getDelayForIndex(index) < this.interval;
      }

      /**
       * Milliseconds from now until the command at `index` should run: its offset from the
       * workload start minus the time already elapsed since the replay start. Negative when
       * the command is overdue; 0 for an out-of-range index.
       */
      private getDelayForIndex(index: number): number {

          if (index < 0 || index >= this.workload!.commands.length) {
              return 0;
          }

          const queryStart = moment(this.workload!.commands[index].event_time);
          return queryStart.diff(this.workloadStart) - moment().diff(this.replayStartTime);
      }

      /** True when the query contains none of the substrings listed in `settings.invalidQueries`. */
      private validMockQuery(query: ICommand): boolean {

          const statement = query.argument.toUpperCase();
          let valid = true;
          settings.invalidQueries.forEach((value) => {
              if (statement.indexOf(value) !== -1) {
                  valid = false;
              }
          });
          return valid;
      }

      /**
       * Runs a single workload command on a fresh connection to the target database.
       *
       * @returns `null` when the command is skipped (mock mode and not a valid mock query),
       *          otherwise a promise that resolves with the query result and rejects on a
       *          connection or query error
       */
      private processQuery(query: ICommand): Promise<any> | null {

          if (this.config.mock && !this.validMockQuery(query)) {
              return null;
          }

          const conn = mysql.createConnection(this.targetDb);

          logger.info(`--< Running Query >------------------------------------`);
          // global regex: a plain string pattern would only replace the first newline
          logger.info(` "${query.argument.replace(/\n/g, ' ')}"`);
          return new Promise<any>((resolve, reject) => {
              conn.connect((connErr) => {
                  if (connErr) {
                      // release the underlying socket; end() is only reached after a successful connect
                      conn.destroy();
                      reject(connErr);
                      return;
                  }

                  const updateStr = mysql.format(query.argument, []);
                  conn.query(updateStr, (updateErr, rows) => {
                      conn.end();
                      if (updateErr) {
                          reject(updateErr);
                      } else {
                          resolve(rows);
                      }
                  });
              });
          });
      }

      /**
       * Stores the metrics of the interval that just elapsed (plus `config.intervalOverlap`),
       * clamped to the replay start. Skipped while less than the overlap has passed. A failure
       * to gather or store metrics is logged and does not abort the replay.
       */
      private async logMetrics(): Promise<void> {

          const end = moment();
          let start = end.clone().subtract(this.interval + this.config.intervalOverlap);

          if (start.diff(this.replayStartTime!) < 0) {
              start = this.replayStartTime!;
          }

          if (end.toDate().getTime() - start.toDate().getTime() > this.config.intervalOverlap) {

              logger.info(` waiting ${this.config.metricsDelay}ms before gathering metrics`);
              await utils.syncTimeout(async () => {
                  logger.info(` retrieving metrics from ${start.toDate()} to ${end.toDate()}`);
                  try {
                      // awaited so the write is not still in flight when the caller moves on
                      await this.sendMetricsToS3(start.toDate(), end.toDate());
                  } catch (error) {
                      logger.info(` failed to store metrics: ${error}`);
                  }
              }, this.config.metricsDelay);

          } else {
              logger.info(` skipping metrics, not enough time has passed`);
          }

      }

      /**
       * Fetches memory, CPU, read and write metrics for `[start, end]` and writes them as one
       * sample to the storage backend, via `tryTwice`.
       */
      private async sendMetricsToS3(start: Date, end: Date): Promise<void> {

          await this.tryTwice(async () => {

              logger.info(` * memory...`);
              const memoryMetrics = await this.metrics.getMetricsForType(MemoryMetric, start, end);
              const datapoints = memoryMetrics.dataPoints;
              // relabel the memory datapoints as megabytes and scale their maximum accordingly
              datapoints.forEach((metric) => {
                  metric.Unit = "Megabytes";
                  metric.Maximum *= ByteToMegabyte;
              });
              logger.info(` * ${datapoints.length} datapoints`);

              logger.info(` * cpu...`);
              const cpu = await this.metrics.getMetricsForType(CPUMetric, start, end);
              logger.info(` * ${cpu.dataPoints.length} datapoints`);

              logger.info(` * read...`);
              const read = await this.metrics.getMetricsForType(ReadMetric, start, end);
              logger.info(` * ${read.dataPoints.length} datapoints`);

              logger.info(` * write...`);
              const write = await this.metrics.getMetricsForType(WriteMetric, start, end);
              logger.info(` * ${write.dataPoints.length} datapoints`);

              const data = [cpu, read, write, memoryMetrics];

              const key = schema.metrics.getSingleSampleKey(this.asIChildProgram(), end);
              logger.info(` * saving metrics to ${key}`);
              await this.storage.writeJson(key, data);
              logger.info(` * done!`);

          }, "Send Metrics to S3");

      }

  }
Add Comment
Please, Sign In to add comment