Advertisement
Not a member of Pastebin yet?
Sign up: it unlocks many cool features!
- diff --git pom.xml pom.xml
- index af93442..9dd8583 100644
- --- pom.xml
- +++ pom.xml
- @@ -194,6 +194,7 @@ under the License.
- <exclude>**/.cache/**</exclude>
- <exclude>deploy/**</exclude>
- <exclude>**/.project</exclude>
- + <exclude>scripts/**</exclude>
- </excludes>
- </configuration>
- </plugin>
- diff --git src/main/config/wikipedia-parser.properties src/main/config/wikipedia-parser.properties
- index 38575b6..87db4ab 100644
- --- src/main/config/wikipedia-parser.properties
- +++ src/main/config/wikipedia-parser.properties
- @@ -24,7 +24,8 @@ yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-
- # Task
- task.class=samza.examples.wikipedia.task.WikipediaParserStreamTask
- -task.inputs=kafka.wikipedia-raw
- +#task.inputs=kafka.wikipedia-raw
- +task.inputs=kafka.myTopic
- task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
- task.checkpoint.system=kafka
- # Normally, this would be 3, but we have only one broker.
- @@ -39,6 +40,8 @@ metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
- # Serializers
- serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
- serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
- +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
- +systems.kafka.streams.myTopic.samza.msg.serde=string
- # Systems
- systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
- diff --git src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
- index 0505f58..6b7896f 100644
- --- src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
- +++ src/main/java/samza/examples/wikipedia/task/WikipediaParserStreamTask.java
- @@ -35,19 +35,21 @@ public class WikipediaParserStreamTask implements StreamTask {
- @SuppressWarnings("unchecked")
- @Override
- public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
- - Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
- - WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
- + // Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
- + //WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
- try {
- - Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
- + System.out.println((String)envelope.getMessage());
- + //Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
- - parsedJsonObject.put("channel", event.getChannel());
- - parsedJsonObject.put("source", event.getSource());
- - parsedJsonObject.put("time", event.getTime());
- + //parsedJsonObject.put("channel", event.getChannel());
- + //parsedJsonObject.put("source", event.getSource());
- + //parsedJsonObject.put("time", event.getTime());
- - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
- + //collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
- } catch (Exception e) {
- - System.err.println("Unable to parse line: " + event);
- + //System.err.println("Unable to parse line: " + event);
- + System.err.println("Unable to parse line from envelope ");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement