Advertisement
nazar_art

CarPark demo#2 with shutdown

Jul 28th, 2020 (edited)
926
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.45 KB | None | 0 0
  1. import com.google.gson.Gson;
  2. import com.siemens.domain.remote.IotEntity;
  3. import com.siemens.repository.IotEntityRepository;
  4. import com.siemens.websocket.WebSocketService;
  5. import com.siemens.websocket.model.CameraResultResponse;
  6. import com.siemens.websocket.model.RecognitionRecord;
  7. import lombok.Getter;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.Setter;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springframework.beans.factory.annotation.Value;
  12. import org.springframework.stereotype.Service;
  13. import org.springframework.web.socket.*;
  14. import org.springframework.web.socket.client.WebSocketClient;
  15. import org.springframework.web.socket.client.standard.StandardWebSocketClient;
  16. import org.springframework.web.socket.handler.TextWebSocketHandler;
  17.  
  18. import javax.annotation.PreDestroy;
  19. import java.net.URI;
  20. import java.time.Instant;
  21. import java.util.List;
  22. import java.util.Objects;
  23. import java.util.Optional;
  24. import java.util.concurrent.ExecutorService;
  25. import java.util.concurrent.Executors;
  26. import java.util.stream.Collectors;
  27.  
  28.  
  29. @Slf4j
  30. @Service
  31. @RequiredArgsConstructor
  32. public class WebSocketServiceImpl implements WebSocketService {
  33.  
  34.     private final IotEntityRepository entityRepository;
  35.  
  36.     private ExecutorService executors;
  37.     private List<WebSocketTask> tasks;
  38.  
  39.     @Override
  40.     public void connectToCameras() {
  41.         List<IotEntity> allCameras = entityRepository.getAllCameras();
  42.         log.info("ALL_CAMERAS: {} : {}", allCameras.size(), allCameras);
  43.  
  44.         tasks = allCameras.stream()
  45.                 .map(WebSocketTask::new)
  46.                 .collect(Collectors.toList());
  47.  
  48.         executors = Executors.newFixedThreadPool(allCameras.size());
  49.         for (WebSocketTask task : tasks) {
  50.             executors.execute(task);
  51.         }
  52.     }
  53.  
  54.     @Override
  55.     @PreDestroy
  56.     public void shutdownQuietly() {
  57.         log.info("shutdown called");
  58.         if (tasks != null && !tasks.isEmpty()) {
  59.             tasks.forEach(t -> t.setRunning(false));
  60.         }
  61.  
  62.         if (executors != null) {
  63.             executors.shutdownNow();
  64.         }
  65.     }
  66. }
  67.  
  68. @Slf4j
  69. class WebSocketTask implements Runnable {
  70.  
  71.     private final WebSocketClient webSocketClient = new StandardWebSocketClient();
  72.     private final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
  73.     private final CameraWebSocketHandler handler;
  74.  
  75.     @Value(value = "camera.call-frequency")
  76.     private int callFrequency;
  77.  
  78.     @Setter
  79.     private volatile boolean running = true;
  80.     private final String url;
  81.  
  82.     @Getter
  83.     private final IotEntity camera;
  84.  
  85.     @Getter
  86.     @Setter
  87.     private Instant maxTimeStamp;
  88.  
  89.     public WebSocketTask(IotEntity camera) {
  90.         this.camera = camera;
  91.         handler = new CameraWebSocketHandler(this);
  92.         maxTimeStamp = Instant.now();
  93.  
  94. //        url = String.format("https://%s/vision/resultdatabase/websocket.cgi", camera.getIpAddress1());
  95.         url = "ws://echo.websocket.org";
  96.         Thread.currentThread().setName(String.valueOf(camera.getId()));
  97.     }
  98.  
  99.     @Override
  100.     public void run() {
  101.         while (true) {
  102.             try (WebSocketSession session = webSocketClient
  103.                     .doHandshake(handler, headers, URI.create(url)).get()) {
  104.  
  105.                 // inner while loop -> check that you aren't stopped
  106.                 while (running) {
  107.                     String sqlQuery = String.format("select * from results where timestamp > %s;", maxTimeStamp);
  108.                     TextMessage message = new TextMessage(sqlQuery);
  109.                     session.sendMessage(message);
  110.  
  111.                     log.info("SQL_QUERY: {} : {}", Thread.currentThread().getName(), message.getPayload());
  112.                     Thread.sleep(callFrequency);
  113.                 }
  114.  
  115.                 if (!running) {
  116.                     log.debug("stop execution for: {}", Thread.currentThread().getName());
  117.                     return;
  118.                 }
  119.             } catch (Exception e) { // end of external while loop -> reconnect again with "exponential back off"
  120.                 log.error("Exception while accessing websockets", e);
  121.             }
  122.         }
  123.     }
  124. }
  125.  
  126. @Slf4j
  127. @RequiredArgsConstructor
  128. class CameraWebSocketHandler extends TextWebSocketHandler {
  129.  
  130.     private final WebSocketTask task;
  131.     private Gson gson = new Gson();
  132.  
  133.     @Override
  134.     public void afterConnectionEstablished(WebSocketSession session) {
  135.         log.info("connection established {}", session.getId());
  136.     }
  137.  
  138.     @Override
  139.     public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
  140.         /*CameraResultResponse json = gson.fromJson((String) message.getPayload(), CameraResultResponse.class);
  141.         log.info("response: {}", json);
  142.         Optional<Instant> maxDT = json.getResult().stream()
  143.                 .filter(Objects::nonNull)
  144.                 .map(RecognitionRecord::getTimestamp)
  145.                 .max(Instant::compareTo);
  146.  
  147.         if (maxDT.isPresent() && maxDT.get().isAfter(task.getMaxTimeStamp())) {
  148.             task.setMaxTimeStamp(maxDT.get());
  149.         }*/
  150.  
  151.         log.info("response message: {}; thread: [{}]", message.getPayload(), Thread.currentThread().getName());
  152.     }
  153.  
  154.     @Override
  155.     public void handleTransportError(WebSocketSession session, Throwable exception) {
  156.         log.info("transport error");
  157.     }
  158.  
  159.     @Override
  160.     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
  161.         log.info("connection closed");
  162.     }
  163. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement