Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public void processClientsInfoFromZip(String nameArchive) throws IOException {
- log.debug("Start work with the archive");
- File inputArchiveFile = new File(path, nameArchive);
- // создаем файл и записываем туда данные из файла в архиве
- writeFile(inputArchiveFile);
- //обработка данных по каждому клиенту и запись в кафку
- processClientsInfo(inputArchiveFile);
- }
- public void writeFile(File inputArchiveFile) throws IOException {
- log.debug("Write to file");
- //получаем файл из архива
- File outputFile = getNewFileFromArchive(inputArchiveFile);
- // BufferedWriter это буферизированная запись в файл, то есть пишем не побайтно
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outputFile), StandardCharsets.UTF_8));
- InputStream stream = getInputStream(inputArchiveFile);
- for (int oneByte = stream.read(); oneByte != -1; oneByte = stream.read()) {
- writer.write(oneByte);
- }
- writer.flush();
- }
- public void processClientsInfo(File nameArchive) throws IOException {
- log.debug("Process Clients Info");
- // получение потока данных в файле
- InputStream stream = getInputStream(nameArchive);
- //BufferedReader Считывает текст из потока ввода символов, буферизует символы
- //InputStreamReader это мост между символьным и байтовым потоками:
- //он считывает байты и декодирует их в символы
- BufferedReader reader1 = new BufferedReader(new InputStreamReader(stream));
- CSVParser csvParser1 = new CSVParser(reader1, CSVFormat.DEFAULT.withFirstRecordAsHeader().withDelimiter(delimiter));
- for (CSVRecord csvRecord1 : csvParser1) {
- writeOneClientInfo(csvRecord1.toMap());
- }
- }
- public InputStream getInputStream(File inputArchiveFile) throws IOException {
- ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(inputArchiveFile.getAbsolutePath()));
- ZipEntry entry = zipInputStream.getNextEntry();
- ZipFile zipFile = new ZipFile(inputArchiveFile);
- Enumeration<? extends ZipEntry> entries = zipFile.entries();
- entries.hasMoreElements();
- InputStream stream = zipFile.getInputStream(entry);
- return stream;
- }
- public File getNewFileFromArchive(File inputArchiveFile) throws IOException {
- // ZipInputStream потоковое(побайтовое) чтение архива
- ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(inputArchiveFile.getAbsolutePath()));
- //getNextEntry функция, которая перемещает указатель на начало следующего файла в архиве
- ZipEntry entry = zipInputStream.getNextEntry();
- File outputFile = new File(newPath, entry.getName());
- return outputFile;
- }
- private void writeOneClientInfo(Map<String, String> clientRawInfo) {
- log.debug("Write to kafka");
- ClientInfo clientInfo = new ClientInfo(new LinkedHashMap<>(clientRawInfo));
- log.debug("Write JSON-messages to the Kafka topic, message:{}", clientInfo);
- kafkaTemplate.send(topic, clientInfo);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement