Guest User

Untitled

a guest
Jul 21st, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.85 KB | None | 0 0
  1. "use strict";
  2.  
  3. const AWS = require("aws-sdk")
  4. const Uuid = require("uuid/v4");
  5.  
  6. const MAX_BATCH_COUNT = 5;
  7. const MAX_BATCH_SIZE = 25;
  8.  
  9. // Stream our S3 object (in this case, a CSV file) and write it to DynamoDB
  10. ((region, bucket, key, tableName) => {
  11.  
  12. AWS.config.setPromisesDependency(require("bluebird"));
  13. AWS.config.update({ region });
  14.  
  15. const s3 = new AWS.S3();
  16. const ddb = new AWS.DynamoDB();
  17.  
  18. let count = 0;
  19. let batches = [];
  20.  
  21. const s3Params = {
  22. Bucket: bucket,
  23. Key: key
  24. }
  25.  
  26. const csvStream = require("fast-csv").fromStream(
  27. s3.getObject(s3Params).createReadStream(),
  28. { headers: true }
  29. );
  30.  
  31. csvStream.on("data", async (row) => {
  32. // If we have no batches or our current batch is full, add one
  33. if (shouldWrite(batches, MAX_BATCH_COUNT, MAX_BATCH_SIZE)) {
  34. csvStream.pause();
  35. await writeToDynamo(ddb);
  36. csvStream.resume();
  37. batches = [];
  38. }
  39.  
  40. addItem(row, tableName);
  41. count++;
  42. }).on("end", _ => {
  43. console.log(`Done! Processed ${count} records`);
  44. });
  45.  
  46. /**
  47. * Writes our batches to DynamoDB
  48. */
  49. const writeToDynamo = async (dynamo) =>
  50. Promise.all(batches.map(batch => {
  51. return dynamo.batchWriteItem(batch).promise();
  52. })).then(_ => {
  53. // Drain our set of batches
  54. batches = [];
  55. console.log("Total records uploaded so far: " + count);
  56. }).catch(err => {
  57. console.error(err);
  58. });
  59.  
  60. /**
  61. * Determines if it"s time to write to DynamoDB
  62. *
  63. * @param {*} batches Our set of batches
  64. * @param {*} MAX_BATCH_COUNT The max number of batches in a set
  65. * @param {*} MAX_BATCH_SIZE The capacity of a batch
  66. */
  67. const shouldWrite = (batches, MAX_BATCH_COUNT, MAX_BATCH_SIZE) =>
  68. batches.length == MAX_BATCH_COUNT && batchFull(batches[batches.length - 1], MAX_BATCH_SIZE);
  69.  
  70. /**
  71. * Determines if our batch is full
  72. * @param {*} batch Given batch
  73. * @param {*} MAX_BATCH_SIZE The capacity of a batch
  74. */
  75. const batchFull = (batch, MAX_BATCH_SIZE) => batch.RequestItems[tableName].length == MAX_BATCH_SIZE;
  76.  
  77. /**
  78. * Adds an item to our tail batch
  79. * @param {*} data The object to add to our batch
  80. */
  81. const addItem = (csvRowData, tableName) => {
  82. if (batches.length == 0 || (batchFull(batches[batches.length - 1], MAX_BATCH_SIZE) && batches.length < MAX_BATCH_COUNT)) {
  83. batches.push(
  84. {
  85. RequestItems: {
  86. [tableName]: []
  87. }
  88. }
  89. );
  90. }
  91.  
  92. /** Modify to fit your CSV/Dynamo schema accordingly */
  93. let item = {
  94. PutRequest: {
  95. Item: {
  96. "id": {
  97. "S": Uuid()
  98. },
  99. "foo": {
  100. "S": "foo",
  101. },
  102. "bar": {
  103. "S": "bar",
  104. },
  105. }
  106. }
  107. }
  108.  
  109. batches[batches.length - 1].RequestItems[tableName].push(item);
  110. }
  111. })(
  112. "<region>",
  113. "<bucket>",
  114. "<csv_s3_path>",
  115. "<table_name>"
  116. );
Add Comment
Please, Sign In to add comment