Advertisement
Guest User

Untitled

a guest
Dec 5th, 2016
190
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 17.23 KB | None | 0 0
  1. package ru.fix.cpapsm.subscription.processing.job.service.repeatable.singleton;
  2. import com.jcraft.jsch.*;
  3. import org.apache.commons.vfs2.FileObject;
  4. import org.apache.commons.vfs2.FileSystemException;
  5. import org.apache.commons.vfs2.FileSystemOptions;
  6. import org.apache.commons.vfs2.Selectors;
  7. import org.apache.commons.vfs2.impl.StandardFileSystemManager;
  8. import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
  9. import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
  10. import org.apache.ibatis.annotations.Param;
  11. import org.junit.Before;
  12. import org.junit.Test;
  13. import org.junit.runner.RunWith;
  14. import org.mockito.InjectMocks;
  15. import org.mockito.Spy;
  16. import org.mockito.runners.MockitoJUnitRunner;
  17. import ru.fix.commons.ssh.JschSftpUploader;
  18. import ru.fix.commons.ssh.SecureConnectionDetails;
  19. import ru.fix.commons.ssh.UploadableFile;
  20. import ru.fix.commons.ssh.exception.BindException;
  21. import ru.fix.commons.ssh.exception.RemoteFileRenameException;
  22. import ru.fix.commons.ssh.exception.UploadDataException;
  23. import ru.fix.commons.zkconfig.ZKConfigPropertyHolder;
  24. import ru.fix.cpapsm.commons.distributed.job.manager.DistributedMultiJobContext;
  25. import ru.fix.cpapsm.commons.distributed.job.manager.ShutdownListener;
  26. import ru.fix.cpapsm.kafka.client.OffsettedConsumer;
  27. import ru.fix.cpapsm.kafka.client.Record;
  28. import ru.fix.cpapsm.offices.dao.mappers.DataExtractorMapper;
  29. import java.io.*;
  30. import java.nio.file.Files;
  31. import java.nio.file.Path;
  32. import java.time.LocalDateTime;
  33. import java.util.*;
  34. import static org.mockito.Mockito.mock;
  35. import static org.mockito.Mockito.when;
  36. /**
  37.  * Created by mgizatullin on 16.11.16.
  38.  */
  39. @RunWith(MockitoJUnitRunner.class)
  40. public class FrodLogExportJobTest {
  41.     @InjectMocks
  42.     @Spy
  43.     private FrodLogExportJob frodLogExportJob;
  44.     private DistributedMultiJobContext distributedMultiJobContext;
  45.     @Before
  46.     public void setUp() throws Exception {
  47.         frodLogExportJob.consumerDisable = mockZKHolder(Boolean.FALSE);
  48.         frodLogExportJob.kafkaBootstrapServers = mockZKHolder("srv-5:9099,srv-6:9099,srv-7:9099,srv-8:9099");
  49.         frodLogExportJob.kafkaTopicPrefix = mockZKHolder("");
  50.         frodLogExportJob.kafkaBufferSize = mockZKHolder(20);
  51.         frodLogExportJob.kafkaBufferTimeout = mockZKHolder(20);
  52.         frodLogExportJob.kafkaSessionTimeout = mockZKHolder(60000);
  53.         frodLogExportJob.kafkaRequestTimeout = mockZKHolder(70000);
  54.         frodLogExportJob.kafkaRetriesCount = mockZKHolder(5);
  55.         frodLogExportJob.kafkaRetryPeriod = mockZKHolder(5000);
  56.         frodLogExportJob.frodLogExporterDelay = mockZKHolder(3600L);
  57.         frodLogExportJob.frodSendTopicName = mockZKHolder("sms_send_frod");
  58.         frodLogExportJob.dataExtractorMapper = new DataExtractorMapper() {
  59.             @Override
  60.             public String getProcessorInfo(@Param("processorName") String processorName) {
  61.                 return null;
  62.             }
  63.             @Override
  64.             public String getProcessorInfoByPartition(@Param("processorName") String processorName, @Param("partition") Integer partition) {
  65.                 return null;
  66.             }
  67.             @Override
  68.             public int createProcessorInfo(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate) {
  69.                 return 0;
  70.             }
  71.             @Override
  72.             public int createProcessorInfoByPartition(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate, @Param("partition") Integer partition) {
  73.                 return 0;
  74.             }
  75.             @Override
  76.             public int updateProcessorInfo(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate) {
  77.                 return 0;
  78.             }
  79.             @Override
  80.             public int updateProcessorInfoByPartition(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate, @Param("partition") Integer partition) {
  81.                 return 0;
  82.             }
  83.         };
  84.         frodLogExportJob.sftpSettingsJson = mockZKHolder("{\n" +
  85.                 "  \"host\": \"10.52.66.91\",\n" +
  86.                 "  \"port\": \"22\",\n" +
  87.                 "  \"key.file\": \"/home/cpapsm/.ssh/id_rsa\",\n" +
  88.                 "  \"username\": \"cpapsm\",\n" +
  89.                 "  \"upload.dir\": \"/opt/cpapsm/tmp\"\n" +
  90.                 "}");
  91.         frodLogExportJob.init();
  92.         distributedMultiJobContext = new DistributedMultiJobContext() {
  93.             @Override
  94.             public Set<String> getFairWorkShare() {
  95.                 Set<String> share = new HashSet<>();
  96.                 share.add("test");
  97.                 return share;
  98.             }
  99.             @Override
  100.             public boolean acquireWorkShare(String workShareName, long acquirePeriod, long timeout) throws Exception {
  101.                 return true;
  102.             }
  103.             @Override
  104.             public boolean checkAndProlongWorkShareIfExpiresIn(String workShareName, long prolongationPeriod, long expirationPeriod) throws Exception {
  105.                 return true;
  106.             }
  107.             @Override
  108.             public void releaseWorkShare(String workShareName) throws Exception {
  109.             }
  110.             @Override
  111.             public boolean isNeedToShutdown() {
  112.                 return false;
  113.             }
  114.             @Override
  115.             public void addShutdownListener(ShutdownListener o) {
  116.             }
  117.             @Override
  118.             public void removeShutdownListener(ShutdownListener o) {
  119.             }
  120.         };
  121.     }
  122.     @Test
  123.     public void shouldExportSms() throws Exception {
  124.         frodLogExportJob.parseLogsfromKafka(new OffsettedConsumer() {
  125.                                                 boolean firstTime = true;
  126.                                                 @Override
  127.                                                 public List<Record> poll() {
  128.                                                     List<Record> records = new ArrayList<>();
  129.                                                     if (firstTime) {
  130.                                                         records.add(new TestRecord(new Date().getTime(), "{\"serializedPdu\":\"(DELIVER_SM command_length:0x00000044 command_id:0x00000005 command_status:0x00000000 sequence_number:0x00000001) (body: (service_type:[] source_addr:[ton:0x00 npi:0x01 [79375727135]] destination_addr:[ton:0x01 npi:0x01 [5151]] esm_class:0x00 protocol_id:0x00 priority_flag:0x00 schedule_delivery_time:[] validity_period:[] registered_delivery:0x00 replace_if_present_flag:0x00 data_coding:0x08 sm_default_msg_id:0x00 sm_length:0x0014 short_message:[04230421042204170410041F0420041504220031])) (opts: )\",\"type\":\"PDU\",\"smppLogin\":\"cpapsm\",\"serverSessionId\":925,\"shortMessage\":\"УСТЗАПРЕТ1\",\"sourceAddress\":\"79375727135\",\"destAddress\":\"5151\",\"serverId\":24,\"timeStamp\":1479375524528}"));
  131.                                                         firstTime = false;
  132.                                                     }
  133.                                                     return records;
  134.                                                 }
  135.                                                 @Override
  136.                                                 public void commitPartitions(Map<Integer, Long> offsets) {
  137.                                                 }
  138.                                                 @Override
  139.                                                 public void commitPartitionsMaxOffsets(Collection<Record> records) {
  140.                                                 }
  141.                                                 @Override
  142.                                                 public void pause() {
  143.                                                 }
  144.                                                 @Override
  145.                                                 public void resume() {
  146.                                                 }
  147.                                                 @Override
  148.                                                 public boolean isPaused() {
  149.                                                     return false;
  150.                                                 }
  151.                                                 @Override
  152.                                                 public void close() throws Exception {
  153.                                                 }
  154.                                             },
  155.                 new JschSftpUploader() {
  156.                     @Override
  157.                     public UploadableFile getRemoteFileOutputStream(SecureConnectionDetails connectionDetails, String remoteFileName, String remotePath) throws BindException {
  158.                         return new UploadableFile() {
  159.                             @Override
  160.                             public OutputStream getOutputStream() throws UploadDataException {
  161.                                 try {
  162.                                     Path file = Files.createTempFile("smsExport", "csv");
  163.                                     return new FileOutputStream(file.toFile());
  164.                                 } catch (IOException e) {
  165.                                     e.printStackTrace();
  166.                                 }
  167.                                 return null;
  168.                             }
  169.                             @Override
  170.                             public void rename(String newName) throws RemoteFileRenameException {
  171.                             }
  172.                             @Override
  173.                             public void replace(String newName) throws RemoteFileRenameException {
  174.                             }
  175.                             @Override
  176.                             public void close() throws Exception {
  177.                             }
  178.                         };
  179.                     }
  180.                 },
  181.                 distributedMultiJobContext);
  182.     }
  183.     public void testSftp() {
  184.         String hostName = "srv-9:22022";
  185.         String username = "cpapsm";
  186.         String password = "PutYourPasswordForHostHere";
  187.         String localFilePath = "/home/mgizatullin/WikidPad_Error.log";
  188.         String remoteFilePath = "/FakeRemotePath/FakeRemoteFile.txt";
  189.         File file = new File(localFilePath);
  190.         if (!file.exists()) {
  191.             throw new RuntimeException("Error. Local file not found");
  192.         }
  193.         StandardFileSystemManager manager = new StandardFileSystemManager();
  194.         try {
  195.             manager.init();
  196.             // Create local file object
  197.             FileObject localFile = manager.resolveFile(file.getAbsolutePath());
  198.             // Create remote file object
  199.             FileObject remoteFile = manager.resolveFile(createConnectionString("srv-9", "cpapsm", "/home/cpapsm/eeee.ttt"), createDefaultOptions("/home/mgizatullin/.ssh/id_rsa", ""));
  200. //            BufferedOutputStream bufferedOutputStream =  new BufferedOutputStream(remoteFile.getContent().getOutputStream());
  201. //            bufferedOutputStream.write("Hello!!!! bla bla lba\n".getBytes("UTF-8"));
  202. //            bufferedOutputStream.flush();
  203. //            bufferedOutputStream.close();
  204.             /*
  205.              * use createDefaultOptions() in place of fsOptions for all default
  206.              * options - Ashok.
  207.              */
  208.             // Copy local file to sftp server
  209.             remoteFile.copyFrom(localFile, Selectors.SELECT_SELF);
  210.             System.out.println("File upload success");
  211.         } catch (Exception e) {
  212.             throw new RuntimeException(e);
  213.         } finally {
  214.             manager.close();
  215.         }
  216.     }
  217.     public void testSftpDownadlo() {
  218.         String hostName = "srv-9:22022";
  219.         String username = "cpapsm";
  220.         String password = "PutYourPasswordForHostHere";
  221.         String localFilePath = "/home/mgizatullin/WikidPad_Error.log";
  222.         String remoteFilePath = "/FakeRemotePath/FakeRemoteFile.txt";
  223.         File file = new File(localFilePath);
  224.         if (!file.exists()) {
  225.             throw new RuntimeException("Error. Local file not found");
  226.         }
  227.         StandardFileSystemManager manager = new StandardFileSystemManager();
  228.         try {
  229.             manager.init();
  230.             // Create local file object
  231.             FileObject localFile = manager.resolveFile(file.getAbsolutePath());
  232.             localFile.createFile();
  233.             // Create remote file object
  234.             FileObject remoteFile = manager.resolveFile(createConnectionString("srv-9", "cpapsm", "WikidPad_Error.log"), createDefaultOptions("/home/mgizatullin/.ssh/id_rsa", ""));
  235.             BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream());
  236.             bufferedOutputStream.write("Hello!!!! bla bla lba\n".getBytes("UTF-8"));
  237.             bufferedOutputStream.flush();
  238.             bufferedOutputStream.close();
  239.             /*
  240.              * use createDefaultOptions() in place of fsOptions for all default
  241.              * options - Ashok.
  242.              */
  243.             // Copy local file to sftp server
  244. //            remoteFile.copyFrom(localFile, Selectors.SELECT_SELF);
  245.             System.out.println("File upload success");
  246.         } catch (Exception e) {
  247.             throw new RuntimeException(e);
  248.         } finally {
  249.             manager.close();
  250.         }
  251.     }
  252.     public void testJSch() {
  253.         Session session = null;
  254.         Channel channel = null;
  255.         try {
  256.             JSch ssh = new JSch();
  257.             ssh.setKnownHosts("/path/of/known_hosts/file");
  258.             session = ssh.getSession("username", "host", 22);
  259.             session.setPassword("password");
  260.             session.connect();
  261.             channel = session.openChannel("sftp");
  262.             channel.connect();
  263.             ChannelSftp sftp = (ChannelSftp) channel;
  264.             sftp.put("/path/of/local/file", "/path/of/ftp/file");
  265.         } catch (JSchException e) {
  266.             e.printStackTrace();
  267.         } catch (SftpException e) {
  268.             e.printStackTrace();
  269.         } finally {
  270.             if (channel != null) {
  271.                 channel.disconnect();
  272.             }
  273.             if (session != null) {
  274.                 session.disconnect();
  275.             }
  276.         }
  277.     }
  278.     public static String createConnectionString(String hostName, String username, String remoteFilePath) {
  279.         return "sftp://" + username + "@" + hostName + "/" + remoteFilePath;
  280.     }
  281.     /**
  282.      * Method to setup default SFTP config
  283.      *
  284.      * @return the FileSystemOptions object containing the specified
  285.      * configuration options
  286.      * @throws FileSystemException
  287.      */
  288.     public static FileSystemOptions createDefaultOptions() throws FileSystemException {
  289.         // Create SFTP options
  290.         FileSystemOptions opts = new FileSystemOptions();
  291.         // SSH Key checking
  292.         SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(opts, "no");
  293.         /*
  294.          * Using the following line will cause VFS to choose File System's Root
  295.          * as VFS's root. If I wanted to use User's home as VFS's root then set
  296.          * 2nd method parameter to "true"
  297.          */
  298.         // Root directory set to user home
  299.         SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, false);
  300.         // Timeout is count by Milliseconds
  301.         SftpFileSystemConfigBuilder.getInstance().setTimeout(opts, 10000);
  302.         return opts;
  303.     }
  304.     private static FileSystemOptions createDefaultOptions(final String keyPath, final String passphrase) throws FileSystemException {
  305.         //create options for sftp
  306.         FileSystemOptions options = new FileSystemOptions();
  307.         //ssh key
  308.         SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(options, "no");
  309.         //set root directory to user home
  310.         SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(options, true);
  311.         //timeout
  312.         SftpFileSystemConfigBuilder.getInstance().setTimeout(options, 10000);
  313.         if (keyPath != null) {
  314. //            SftpFileSystemConfigBuilder.getInstance().setUserInfo(options, new SftpPassphraseUserInfo(passphrase));
  315. //            SftpFileSystemConfigBuilder.getInstance().setIdentities(options, new File[] { new File(keyPath) });
  316.             SftpFileSystemConfigBuilder.getInstance().setIdentityInfo(options, new IdentityInfo(new File(keyPath)));
  317.         }
  318.         return options;
  319.     }
  320.     public static <T> ZKConfigPropertyHolder<T> mockZKHolder(T value) {
  321.         ZKConfigPropertyHolder<T> zkConfigPropertyHolder = mock(ZKConfigPropertyHolder.class);
  322.         when(zkConfigPropertyHolder.get()).thenReturn(value);
  323.         return zkConfigPropertyHolder;
  324.     }
  325.     class TestRecord implements Record {
  326.         long offset;
  327.         String payload;
  328.         public TestRecord(long offset, String payload) {
  329.             this.offset = offset;
  330.             this.payload = payload;
  331.         }
  332.         @Override
  333.         public String getPayload() {
  334.             return payload;
  335.         }
  336.         @Override
  337.         public int getPartition() {
  338.             return 0;
  339.         }
  340.         @Override
  341.         public long getOffset() {
  342.             return offset;
  343.         }
  344.         @Override
  345.         public long getKey() {
  346.             return 0;
  347.         }
  348.     }
  349. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement