Advertisement
nuclearfossil

hello-samza diff for Simple consumption of a topic

Mar 23rd, 2015
288
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Diff 3.50 KB | None | 0 0
  1. diff --git pom.xml pom.xml
  2. index af93442..9dd8583 100644
  3. --- pom.xml
  4. +++ pom.xml
  5. @@ -194,6 +194,7 @@ under the License.
  6.                <exclude>**/.cache/**</exclude>
  7.                <exclude>deploy/**</exclude>
  8.                <exclude>**/.project</exclude>
  9. +              <exclude>scripts/**</exclude>
  10.              </excludes>
  11.            </configuration>
  12.          </plugin>
  13. diff --git src/main/config/wikipedia-parser.properties src/main/config/wikipedia-parser.properties
  14. index 38575b6..87db4ab 100644
  15. --- src/main/config/wikipedia-parser.properties
  16. +++ src/main/config/wikipedia-parser.properties
  17. @@ -24,7 +24,8 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
  18.  
  19.  # Task
  20.  task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
  21. -task.inputs=kafka.wikipedia-raw
  22. +#task.inputs=kafka.wikipedia-raw
  23. +task.inputs=kafka.myTopic
  24.  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
  25.  task.checkpoint.system=kafka
  26.  # Normally, this would be 3, but we have only one broker.
  27. @@ -39,6 +40,8 @@ metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
  28.  # Serializers
  29.  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
  30.  serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
  31. +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
  32. +systems.kafka.streams.myTopic.samza.msg.serde=string
  33.  
  34.  # Systems
  35.  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
  36. diff --git src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
  37. index 0505f58..6b7896f 100644
  38. --- src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
  39. +++ src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
  40. @@ -35,19 +35,21 @@ public class WikipediaParserStreamTask implements StreamTask {
  41.    @SuppressWarnings("unchecked")
  42.    @Override
  43.    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
  44. -    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
  45. -    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
  46. +      //    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
  47. +      //WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
  48.  
  49.      try {
  50. -      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
  51. +       System.out.println((String)envelope.getMessage());
  52. +       //Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
  53.  
  54. -      parsedJsonObject.put("channel", event.getChannel());
  55. -      parsedJsonObject.put("source", event.getSource());
  56. -      parsedJsonObject.put("time", event.getTime());
  57. +       //parsedJsonObject.put("channel", event.getChannel());
  58. +       //parsedJsonObject.put("source", event.getSource());
  59. +       //parsedJsonObject.put("time", event.getTime());
  60.  
  61. -      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
  62. +       //collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
  63.      } catch (Exception e) {
  64. -      System.err.println("Unable to parse line: " + event);
  65. +       //System.err.println("Unable to parse line: " + event);
  66. +      System.err.println("Unable to parse line from envelope ");
  67.      }
  68.    }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement