Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.alfabank.api.afx.kafka.actualratesupdater.config;
- import org.apache.kafka.streams.Topology;
- import org.apache.kafka.streams.TopologyDescription;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.LinkedHashSet;
- import java.util.Set;
- public class KafkaStreamTopologyPlantUMLPrinter {
- public static final String END_OF_LINE = "\n";
- public static final String ARROW = " --> ";
- public static final String ARROW_UP = " -up-> ";
- public static final String DOUBLE_HYPHEN = " -- ";
- private Collection<String> topics = new LinkedHashSet<>();
- private Topology topology;
- public KafkaStreamTopologyPlantUMLPrinter(Topology topology) {
- this.topology = topology;
- }
- public String print() throws Exception {
- TopologyDescription describe = topology.describe();
- StringBuilder diagram = new StringBuilder();
- diagram.append("@startuml").append(END_OF_LINE).append(END_OF_LINE);
- StringBuilder subtopologies = new StringBuilder();
- for (TopologyDescription.Subtopology subtopology : describe.subtopologies()) {
- printSubtopologyAndAddTopics(subtopologies, subtopology);
- }
- for (String topic : topics) {
- diagram.append("queue \"\\n ").append(topic).append("\" <<topic>> as ").append(removeBadCharacters(topic)).append(END_OF_LINE);
- }
- diagram.append(END_OF_LINE).append(END_OF_LINE);
- diagram.append(subtopologies);
- diagram.append("@enduml\n");
- return diagram.toString();
- }
- private void printSubtopologyAndAddTopics(StringBuilder subtopologies, TopologyDescription.Subtopology subtopology) {
- StringBuilder subtopologyRelations = new StringBuilder();
- subtopologies.append("package \"Sub-topology: ").append(subtopology.id()).append("\" {").append(END_OF_LINE);
- for (TopologyDescription.Node node : subtopology.nodes()) {
- subtopologies.append("agent \"").append(node.name()).append("\" <<").append(getNodeTypeName(node)).append(">> ").append("as ").append(removeBadCharacters(node.name())).append(END_OF_LINE);
- for (TopologyDescription.Node downstreamNode : node.successors()) {
- subtopologyRelations.append(removeBadCharacters(node.name())).append(ARROW).append(removeBadCharacters(downstreamNode.name())).append(END_OF_LINE);
- }
- processNodeSpecifics(node, subtopologies, subtopologyRelations);
- }
- subtopologies.append(END_OF_LINE).append(subtopologyRelations);
- subtopologies.append("}").append(END_OF_LINE).append(END_OF_LINE);
- }
- private void processNodeSpecifics(TopologyDescription.Node node, StringBuilder declarations, StringBuilder relations) {
- if (node instanceof TopologyDescription.Source) {
- String topicsAsString = removeBrackets(
- ((TopologyDescription.Source) node).topics());
- Arrays.stream(topicsAsString.split(","))
- .map(String::trim)
- .forEach(topic -> {
- this.topics.add(topic);
- relations.append(removeBadCharacters(topic)).append(ARROW).append(removeBadCharacters(node.name())).append(END_OF_LINE);
- });
- } else if (node instanceof TopologyDescription.Processor) {
- Set<String> stores = ((TopologyDescription.Processor) node).stores();
- for (String store : stores) {
- declarations.append("database \"\\n ").append(store).append("\" <<state store>> as ").append(removeBadCharacters(store)).append(END_OF_LINE);
- relations.append(removeBadCharacters(node.name())).append(DOUBLE_HYPHEN).append(removeBadCharacters(store)).append(END_OF_LINE);
- }
- } else if (node instanceof TopologyDescription.Sink) {
- String topic = ((TopologyDescription.Sink) node).topic();
- relations.append(removeBadCharacters(node.name())).append(topics.contains(topic) ? ARROW_UP : ARROW).append(removeBadCharacters(topic)).append(END_OF_LINE);
- this.topics.add(topic);
- }
- }
- private String removeBrackets(String topic) {
- if (topic.startsWith("[") && topic.endsWith("]")) {
- return topic.substring(1, topic.length() - 1);
- }
- return topic;
- }
- private String getNodeTypeName(TopologyDescription.Node node) {
- if (node instanceof TopologyDescription.Source) {
- return "Source";
- } else if (node instanceof TopologyDescription.Processor) {
- return "Processor";
- } else if (node instanceof TopologyDescription.Sink) {
- return "Sink";
- } else {
- return node.getClass().getName();
- }
- }
- private String removeBadCharacters(String source) {
- return source.replaceAll("[^A-Za-z0-9_]", "_");
- }
- }
Add Comment
Please, Sign In to add comment