Guest User

Untitled

a guest
Mar 17th, 2018
406
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.72 KB | None | 0 0
  1. import moment = require('moment-timezone');
  2. import mysql = require('mysql');
  3.  
  4. import { ICapture, IpcNode, IReplayIpcNodeDelegate, Logging } from '@lbt-mycrt/common';
  5. import { mycrtDbConfig, ReplayDao, ReplayIpcNode } from '@lbt-mycrt/common';
  6. import { CPUMetric, MemoryMetric, MetricsBackend, ReadMetric, WriteMetric } from '@lbt-mycrt/common';
  7. import { Subprocess } from '@lbt-mycrt/common/dist/capture-replay/subprocess';
  8. import { ChildProgramStatus, ChildProgramType, IChildProgram, IDbReference } from '@lbt-mycrt/common/dist/data';
  9. import { ICommand, IWorkload } from '@lbt-mycrt/common/dist/data';
  10. import { MetricsStorage } from '@lbt-mycrt/common/dist/metrics/metrics-storage';
  11. import { StorageBackend } from '@lbt-mycrt/common/dist/storage/backend';
  12. import { path as schema } from '@lbt-mycrt/common/dist/storage/backend-schema';
  13.  
  14. import { Moment } from 'moment';
  15. import { ReplayConfig } from './args';
  16. import { captureDao, replayDao } from './dao';
  17. import { settings } from './settings';
  18.  
  19. const logger = Logging.defaultLogger(__dirname);
  20.  
  21. export class Replay extends Subprocess implements IReplayIpcNodeDelegate {
  22.  
  23. private ipcNode: IpcNode;
  24. private capture?: ICapture | null;
  25. private expectedEndTime?: Date;
  26. private firstLoop: boolean = true;
  27. private targetDb: IDbReference;
  28. private workload?: IWorkload;
  29. private workloadStart?: Moment;
  30. private workloadEnd?: Moment;
  31. private replayStartTime?: Moment;
  32. private replayEndTime?: Moment;
  33. private workloadPath?: string;
  34. private workloadIndex: number = 0;
  35. private error: boolean = false;
  36.  
  37. constructor(public config: ReplayConfig, storage: StorageBackend, metrics: MetricsBackend, db: IDbReference) {
  38. super(storage, metrics);
  39. this.ipcNode = new ReplayIpcNode(this.id, logger, this);
  40. this.targetDb = db;
  41. }
  42.  
  43. get id(): number {
  44. return this.config.id;
  45. }
  46.  
  47. get nameId(): string {
  48. return `replay ${this.id}`;
  49. }
  50.  
  51. get interval(): number {
  52.  
  53. if (this.replayEndTime === undefined) {
  54. return this.config.interval;
  55. }
  56.  
  57. // uncomment when capture workloads have correct 'end' datetime
  58. // const timeToEnd = this.replayEndTime.diff(moment());
  59. // if (timeToEnd < this.config.interval ) {
  60. // return timeToEnd;
  61. // } else {
  62. // return this.config.interval;
  63. // }
  64. return this.config.interval;
  65. }
  66.  
  67. public asIChildProgram(): IChildProgram {
  68. return {
  69. id: this.id,
  70. type: ChildProgramType.REPLAY,
  71. status: this.status,
  72. start: this.startTime || undefined,
  73. };
  74. }
  75.  
  76. protected async setup(): Promise<void> {
  77. try {
  78.  
  79. logger.info(`Setting Replay ${this.id} status to 'starting'`);
  80. await replayDao.updateReplayStatus(this.id, ChildProgramStatus.STARTING);
  81.  
  82. logger.info(`Replay ${this.id}: setup`);
  83. this.ipcNode.start();
  84.  
  85. this.capture = await captureDao.getCapture(this.config.captureId);
  86.  
  87. await this.getWorkload();
  88.  
  89. } catch (error) {
  90. this.selfDestruct(error);
  91. }
  92. }
  93.  
  94. protected async loop(): Promise<void> {
  95.  
  96. logger.info('-----------==[ LOOP ]==-----------');
  97. if (this.firstLoop === true) {
  98. this.firstLoopInit();
  99. }
  100.  
  101. let finished = true;
  102.  
  103. while (this.workloadIndex < this.workload!.commands.length && this.queryInInterval(this.workloadIndex)) {
  104.  
  105. const delay = this.getDelayForIndex(this.workloadIndex);
  106. const currentIndex = this.workloadIndex;
  107. const currentQuery = this.workload!.commands[currentIndex];
  108.  
  109. setTimeout(() => {
  110. this.processQuery(currentQuery); }, delay);
  111. logger.info(`Scheduled query: ${this.workloadIndex}`);
  112.  
  113. // don't let the subprocess end because we still need to run these queries.
  114. finished = false;
  115. this.workloadIndex += 1;
  116. }
  117.  
  118. if (this.workloadIndex < this.workload!.commands.length || this.replayEndTime!.diff(moment()) > 0) {
  119. // don't let the subprocess end because we still have queries to que.
  120. finished = false;
  121. }
  122.  
  123. this.logMetrics();
  124.  
  125. if (finished) {
  126. this.stop(false); // just once for now
  127. }
  128. }
  129.  
  130. protected async teardown(): Promise<void> {
  131. logger.info(`Replay ${this.id}: teardown`);
  132.  
  133. if (this.error === false) {
  134. await replayDao.updateReplayStatus(this.id, ChildProgramStatus.DONE);
  135. }
  136.  
  137. await replayDao.updateReplayEndTime(this.id);
  138. this.ipcNode.stop();
  139. }
  140.  
  141. protected async dontPanic(): Promise<void> {
  142. await replayDao.updateReplayStatus(this.id, ChildProgramStatus.FAILED);
  143. }
  144.  
  145. private async firstLoopInit() {
  146. try {
  147. this.firstLoop = false;
  148. this.startTime = new Date();
  149. this.replayStartTime = moment();
  150. await replayDao.updateReplayStartTime(this.id);
  151.  
  152. this.replayEndTime = this.replayStartTime.clone().add(this.workloadEnd!.diff(this.workloadStart));
  153. logger.info(`Replay ${this.id} startTime: ${this.replayStartTime.toJSON()}`);
  154. logger.info(`Replay ${this.id} endTime: ${this.replayEndTime.toJSON()}`);
  155.  
  156. await replayDao.updateReplayStatus(this.id, ChildProgramStatus.RUNNING);
  157.  
  158. } catch (error) {
  159. this.selfDestruct(error);
  160. }
  161. }
  162.  
  163. private getCaptureWorkloadPath(id: number): string {
  164. return `capture${id}/workload.json`;
  165. }
  166.  
  167. private async getWorkload() {
  168.  
  169. logger.info(`Getting workload from storage`);
  170. this.workloadPath = this.getCaptureWorkloadPath(this.config.captureId);
  171. this.workload = await this.storage.readJson(this.workloadPath) as IWorkload;
  172.  
  173. if (this.workload) {
  174. logger.info(`Workload Retrieved`);
  175. this.workloadStart = moment(new Date(this.workload.start));
  176. this.workloadEnd = moment(new Date(this.workload.end));
  177.  
  178. logger.info(`workloadStart: ${this.workloadStart.format()}`);
  179. logger.info(`workloadEnd: ${this.workloadEnd.format()}`);
  180. }
  181. }
  182.  
  183. // queryInInterval takes the index of the query in the workload and the time
  184. // that next loop will begin in milliseconds and returns true
  185. // if the query should be scheduled for the current loop otherwise false.
  186. private queryInInterval(index: number): boolean {
  187.  
  188. const delay = this.getDelayForIndex(index);
  189. logger.info(`Delay for index: ${index} is ${delay}`);
  190. return (delay >= 0 && delay < this.interval) ? true : false ;
  191. }
  192.  
  193. // getDelayForIndex takes in an index and the current interval's start time and returns the
  194. // number of milliseconds to delay from the interval's start time.
  195. private getDelayForIndex(index: number): number {
  196. if (this.workload == null) {
  197. return 0;
  198. } else if (index >= 0 && index < this.workload!.commands.length) {
  199.  
  200. const queryStart: Moment = moment(this.workload!.commands[index].event_time).subtract(8, 'hours');
  201. const delay = (queryStart.diff(this.workloadStart)) - (moment().diff(this.replayStartTime));
  202.  
  203. return delay;
  204. } else {
  205. return 0;
  206. }
  207. }
  208.  
  209. private validMockQuery(query: ICommand): boolean {
  210.  
  211. let valid = true;
  212. settings.invalidQueries.forEach((value) => {
  213. if (query.argument.toUpperCase().indexOf(value) !== -1) {
  214. valid = false;
  215. }
  216. });
  217. return valid;
  218. }
  219.  
  220. private validQuery(query: ICommand) {
  221. return (query.command_type === "Query");
  222. }
  223.  
  224. private processQuery(query: ICommand) {
  225.  
  226. const db = this.config.mock ? mycrtDbConfig : { database: this.targetDb.name,
  227. host: this.targetDb.host,
  228. password: this.targetDb.pass,
  229. user: this.targetDb.user };
  230.  
  231. if (this.config.mock && this.validMockQuery(query) === false) {
  232. return null;
  233. }
  234.  
  235. if (this.validQuery(query)) {
  236.  
  237. const conn = mysql.createConnection(db);
  238.  
  239. return new Promise<any>((resolve, reject) => {
  240. conn.connect((connErr) => {
  241. if (connErr) {
  242. reject(connErr);
  243. } else {
  244. const updateStr = mysql.format(query.argument, []);
  245. conn.query(updateStr, (updateErr, rows) => {
  246. conn.end();
  247. if (updateErr) {
  248. reject(updateErr);
  249. } else {
  250. resolve(rows);
  251. }
  252. });
  253. }
  254. });
  255. });
  256.  
  257. } else { return null; }
  258. }
  259.  
  260. private logMetrics() {
  261.  
  262. const end = moment();
  263. let start = end.clone().subtract(this.interval + this.config.intervalOverlap);
  264.  
  265. if (start.diff(this.workloadStart) < 0) {
  266. start = this.workloadStart!;
  267. }
  268.  
  269. this.sendMetricsToS3(start.toDate(), end.toDate());
  270.  
  271. }
  272.  
  273. private async sendMetricsToS3(start: Date, end: Date, firstTry: boolean = true) {
  274. try {
  275.  
  276. const data = [
  277. await this.metrics.getMetricsForType(CPUMetric, start, end),
  278. await this.metrics.getMetricsForType(ReadMetric, start, end),
  279. await this.metrics.getMetricsForType(WriteMetric, start, end),
  280. await this.metrics.getMetricsForType(MemoryMetric, start, end),
  281. ];
  282.  
  283. const key = schema.metrics.getSingleSampleKey(this.asIChildProgram(), end);
  284. logger.info(`Saving metrics to ${key}`);
  285. await this.storage.writeJson(key, data);
  286.  
  287. } catch (error) {
  288. if (firstTry) {
  289. logger.warn(`Failed to get metrics: ${error}`);
  290. logger.warn("Trying again...");
  291. this.sendMetricsToS3(start, end, false);
  292.  
  293. } else {
  294. logger.error(`Failed to get metrics the second time: ${error}`);
  295. // TODO: mark capture as broken?
  296.  
  297. }
  298. }
  299. }
  300.  
  301. }
Add Comment
Please, Sign In to add comment