Guest User

Untitled

a guest
Jul 17th, 2015
251
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.43 KB | None | 0 0
  1. package com.abc.argos.task;
  2.  
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.Set;
  6. import java.util.Iterator;
  7. import java.io.StringWriter;
  8. import org.json.simple.JSONValue;
  9. import org.codehaus.jackson.JsonGenerator;
  10. import kafka.producer.KeyedMessage;
  11. import org.codehaus.jackson.JsonNode;
  12. import org.codehaus.jackson.JsonParseException;
  13. import org.apache.samza.system.IncomingMessageEnvelope;
  14. import org.apache.samza.system.OutgoingMessageEnvelope;
  15. import org.apache.samza.system.SystemStream;
  16. import org.apache.samza.task.MessageCollector;
  17. import org.apache.samza.task.StreamTask;
  18. import org.apache.samza.task.TaskCoordinator;
  19. import org.apache.samza.task.TaskContext;
  20. import org.codehaus.jackson.map.JsonMappingException;
  21. import org.codehaus.jackson.map.ObjectMapper;
  22.  
  23.  
  24. import com.abc.argos.system.*;
  25.  
  26. import org.apache.samza.config.Config;
  27. import org.apache.samza.storage.kv.KeyValueStore;
  28. import org.apache.samza.storage.kv.KeyValueIterator;
  29. import org.apache.samza.storage.kv.Entry;
  30. import org.apache.samza.task.InitableTask;
  31. import org.apache.samza.task.WindowableTask;
  32. import org.slf4j.Logger;
  33. import org.slf4j.LoggerFactory;
  34. import org.codehaus.jackson.JsonParser;
  35.  
  36. import redis.clients.jedis.Jedis;
  37.  
  38.  
  39.  
  40.  
  41. //import com.google.gson.Gson;
  42. import java.io.FileNotFoundException;
  43. import java.io.IOException;
  44. import java.io.FileInputStream;
  45. import java.util.Properties;
  46. import java.util.Set;
  47. //import java.util.Map;
  48.  
  49.  
  50. public class WindowTask implements StreamTask, InitableTask, WindowableTask {
  51.  
  52. private KeyValueStore<String, String> store;
  53. Config config;
  54. TaskContext context;
  55. String val;
  56. String outTopic;
  57. HashMap propVals;
  58. String useProp;
  59. private int eventsSeen = 0;
  60. private String appName;
  61. public SystemStream OUTPUT_STREAM;
  62. private Jedis jedis;
  63. private String redis;
  64. private HashMap hm = new HashMap();
  65. Map<String, String> appMap = new HashMap();
  66.  
  67.  
  68. private static final Logger log = LoggerFactory.getLogger(WindowTask.class);
  69.  
  70. /* Constructor */
  71.  
  72.  
  73. public HashMap readPropertyFile() {
  74. log.info("Reading the property file ...");
  75. Properties prop = new Properties();
  76. FileInputStream input;
  77. HashMap<String, String> propvals = new HashMap<String, String>();
  78. try {
  79. //adjust this to that of remote server
  80. input = new FileInputStream("./__package/config/abc.properties");
  81. prop.load(input);
  82. log.info("Property file loaded succesfully.");
  83. Set<String> propertyNames = prop.stringPropertyNames();
  84. log.debug("Property file content: ");
  85. for (String Property : propertyNames) {
  86. log.debug(Property + ":" + prop.getProperty(Property));
  87. propvals.put(Property, prop.getProperty(Property));
  88. }
  89. log.debug("HashMap generated: " + propvals);
  90. } catch (FileNotFoundException e) {
  91. log.warn(e.getMessage());
  92. e.printStackTrace();
  93. } catch (IOException e) {
  94. log.warn(e.getMessage());
  95. e.printStackTrace();
  96. } catch (Exception e) {
  97. log.warn(e.getMessage());
  98. e.printStackTrace();
  99. }
  100. return propvals;
  101. }
  102.  
  103. public void init(Config config, TaskContext context) {
  104. log.info("IN WindowTask");
  105. this.redis = config.get("system.redis");
  106. this.outTopic = config.get("task.outputs");
  107. //this.store = (KeyValueStore<String, String>) context.getStore("store"); // We wont use it now
  108. OUTPUT_STREAM = new SystemStream("kafka", outTopic);
  109. this.useProp = config.get("load.property");
  110. //populate the propVals from Property File else pass in an empty HashMap
  111. if (Boolean.parseBoolean(useProp)) propVals = readPropertyFile();
  112. else propVals = new HashMap<String, String>();
  113. log.info("Connecting to redis at " + redis + "...");
  114. jedis = new Jedis(redis);
  115. try{
  116. jedis.connect();
  117. }catch (Exception e){
  118. log.warn ("Unable to connect to redis. Try checking with service-now directly (not implemented yet)");
  119. e.printStackTrace();
  120. }
  121. log.info("Connected to redis");
  122. }
  123. @SuppressWarnings("unchecked")
  124. @Override
  125. //public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException, JsonParseException, JsonMappingException, IOException {
  126. public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException {
  127.  
  128. ObjectMapper mapper = new ObjectMapper();
  129. log.info("Envelope Key : " + envelope.getKey());
  130. mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
  131. JsonNode node;
  132. try {
  133. //log.info("Raw message: " + envelope.toString());
  134. node = mapper.readValue(envelope.getMessage().toString(), JsonNode.class);
  135. //log.info("Raw json: " + node.get("event").get("application_name").toString());
  136. //String appName=node.findValues("application_name").get(0).toString();
  137. String appName=node.get("event").get("application_name").toString();
  138. String environment=node.get("event").get("environment").toString();
  139. String business_unit_l1=node.get("event").get("business_unit_l1").toString();
  140. String business_unit_l2=node.get("event").get("business_unit_l2").toString();
  141. String business_unit_l3=node.get("event").get("business_unit_l3").toString();
  142. String business_unit_l4=node.get("event").get("business_unit_l4").toString();
  143. //String uniqueKey=business_unit_l1 + "." + business_unit_l2 + "." + business_unit_l3 + "." + business_unit_l4 + "." + environment + "." + appName;
  144. String uniqueKey=appName;
  145.  
  146. //collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outTopic), node));
  147. try{
  148. //eventsSeen=store.get(appName)appName;
  149.  
  150. //eventsSeen=Integer.parseInt(store.get(appName));
  151. eventsSeen=Integer.parseInt(appMap.get(uniqueKey));
  152. eventsSeen++;
  153. log.info("Application Name: " + appName + "eventSeen " + eventsSeen);
  154. try{
  155. //hm.put(appName,eventsSeen);
  156. appMap.put(uniqueKey , eventsSeen + "");
  157. }catch (Exception e){
  158. log.info("Error while adding elements to locah Hash " + e.getMessage());
  159. e.printStackTrace();
  160. }
  161. /*
  162. try{
  163. store.put(appName, eventsSeen + ""); // Needs to be (string,string)
  164. log.info("Added new value " + store.get(appName));
  165. }catch (Exception e){
  166. log.info("No value found for application in kv store: " + e.getMessage());
  167. e.printStackTrace();
  168. }
  169. */
  170. }catch (Exception e){
  171. log.warn("No value found for application in kv store: " + e.getMessage());
  172. appMap.put(uniqueKey, "0");
  173. //store.put(appName, "0");
  174. //e.printStackTrace();
  175. }
  176. //eventsSeen++;
  177.  
  178. } catch (JsonParseException e1) {
  179. // TODO Auto-generated catch block
  180. e1.printStackTrace();
  181. } catch (JsonMappingException e1) {
  182. // TODO Auto-generated catch block
  183. e1.printStackTrace();
  184. } catch (IOException e1) {
  185. // TODO Auto-generated catch block
  186. e1.printStackTrace();
  187. }
  188. }
  189.  
  190. public void window(MessageCollector collector,
  191. TaskCoordinator coordinator) {
  192. //store.delete(appName);
  193. long milliseconds = System.currentTimeMillis();
  194. String receive_time_parsed=Long.toString(milliseconds);
  195. appMap.put("timestamp", receive_time_parsed);
  196. String jsonText = "";
  197. //StringWriter out = new StringWriter();
  198. //KeyedMessage<String, String> data = new KeyedMessage<String, String>(OUTPUT_STREAM, ip, appMap);
  199.  
  200. ObjectMapper tempMapper = new ObjectMapper();
  201. tempMapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
  202. tempMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
  203.  
  204. try {
  205. //JSONValue.writeJSONString(appMap, out);
  206. //jsonText = tempMapper.writer().writeValueAsString(appMap);
  207. StringWriter stringWriter = new StringWriter();
  208. tempMapper.writer().writeValue(stringWriter, appMap);
  209. jsonText=stringWriter.toString();
  210. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, jsonText));
  211. } catch (IOException e) {
  212. // TODO Auto-generated catch block
  213. e.printStackTrace();
  214. }
  215.  
  216. /*
  217. KeyValueIterator<String, String> i = store.all();
  218.  
  219. log.info("App 0 " + i.);
  220.  
  221. //log.info(i.);
  222. while(i.hasNext()){
  223. Entry <String, String> next = i.next();
  224. log.info("Removed Key", next.toString());
  225. //i.remove();
  226.  
  227. try{
  228. jedis.("AppAlertCount", );
  229. }catch(Exception e){
  230. log.warn("Unable to update redis with alert count per app");
  231. }
  232.  
  233. store.put(next.getKey(), "0");
  234. }
  235. i.close();
  236. */
  237.  
  238. Set set = appMap.entrySet();
  239. try {
  240. //jedis.hmset("appCount" , appMap);
  241. jedis.set("appCount", jsonText);
  242.  
  243.  
  244. }catch (Exception E){
  245. log.warn("Error setting value in Redis" + E.getMessage());
  246. E.getStackTrace();
  247. }
  248. /*
  249. // Get an iterator
  250. Iterator i = set.iterator();
  251. // Display elements
  252. while(i.hasNext()) {
  253. Map<String, String> tempMap = new HashMap<String, String>();
  254. Map.Entry appMap = (Map.Entry)i.next();
  255. tempMap.put(appMap.getKey().toString(), appMap.getValue().toString());
  256. log.info("App name: " + appMap.getKey() + " Count" + appMap.getValue());
  257. //jedis.hmset("appCount:" + appMap.getKey() , tempMap);
  258. try {
  259. jedis.hmset("appCount" + appMap.getKey() , tempMap);
  260. }catch (Exception E){
  261. log.warn("Error setting value in Redis" + E.getMessage());
  262. E.getStackTrace();
  263. }
  264.  
  265. }
  266. */
  267. appMap.clear();
  268. eventsSeen = 0;
  269. }
  270. }
Advertisement
Add Comment
Please, Sign In to add comment