Not a member of Pastebin yet?
Sign Up —
it unlocks many cool features!
- "use strict";
- const AWS = require("aws-sdk")
- const Uuid = require("uuid/v4");
- const MAX_BATCH_COUNT = 5;
- const MAX_BATCH_SIZE = 25;
- // Stream our S3 object (in this case, a CSV file) and write it to DynamoDB
- ((region, bucket, key, tableName) => {
- AWS.config.setPromisesDependency(require("bluebird"));
- AWS.config.update({ region });
- const s3 = new AWS.S3();
- const ddb = new AWS.DynamoDB();
- let count = 0;
- let batches = [];
- const s3Params = {
- Bucket: bucket,
- Key: key
- }
- const csvStream = require("fast-csv").fromStream(
- s3.getObject(s3Params).createReadStream(),
- { headers: true }
- );
- csvStream.on("data", async (row) => {
- // If we have no batches or our current batch is full, add one
- if (shouldWrite(batches, MAX_BATCH_COUNT, MAX_BATCH_SIZE)) {
- csvStream.pause();
- await writeToDynamo(ddb);
- csvStream.resume();
- batches = [];
- }
- addItem(row, tableName);
- count++;
- }).on("end", _ => {
- console.log(`Done! Processed ${count} records`);
- });
- /**
- * Writes our batches to DynamoDB
- */
- const writeToDynamo = async (dynamo) =>
- Promise.all(batches.map(batch => {
- return dynamo.batchWriteItem(batch).promise();
- })).then(_ => {
- // Drain our set of batches
- batches = [];
- console.log("Total records uploaded so far: " + count);
- }).catch(err => {
- console.error(err);
- });
- /**
- * Determines if it"s time to write to DynamoDB
- *
- * @param {*} batches Our set of batches
- * @param {*} MAX_BATCH_COUNT The max number of batches in a set
- * @param {*} MAX_BATCH_SIZE The capacity of a batch
- */
- const shouldWrite = (batches, MAX_BATCH_COUNT, MAX_BATCH_SIZE) =>
- batches.length == MAX_BATCH_COUNT && batchFull(batches[batches.length - 1], MAX_BATCH_SIZE);
- /**
- * Determines if our batch is full
- * @param {*} batch Given batch
- * @param {*} MAX_BATCH_SIZE The capacity of a batch
- */
- const batchFull = (batch, MAX_BATCH_SIZE) => batch.RequestItems[tableName].length == MAX_BATCH_SIZE;
- /**
- * Adds an item to our tail batch
- * @param {*} data The object to add to our batch
- */
- const addItem = (csvRowData, tableName) => {
- if (batches.length == 0 || (batchFull(batches[batches.length - 1], MAX_BATCH_SIZE) && batches.length < MAX_BATCH_COUNT)) {
- batches.push(
- {
- RequestItems: {
- [tableName]: []
- }
- }
- );
- }
- /** Modify to fit your CSV/Dynamo schema accordingly */
- let item = {
- PutRequest: {
- Item: {
- "id": {
- "S": Uuid()
- },
- "foo": {
- "S": "foo",
- },
- "bar": {
- "S": "bar",
- },
- }
- }
- }
- batches[batches.length - 1].RequestItems[tableName].push(item);
- }
- })(
- "<region>",
- "<bucket>",
- "<csv_s3_path>",
- "<table_name>"
- );
Add Comment
Please sign in to add a comment