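/*
 * Imports these helpers rely on. The Salesforce WSC async client
 * (com.sforce.async.*) is assumed to be on the classpath; the CSVReader used
 * in checkResults() follows the opencsv readNext()/String[] API, so that
 * reader is assumed here and can be swapped for whichever CSV reader the
 * original project actually used.
 */
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import au.com.bytecode.opencsv.CSVReader;

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchStateEnum;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.JobStateEnum;
import com.sforce.async.OperationEnum;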
/**
 * Create and upload batches from a CSV file. The file is split into
 * appropriately sized batch files before each batch is submitted.
 */
private static List<BatchInfo> createBatchesFromCSVFile(BulkConnection connection,
        JobInfo jobInfo, String csvFileName) throws IOException, AsyncApiException {
    List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
    BufferedReader rdr = new BufferedReader(new InputStreamReader(
            new FileInputStream(csvFileName)));
    // Read the CSV header row; it is repeated at the top of every batch file.
    String hdr = rdr.readLine();
    byte[] headerBytes = (hdr + "\n").getBytes("US-ASCII");
    int headerBytesLength = headerBytes.length;
    File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
    // Split the CSV file into multiple batches.
    try {
        FileOutputStream tmpOut = new FileOutputStream(tmpFile);
        int maxBytesPerBatch = 10000000; // 10 million bytes per batch
        int maxRowsPerBatch = 10000;     // 10 thousand rows per batch
        int currentBytes = 0;
        int currentLines = 0;
        String nextLine;
        while ((nextLine = rdr.readLine()) != null) {
            byte[] bytes = (nextLine + "\n").getBytes("US-ASCII");
            // Submit the current batch once the size or row limit is reached.
            if (currentBytes + bytes.length > maxBytesPerBatch
                    || currentLines > maxRowsPerBatch) {
                createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
                currentBytes = 0;
                currentLines = 0;
            }
            if (currentBytes == 0) {
                // Start a new batch file and write the header row first.
                tmpOut = new FileOutputStream(tmpFile);
                tmpOut.write(headerBytes);
                currentBytes = headerBytesLength;
                currentLines = 1;
            }
            tmpOut.write(bytes);
            currentBytes += bytes.length;
            currentLines++;
        }
        // Create a final batch for any remaining rows.
        if (currentLines > 1) {
            createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
        }
    } finally {
        if (!tmpFile.delete()) {
            tmpFile.deleteOnExit();
        }
        rdr.close();
    }
    return batchInfos;
} // END private List<BatchInfo> createBatchesFromCSVFile()
/**
 * Create a batch by uploading the contents of the batch file together with
 * its binary attachments. This closes the output stream.
 */
private static void createBatch(FileOutputStream tmpOut, File tmpFile,
        List<BatchInfo> batchInfos, BulkConnection connection, JobInfo jobInfo)
        throws IOException, AsyncApiException {
    tmpOut.flush();
    tmpOut.close();
    FileInputStream tmpInputStream = new FileInputStream(tmpFile);
    try {
        // The map key is the file name inside the generated zip; the CSV rows
        // should reference it (see the sample request CSV below).
        Map<String, InputStream> attachments = new HashMap<String, InputStream>();
        attachments.put("MyFile1.txt", new FileInputStream("C:\\Users\\user\\Desktop\\upload\\2014-04-18_231314.png"));
        BatchInfo batchInfo = connection.createBatchWithInputStreamAttachments(jobInfo, tmpInputStream, attachments);
        System.out.println(batchInfo != null ? "BatchInfo is not null :)" : "BatchInfo is null :(");
        batchInfos.add(batchInfo);
    } finally {
        tmpInputStream.close();
    }
} // END private void createBatch()
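/*
 * For reference, a hedged sketch of what the request CSV (request1.CSV below)
 * is expected to look like for an Attachment insert with binary content: per
 * the Bulk API binary-attachment convention, the Body column references the
 * attached file by the name used as the attachments map key, prefixed with
 * '#'. The ParentId value here is only a placeholder.
 *
 *   Name,ParentId,Body
 *   MyFile1.txt,001D000000HRfgX,#MyFile1.txt
 */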
private static JobInfo createJob(String fileName, BulkConnection bc)
        throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setOperation(OperationEnum.insert);
    job.setObject("Attachment");
    // ZIP_CSV is required for batches that carry binary attachments.
    job.setContentType(ContentType.ZIP_CSV);
    job = bc.createJob(job);
    return job;
} // END private static JobInfo createJob()
/**
 * Wait for a job to complete by polling the Bulk API.
 */
private static void awaitCompletion(BulkConnection connection, JobInfo job,
        List<BatchInfo> batchInfoList) throws AsyncApiException {
    long sleepTime = 0L;
    Set<String> incomplete = new HashSet<String>();
    for (BatchInfo bi : batchInfoList) {
        incomplete.add(bi.getId());
    }
    while (!incomplete.isEmpty()) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {}
        sleepTime = 10000L; // poll every 10 seconds after the first pass
        BatchInfo[] statusList = connection.getBatchInfoList(job.getId())
                .getBatchInfo();
        for (BatchInfo b : statusList) {
            if (b.getState() == BatchStateEnum.Completed) {
                incomplete.remove(b.getId());
            } else if (b.getState() == BatchStateEnum.Failed) {
                // Report the failure and stop waiting on this batch,
                // otherwise the loop would never terminate.
                System.out.println("\nReason of failure: " + b.getStateMessage() + ".\n"
                        + "Number of records processed: " + b.getNumberRecordsProcessed());
                incomplete.remove(b.getId());
            }
        } // END for (BatchInfo b : statusList)
    } // END while (!incomplete.isEmpty())
} // END private static void awaitCompletion()
private static void checkResults(BulkConnection connection, JobInfo job,
        List<BatchInfo> batchInfoList, String fileName) throws AsyncApiException, IOException {
    for (BatchInfo b : batchInfoList) {
        CSVReader rdr = new CSVReader(
                new InputStreamReader(connection.getBatchResultStream(
                        job.getId(), b.getId())));
        // The first row of the result stream is the header (Id, Success, Created, Error).
        List<String> resultHeader = Arrays.asList(rdr.readNext());
        int resultCols = resultHeader.size();
        String[] row;
        while ((row = rdr.readNext()) != null) {
            Map<String, String> resultInfo = new HashMap<String, String>();
            for (int i = 0; i < resultCols; i++) {
                resultInfo.put(resultHeader.get(i), row[i]);
            }
            boolean success = Boolean.valueOf(resultInfo.get("Success"));
            boolean created = Boolean.valueOf(resultInfo.get("Created"));
            String id = resultInfo.get("Id");
            String error = resultInfo.get("Error");
            if (success && created) {
                System.out.println("The record with id " + id + " was created successfully.");
            } else if (success) {
                System.out.println("The record with id " + id + " was updated.");
            } else {
                System.out.println("Error occurred: " + error);
            }
        } // END while
        rdr.close();
    } // END for
} // END private static void checkResults()
public static void uploadBinaryAttachmentUsingBulkAPI(BulkConnection bConnection)
        throws AsyncApiException, IOException {
    // Create a job for batches with binary attachments.
    String fileName = "C:\\Users\\user\\Desktop\\upload\\request1.CSV";
    JobInfo job = createJob(fileName, bConnection);
    if (job != null) {
        System.out.println("Job is created.");
        // Create batches with binary attachments; each batch is sent in a
        // separate HTTP POST request.
        List<BatchInfo> batchInfoList = new ArrayList<BatchInfo>();
        try {
            batchInfoList = createBatchesFromCSVFile(bConnection, job, fileName);
            System.out.println(batchInfoList.size() > 0
                    ? "batchInfoList has some elements :)"
                    : "No elements in batchInfoList :(");
        } catch (AsyncApiException ex) {
            ex.printStackTrace();
        }
        if (batchInfoList.size() > 0) {
            System.out.println("Closing the job.");
            closeJob(bConnection, job.getId());
            System.out.println("Waiting for completion of the job ...");
            awaitCompletion(bConnection, job, batchInfoList);
            System.out.println("Checking the results ...");
            checkResults(bConnection, job, batchInfoList, fileName);
        }
    } // END if (job != null)
} // END public static void uploadBinaryAttachmentUsingBulkAPI()
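/*
 * closeJob() is called above but is not part of this paste. A minimal sketch
 * following the standard Bulk API pattern: setting the job state to Closed
 * tells Salesforce that no more batches will be added.
 */
private static void closeJob(BulkConnection connection, String jobId)
        throws AsyncApiException {
    JobInfo job = new JobInfo();
    job.setId(jobId);
    job.setState(JobStateEnum.Closed);
    connection.updateJob(job);
} // END private static void closeJob()

/*
 * Hedged usage sketch: one way to obtain the BulkConnection passed to
 * uploadBinaryAttachmentUsingBulkAPI(), following the standard WSC login
 * flow. The credentials, API version, and login URL are placeholders, and
 * this additionally needs com.sforce.soap.partner.PartnerConnection,
 * com.sforce.ws.ConnectorConfig, and com.sforce.ws.ConnectionException
 * on top of the imports above.
 */
private static BulkConnection getBulkConnection(String userName, String password)
        throws ConnectionException, AsyncApiException {
    ConnectorConfig partnerConfig = new ConnectorConfig();
    partnerConfig.setUsername(userName);
    partnerConfig.setPassword(password);
    partnerConfig.setAuthEndpoint("https://login.salesforce.com/services/Soap/u/30.0");
    // Creating the PartnerConnection performs the login and stores the
    // session id and service endpoint on partnerConfig.
    new PartnerConnection(partnerConfig);
    ConnectorConfig config = new ConnectorConfig();
    config.setSessionId(partnerConfig.getSessionId());
    // The Bulk API endpoint is derived from the SOAP service endpoint.
    String soapEndpoint = partnerConfig.getServiceEndpoint();
    String apiVersion = "30.0";
    String restEndpoint = soapEndpoint.substring(0, soapEndpoint.indexOf("Soap/"))
            + "async/" + apiVersion;
    config.setRestEndpoint(restEndpoint);
    return new BulkConnection(config);
} // END private static BulkConnection getBulkConnection()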