Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Properties;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.kstream.KStream;
- public class StreamsApp {
- private static final String APPLICATION_NAME = "stateless-processing-app";
- private static final String BOOTSTRAP_SERVERS = "localhost:9092";
- public static Properties getStreamsConfig() {
- var config = new Properties();
- config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
- config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
- return config;
- }
- public static void main(String[] args) {
- StreamsBuilder builder = new StreamsBuilder();
- KStream<String, String> sourceStream = builder.stream("patient-records");
- KStream<String, String> filteredStream = sourceStream.filter((key, value) -> {
- PatientRecord record = PatientRecord.fromJson(value);
- return record.getAge() >= 18;
- });
- KStream<String, String> rekeyedStream = filteredStream.selectKey((key, value) -> {
- PatientRecord record = PatientRecord.fromJson(value);
- return record.getPatientId();
- });
- KStream<String, String> augmentedStream = rekeyedStream.mapValues(value -> {
- PatientRecord record = PatientRecord.fromJson(value);
- if (record.isFollowUpNeeded()) {
- record.setNextAppointmentDate(record.getAppointmentDate().plusDays(30));
- }
- if (record.getAssignedDoctor() == null || record.getAssignedDoctor().isEmpty()) {
- record.setAssignedDoctor(null);
- }
- return record.toJson();
- });
- KStream<String, String>[] branchedStreams = augmentedStream.branch(
- (key, value) -> {
- PatientRecord record = PatientRecord.fromJson(value);
- return record.getDiagnosis() != null && !record.getDiagnosis().isEmpty();
- },
- (key, value) -> {
- PatientRecord record = PatientRecord.fromJson(value);
- return record.getDiagnosis() == null || record.getDiagnosis().isEmpty();
- }
- );
- KStream<String, String> diagnosedStream = branchedStreams[0];
- KStream<String, String> undiagnosedStream = branchedStreams[1];
- KStream<String, String> doctorNotifications = diagnosedStream.mapValues(value -> {
- PatientRecord record = PatientRecord.fromJson(value);
- Notification notification = createDoctorNotification(record);
- return notification.toJson();
- });
- KStream<String, String> patientReminders = undiagnosedStream.mapValues(value -> {
- PatientRecord record = PatientRecord.fromJson(value);
- Reminder reminder = createPatientReminder(record);
- return reminder.toJson();
- });
- KStream<String, String> enrichedDoctorNotifications = doctorNotifications.mapValues(value -> {
- Notification notification = Notification.fromJson(value);
- Doctor doctor = getAssignedDoctor(notification.getPatientId());
- notification.setAssignedDoctor(doctor);
- return notification.toJson();
- });
- KStream<String, String> mergedStream = enrichedDoctorNotifications.merge(patientReminders);
- mergedStream.to("clinic-notifications-topic");
- // Запуск приложения
- KafkaStreams streams = new KafkaStreams(builder.build(), getStreamsConfig());
- streams.start();
- // Добавляем shutdown hook для корректной остановки
- Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
- }
- public static Notification createDoctorNotification(PatientRecord record) {
- Notification notification = new Notification();
- notification.setPatientId(record.getPatientId());
- notification.setMessage("Проверьте план лечения пациента " + record.getName() + ".");
- notification.setNextAppointmentDate(
- record.getNextAppointmentDate() != null ? record.getNextAppointmentDate().toString() : null
- );
- return notification;
- }
- public static Doctor getAssignedDoctor(String patientId) {
- // Здесь вы можете реализовать логику получения врача из локального справочника
- // Например, вернуть фиктивного врача для демонстрации
- return new Doctor("D001", "Др. Сергей Петров");
- }
- public static Reminder createPatientReminder(PatientRecord record) {
- Reminder reminder = new Reminder();
- reminder.setPatientId(record.getPatientId());
- reminder.setMessage("Уважаемый(ая) " + record.getName() + ", пожалуйста, запишитесь на первичный прием.");
- return reminder;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment