Guest User

Untitled

a guest
Jul 17th, 2018
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.79 KB | None | 0 0
  1. package ru.alfabank.api.afx.kafka.actualratesupdater.config;
  2.  
  3. import org.apache.kafka.streams.Topology;
  4. import org.apache.kafka.streams.TopologyDescription;
  5.  
  6. import java.util.Arrays;
  7. import java.util.Collection;
  8. import java.util.LinkedHashSet;
  9. import java.util.Set;
  10.  
  11. public class KafkaStreamTopologyPlantUMLPrinter {
  12. public static final String END_OF_LINE = "\n";
  13. public static final String ARROW = " --> ";
  14. public static final String ARROW_UP = " -up-> ";
  15. public static final String DOUBLE_HYPHEN = " -- ";
  16.  
  17. private Collection<String> topics = new LinkedHashSet<>();
  18.  
  19. private Topology topology;
  20.  
  21. public KafkaStreamTopologyPlantUMLPrinter(Topology topology) {
  22. this.topology = topology;
  23. }
  24.  
  25. public String print() throws Exception {
  26.  
  27. TopologyDescription describe = topology.describe();
  28.  
  29. StringBuilder diagram = new StringBuilder();
  30. diagram.append("@startuml").append(END_OF_LINE).append(END_OF_LINE);
  31.  
  32.  
  33. StringBuilder subtopologies = new StringBuilder();
  34. for (TopologyDescription.Subtopology subtopology : describe.subtopologies()) {
  35.  
  36. printSubtopologyAndAddTopics(subtopologies, subtopology);
  37. }
  38.  
  39. for (String topic : topics) {
  40. diagram.append("queue \"\\n ").append(topic).append("\" <<topic>> as ").append(removeBadCharacters(topic)).append(END_OF_LINE);
  41. }
  42. diagram.append(END_OF_LINE).append(END_OF_LINE);
  43.  
  44. diagram.append(subtopologies);
  45.  
  46. diagram.append("@enduml\n");
  47.  
  48. return diagram.toString();
  49. }
  50.  
  51. private void printSubtopologyAndAddTopics(StringBuilder subtopologies, TopologyDescription.Subtopology subtopology) {
  52. StringBuilder subtopologyRelations = new StringBuilder();
  53. subtopologies.append("package \"Sub-topology: ").append(subtopology.id()).append("\" {").append(END_OF_LINE);
  54. for (TopologyDescription.Node node : subtopology.nodes()) {
  55. subtopologies.append("agent \"").append(node.name()).append("\" <<").append(getNodeTypeName(node)).append(">> ").append("as ").append(removeBadCharacters(node.name())).append(END_OF_LINE);
  56.  
  57.  
  58. for (TopologyDescription.Node downstreamNode : node.successors()) {
  59. subtopologyRelations.append(removeBadCharacters(node.name())).append(ARROW).append(removeBadCharacters(downstreamNode.name())).append(END_OF_LINE);
  60. }
  61. processNodeSpecifics(node, subtopologies, subtopologyRelations);
  62. }
  63. subtopologies.append(END_OF_LINE).append(subtopologyRelations);
  64.  
  65. subtopologies.append("}").append(END_OF_LINE).append(END_OF_LINE);
  66. }
  67.  
  68.  
  69. private void processNodeSpecifics(TopologyDescription.Node node, StringBuilder declarations, StringBuilder relations) {
  70. if (node instanceof TopologyDescription.Source) {
  71. String topicsAsString = removeBrackets(
  72. ((TopologyDescription.Source) node).topics());
  73. Arrays.stream(topicsAsString.split(","))
  74. .map(String::trim)
  75. .forEach(topic -> {
  76. this.topics.add(topic);
  77. relations.append(removeBadCharacters(topic)).append(ARROW).append(removeBadCharacters(node.name())).append(END_OF_LINE);
  78. });
  79. } else if (node instanceof TopologyDescription.Processor) {
  80. Set<String> stores = ((TopologyDescription.Processor) node).stores();
  81. for (String store : stores) {
  82. declarations.append("database \"\\n ").append(store).append("\" <<state store>> as ").append(removeBadCharacters(store)).append(END_OF_LINE);
  83. relations.append(removeBadCharacters(node.name())).append(DOUBLE_HYPHEN).append(removeBadCharacters(store)).append(END_OF_LINE);
  84. }
  85. } else if (node instanceof TopologyDescription.Sink) {
  86. String topic = ((TopologyDescription.Sink) node).topic();
  87. relations.append(removeBadCharacters(node.name())).append(topics.contains(topic) ? ARROW_UP : ARROW).append(removeBadCharacters(topic)).append(END_OF_LINE);
  88. this.topics.add(topic);
  89. }
  90. }
  91.  
  92. private String removeBrackets(String topic) {
  93. if (topic.startsWith("[") && topic.endsWith("]")) {
  94. return topic.substring(1, topic.length() - 1);
  95. }
  96. return topic;
  97. }
  98.  
  99. private String getNodeTypeName(TopologyDescription.Node node) {
  100. if (node instanceof TopologyDescription.Source) {
  101. return "Source";
  102. } else if (node instanceof TopologyDescription.Processor) {
  103. return "Processor";
  104. } else if (node instanceof TopologyDescription.Sink) {
  105. return "Sink";
  106. } else {
  107. return node.getClass().getName();
  108. }
  109. }
  110.  
  111. private String removeBadCharacters(String source) {
  112. return source.replaceAll("[^A-Za-z0-9_]", "_");
  113. }
  114. }
Add Comment
Please, Sign In to add comment