Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.abc.argos.task;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
- import java.util.Iterator;
- import java.io.StringWriter;
- import org.json.simple.JSONValue;
- import org.codehaus.jackson.JsonGenerator;
- import kafka.producer.KeyedMessage;
- import org.codehaus.jackson.JsonNode;
- import org.codehaus.jackson.JsonParseException;
- import org.apache.samza.system.IncomingMessageEnvelope;
- import org.apache.samza.system.OutgoingMessageEnvelope;
- import org.apache.samza.system.SystemStream;
- import org.apache.samza.task.MessageCollector;
- import org.apache.samza.task.StreamTask;
- import org.apache.samza.task.TaskCoordinator;
- import org.apache.samza.task.TaskContext;
- import org.codehaus.jackson.map.JsonMappingException;
- import org.codehaus.jackson.map.ObjectMapper;
- import com.abc.argos.system.*;
- import org.apache.samza.config.Config;
- import org.apache.samza.storage.kv.KeyValueStore;
- import org.apache.samza.storage.kv.KeyValueIterator;
- import org.apache.samza.storage.kv.Entry;
- import org.apache.samza.task.InitableTask;
- import org.apache.samza.task.WindowableTask;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.codehaus.jackson.JsonParser;
- import redis.clients.jedis.Jedis;
- //import com.google.gson.Gson;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.FileInputStream;
- import java.util.Properties;
- import java.util.Set;
- //import java.util.Map;
- public class WindowTask implements StreamTask, InitableTask, WindowableTask {
- private KeyValueStore<String, String> store;
- Config config;
- TaskContext context;
- String val;
- String outTopic;
- HashMap propVals;
- String useProp;
- private int eventsSeen = 0;
- private String appName;
- public SystemStream OUTPUT_STREAM;
- private Jedis jedis;
- private String redis;
- private HashMap hm = new HashMap();
- Map<String, String> appMap = new HashMap();
- private static final Logger log = LoggerFactory.getLogger(WindowTask.class);
- /* Constructor */
- public HashMap readPropertyFile() {
- log.info("Reading the property file ...");
- Properties prop = new Properties();
- FileInputStream input;
- HashMap<String, String> propvals = new HashMap<String, String>();
- try {
- //adjust this to that of remote server
- input = new FileInputStream("./__package/config/abc.properties");
- prop.load(input);
- log.info("Property file loaded succesfully.");
- Set<String> propertyNames = prop.stringPropertyNames();
- log.debug("Property file content: ");
- for (String Property : propertyNames) {
- log.debug(Property + ":" + prop.getProperty(Property));
- propvals.put(Property, prop.getProperty(Property));
- }
- log.debug("HashMap generated: " + propvals);
- } catch (FileNotFoundException e) {
- log.warn(e.getMessage());
- e.printStackTrace();
- } catch (IOException e) {
- log.warn(e.getMessage());
- e.printStackTrace();
- } catch (Exception e) {
- log.warn(e.getMessage());
- e.printStackTrace();
- }
- return propvals;
- }
- public void init(Config config, TaskContext context) {
- log.info("IN WindowTask");
- this.redis = config.get("system.redis");
- this.outTopic = config.get("task.outputs");
- //this.store = (KeyValueStore<String, String>) context.getStore("store"); // We wont use it now
- OUTPUT_STREAM = new SystemStream("kafka", outTopic);
- this.useProp = config.get("load.property");
- //populate the propVals from Property File else pass in an empty HashMap
- if (Boolean.parseBoolean(useProp)) propVals = readPropertyFile();
- else propVals = new HashMap<String, String>();
- log.info("Connecting to redis at " + redis + "...");
- jedis = new Jedis(redis);
- try{
- jedis.connect();
- }catch (Exception e){
- log.warn ("Unable to connect to redis. Try checking with service-now directly (not implemented yet)");
- e.printStackTrace();
- }
- log.info("Connected to redis");
- }
- @SuppressWarnings("unchecked")
- @Override
- //public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException, JsonParseException, JsonMappingException, IOException {
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException {
- ObjectMapper mapper = new ObjectMapper();
- log.info("Envelope Key : " + envelope.getKey());
- mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
- JsonNode node;
- try {
- //log.info("Raw message: " + envelope.toString());
- node = mapper.readValue(envelope.getMessage().toString(), JsonNode.class);
- //log.info("Raw json: " + node.get("event").get("application_name").toString());
- //String appName=node.findValues("application_name").get(0).toString();
- String appName=node.get("event").get("application_name").toString();
- String environment=node.get("event").get("environment").toString();
- String business_unit_l1=node.get("event").get("business_unit_l1").toString();
- String business_unit_l2=node.get("event").get("business_unit_l2").toString();
- String business_unit_l3=node.get("event").get("business_unit_l3").toString();
- String business_unit_l4=node.get("event").get("business_unit_l4").toString();
- //String uniqueKey=business_unit_l1 + "." + business_unit_l2 + "." + business_unit_l3 + "." + business_unit_l4 + "." + environment + "." + appName;
- String uniqueKey=appName;
- //collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outTopic), node));
- try{
- //eventsSeen=store.get(appName)appName;
- //eventsSeen=Integer.parseInt(store.get(appName));
- eventsSeen=Integer.parseInt(appMap.get(uniqueKey));
- eventsSeen++;
- log.info("Application Name: " + appName + "eventSeen " + eventsSeen);
- try{
- //hm.put(appName,eventsSeen);
- appMap.put(uniqueKey , eventsSeen + "");
- }catch (Exception e){
- log.info("Error while adding elements to locah Hash " + e.getMessage());
- e.printStackTrace();
- }
- /*
- try{
- store.put(appName, eventsSeen + ""); // Needs to be (string,string)
- log.info("Added new value " + store.get(appName));
- }catch (Exception e){
- log.info("No value found for application in kv store: " + e.getMessage());
- e.printStackTrace();
- }
- */
- }catch (Exception e){
- log.warn("No value found for application in kv store: " + e.getMessage());
- appMap.put(uniqueKey, "0");
- //store.put(appName, "0");
- //e.printStackTrace();
- }
- //eventsSeen++;
- } catch (JsonParseException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (JsonMappingException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }
- }
- public void window(MessageCollector collector,
- TaskCoordinator coordinator) {
- //store.delete(appName);
- long milliseconds = System.currentTimeMillis();
- String receive_time_parsed=Long.toString(milliseconds);
- appMap.put("timestamp", receive_time_parsed);
- String jsonText = "";
- //StringWriter out = new StringWriter();
- //KeyedMessage<String, String> data = new KeyedMessage<String, String>(OUTPUT_STREAM, ip, appMap);
- ObjectMapper tempMapper = new ObjectMapper();
- tempMapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
- tempMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
- try {
- //JSONValue.writeJSONString(appMap, out);
- //jsonText = tempMapper.writer().writeValueAsString(appMap);
- StringWriter stringWriter = new StringWriter();
- tempMapper.writer().writeValue(stringWriter, appMap);
- jsonText=stringWriter.toString();
- collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, jsonText));
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- /*
- KeyValueIterator<String, String> i = store.all();
- log.info("App 0 " + i.);
- //log.info(i.);
- while(i.hasNext()){
- Entry <String, String> next = i.next();
- log.info("Removed Key", next.toString());
- //i.remove();
- try{
- jedis.("AppAlertCount", );
- }catch(Exception e){
- log.warn("Unable to update redis with alert count per app");
- }
- store.put(next.getKey(), "0");
- }
- i.close();
- */
- Set set = appMap.entrySet();
- try {
- //jedis.hmset("appCount" , appMap);
- jedis.set("appCount", jsonText);
- }catch (Exception E){
- log.warn("Error setting value in Redis" + E.getMessage());
- E.getStackTrace();
- }
- /*
- // Get an iterator
- Iterator i = set.iterator();
- // Display elements
- while(i.hasNext()) {
- Map<String, String> tempMap = new HashMap<String, String>();
- Map.Entry appMap = (Map.Entry)i.next();
- tempMap.put(appMap.getKey().toString(), appMap.getValue().toString());
- log.info("App name: " + appMap.getKey() + " Count" + appMap.getValue());
- //jedis.hmset("appCount:" + appMap.getKey() , tempMap);
- try {
- jedis.hmset("appCount" + appMap.getKey() , tempMap);
- }catch (Exception E){
- log.warn("Error setting value in Redis" + E.getMessage());
- E.getStackTrace();
- }
- }
- */
- appMap.clear();
- eventsSeen = 0;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment