Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Take the next FlowFile from the processor session. The result is checked
// against null before use further down, so it may be absent.
// Declared with `var` so the script does not rely on an implicit global
// (assigning to an undeclared name throws under strict mode).
var flowFile = session.get();
/**
 * Returns a pseudo-random integer in the half-open range [min, max).
 *
 * The bounds are first snapped inward to integers: `min` is rounded up and
 * `max` is rounded down.
 *
 * @param {number} min lower bound (inclusive after rounding up)
 * @param {number} max upper bound (exclusive after rounding down)
 * @returns {number} an integer n with ceil(min) <= n < floor(max)
 */
function getRandomInt(min, max) {
    var lower = Math.ceil(min);
    var upper = Math.floor(max);
    var span = upper - lower;
    // Math.random() is in [0, 1), so the offset never reaches `span` itself.
    var offset = Math.floor(Math.random() * span);
    return lower + offset;
}
/*
 * Replaces the content of the incoming FlowFile with a single InfluxDB
 * line-protocol record of the form
 *
 *     <measurement>,providerId=<id>,type=<type> value=1 <timestamp-ns>
 *
 * and transfers it to REL_SUCCESS. If the record cannot be built, the
 * FlowFile is transferred to REL_FAILURE with its content left untouched.
 *
 * Relies on names supplied from outside this block: `flowFile`, `session`,
 * `log`, REL_SUCCESS / REL_FAILURE, getRandomInt(), and the property objects
 * `influxdbmeasurement`, `providerid` and `type`.
 */
if (flowFile != null) {
    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");

    // NOTE(review): the properties are evaluated without passing the FlowFile,
    // so Expression Language that refers to FlowFile attributes presumably
    // will not resolve here - confirm whether
    // evaluateAttributeExpressions(flowFile) was intended.
    var measurement = influxdbmeasurement.evaluateAttributeExpressions().getValue();

    var line = null;
    var error = false;
    try {
        // Nanosecond timestamp = current epoch milliseconds followed by six
        // digits, of which the last three are random jitter. It is assembled
        // as a decimal string because Date.now() * 1000000 is roughly 1.7e18,
        // beyond Number.MAX_SAFE_INTEGER; doubles in that range are 256 apart,
        // so adding 0-999 numerically loses most of the jitter and yields an
        // inexact timestamp.
        var jitterNanos = ("000000" + getRandomInt(0, 1000)).slice(-6);
        var timestampNanos = String(Date.now()) + jitterNanos;

        line = measurement +
            ",providerId=" + providerid.evaluateAttributeExpressions().getValue() +
            ",type=" + type.evaluateAttributeExpressions().getValue() +
            " value=1" +
            " " + timestampNanos;
    } catch (e) {
        // Record why the line could not be built instead of dropping the error.
        error = true;
        log.error("Failed to build InfluxDB line: " + e);
    }

    if (error) {
        // Nothing has been written, so the original content is preserved as-is
        // (no need to buffer it and write it back).
        session.transfer(flowFile, REL_FAILURE);
    } else {
        // Overwrite the FlowFile content with the generated record. The
        // incoming content is not needed, so the input stream is not read.
        flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
            outputStream.write(line.getBytes(StandardCharsets.UTF_8));
        }));
        session.transfer(flowFile, REL_SUCCESS);
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement