temirlan100

Untitled

Nov 14th, 2024
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.36 KB | None | 0 0
  1. import java.util.Properties;
  2. import org.apache.kafka.common.serialization.Serdes;
  3. import org.apache.kafka.streams.StreamsConfig;
  4. import org.apache.kafka.streams.KafkaStreams;
  5. import org.apache.kafka.streams.StreamsBuilder;
  6. import org.apache.kafka.streams.kstream.KStream;
  7.  
  8. public class StreamsApp {
  9.     private static final String APPLICATION_NAME = "stateless-processing-app";
  10.     private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  11.  
  12.     public static Properties getStreamsConfig() {
  13.         var config = new Properties();
  14.  
  15.         config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
  16.         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  17.         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
  18.         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
  19.  
  20.         return config;
  21.     }
  22.  
  23.     public static void main(String[] args) {
  24.         StreamsBuilder builder = new StreamsBuilder();
  25.  
  26.         KStream<String, String> sourceStream = builder.stream("patient-records");
  27.  
  28.         KStream<String, String> filteredStream = sourceStream.filter((key, value) -> {
  29.             PatientRecord record = PatientRecord.fromJson(value);
  30.             return record.getAge() >= 18;
  31.         });
  32.  
  33.         KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> {
  34.             PatientRecord record = PatientRecord.fromJson(value);
  35.             return record.getPatientId();
  36.         });
  37.  
  38.         KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> {
  39.             PatientRecord record = PatientRecord.fromJson(value);
  40.  
  41.             if (record.isFollowUpNeeded()) {
  42.                 record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30));
  43.             }
  44.  
  45.             if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) {
  46.                 record.setAssignedDoctor(null);
  47.             }
  48.  
  49.             return record.toJson();
  50.         });
  51.  
  52.         KStream<String, String>[] branchedStreams = augmentedStream.branch(
  53.                 (key, value) -> {
  54.                     PatientRecord record = PatientRecord.fromJson(value);
  55.                     return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty();
  56.                 },
  57.                 (key, value) -> {
  58.                     PatientRecord record = PatientRecord.fromJson(value);
  59.                     return record.getDiagnosis() == null || record.getDiagnosis().isEmpty();
  60.                 }
  61.         );
  62.  
  63.         KStream<String, String> diagnosedStream = branchedStreams[0];
  64.         KStream<String, String> undiagnosedStream = branchedStreams[1];
  65.  
  66.         KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> {
  67.             PatientRecord record = PatientRecord.fromJson(value);
  68.             Notification notification = createDoctorNotification(record);
  69.             return notification.toJson();
  70.         });
  71.  
  72.         KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> {
  73.             PatientRecord record = PatientRecord.fromJson(value);
  74.             Reminder reminder = createPatientReminder(record);
  75.             return reminder.toJson();
  76.         });
  77.  
  78.         KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> {
  79.             Notification notification = Notification.fromJson(value);
  80.             Doctor doctor = getAssignedDoctor(notification.getPatientId());
  81.             notification.setAssignedDoctor(doctor);
  82.             return notification.toJson();
  83.         });
  84.  
  85.         KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
  86.  
  87.         mergedStream.to("clinic-notifications-topic");
  88.  
  89.         // Запуск приложения
  90.         KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
  91.         streams.start();
  92.  
  93.         // Добавляем shutdown hook для корректной остановки
  94.         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  95.     }
  96.  
  97.     public static Notification createDoctorNotification(PatientRecord record) {
  98.         Notification notification = new Notification();
  99.         notification.setPatientId(record.getPatientId());
  100.         notification.setMessage("Проверьте план лечения пациента " + record.getName() + ".");
  101.         notification.setNextAppointmentDate(
  102.                 record.getNextAppointmentDate() != null ? record.getNextAppointmentDate().toString() : null
  103.         );
  104.         return notification;
  105.     }
  106.  
  107.     public static Doctor getAssignedDoctor(String patientId) {
  108.         // Здесь вы можете реализовать логику получения врача из локального справочника
  109.         // Например, вернуть фиктивного врача для демонстрации
  110.         return new Doctor("D001", "Др. Сергей Петров");
  111.     }
  112.  
  113.     public static Reminder createPatientReminder(PatientRecord record) {
  114.         Reminder reminder = new Reminder();
  115.         reminder.setPatientId(record.getPatientId());
  116.         reminder.setMessage("Уважаемый(ая) " + record.getName() + ", пожалуйста, запишитесь на первичный прием.");
  117.         return reminder;
  118.     }
  119. }
Advertisement
Add Comment
Please, Sign In to add comment