Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.fix.cpapsm.subscription.processing.job.service.repeatable.singleton;
- import com.jcraft.jsch.*;
- import org.apache.commons.vfs2.FileObject;
- import org.apache.commons.vfs2.FileSystemException;
- import org.apache.commons.vfs2.FileSystemOptions;
- import org.apache.commons.vfs2.Selectors;
- import org.apache.commons.vfs2.impl.StandardFileSystemManager;
- import org.apache.commons.vfs2.provider.sftp.IdentityInfo;
- import org.apache.commons.vfs2.provider.sftp.SftpFileSystemConfigBuilder;
- import org.apache.ibatis.annotations.Param;
- import org.junit.Before;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.mockito.InjectMocks;
- import org.mockito.Spy;
- import org.mockito.runners.MockitoJUnitRunner;
- import ru.fix.commons.ssh.JschSftpUploader;
- import ru.fix.commons.ssh.SecureConnectionDetails;
- import ru.fix.commons.ssh.UploadableFile;
- import ru.fix.commons.ssh.exception.BindException;
- import ru.fix.commons.ssh.exception.RemoteFileRenameException;
- import ru.fix.commons.ssh.exception.UploadDataException;
- import ru.fix.commons.zkconfig.ZKConfigPropertyHolder;
- import ru.fix.cpapsm.commons.distributed.job.manager.DistributedMultiJobContext;
- import ru.fix.cpapsm.commons.distributed.job.manager.ShutdownListener;
- import ru.fix.cpapsm.kafka.client.OffsettedConsumer;
- import ru.fix.cpapsm.kafka.client.Record;
- import ru.fix.cpapsm.offices.dao.mappers.DataExtractorMapper;
- import java.io.*;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.time.LocalDateTime;
- import java.util.*;
- import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.when;
- /**
- * Created by mgizatullin on 16.11.16.
- */
- @RunWith(MockitoJUnitRunner.class)
- public class FrodLogExportJobTest {
- @InjectMocks
- @Spy
- private FrodLogExportJob frodLogExportJob;
- private DistributedMultiJobContext distributedMultiJobContext;
- @Before
- public void setUp() throws Exception {
- frodLogExportJob.consumerDisable = mockZKHolder(Boolean.FALSE);
- frodLogExportJob.kafkaBootstrapServers = mockZKHolder("srv-5:9099,srv-6:9099,srv-7:9099,srv-8:9099");
- frodLogExportJob.kafkaTopicPrefix = mockZKHolder("");
- frodLogExportJob.kafkaBufferSize = mockZKHolder(20);
- frodLogExportJob.kafkaBufferTimeout = mockZKHolder(20);
- frodLogExportJob.kafkaSessionTimeout = mockZKHolder(60000);
- frodLogExportJob.kafkaRequestTimeout = mockZKHolder(70000);
- frodLogExportJob.kafkaRetriesCount = mockZKHolder(5);
- frodLogExportJob.kafkaRetryPeriod = mockZKHolder(5000);
- frodLogExportJob.frodLogExporterDelay = mockZKHolder(3600L);
- frodLogExportJob.frodSendTopicName = mockZKHolder("sms_send_frod");
- frodLogExportJob.dataExtractorMapper = new DataExtractorMapper() {
- @Override
- public String getProcessorInfo(@Param("processorName") String processorName) {
- return null;
- }
- @Override
- public String getProcessorInfoByPartition(@Param("processorName") String processorName, @Param("partition") Integer partition) {
- return null;
- }
- @Override
- public int createProcessorInfo(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate) {
- return 0;
- }
- @Override
- public int createProcessorInfoByPartition(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate, @Param("partition") Integer partition) {
- return 0;
- }
- @Override
- public int updateProcessorInfo(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate) {
- return 0;
- }
- @Override
- public int updateProcessorInfoByPartition(@Param("processorName") String processorName, @Param("data") String data, @Param("changeDate") LocalDateTime changeDate, @Param("partition") Integer partition) {
- return 0;
- }
- };
- frodLogExportJob.sftpSettingsJson = mockZKHolder("{\n" +
- " \"host\": \"10.52.66.91\",\n" +
- " \"port\": \"22\",\n" +
- " \"key.file\": \"/home/cpapsm/.ssh/id_rsa\",\n" +
- " \"username\": \"cpapsm\",\n" +
- " \"upload.dir\": \"/opt/cpapsm/tmp\"\n" +
- "}");
- frodLogExportJob.init();
- distributedMultiJobContext = new DistributedMultiJobContext() {
- @Override
- public Set<String> getFairWorkShare() {
- Set<String> share = new HashSet<>();
- share.add("test");
- return share;
- }
- @Override
- public boolean acquireWorkShare(String workShareName, long acquirePeriod, long timeout) throws Exception {
- return true;
- }
- @Override
- public boolean checkAndProlongWorkShareIfExpiresIn(String workShareName, long prolongationPeriod, long expirationPeriod) throws Exception {
- return true;
- }
- @Override
- public void releaseWorkShare(String workShareName) throws Exception {
- }
- @Override
- public boolean isNeedToShutdown() {
- return false;
- }
- @Override
- public void addShutdownListener(ShutdownListener o) {
- }
- @Override
- public void removeShutdownListener(ShutdownListener o) {
- }
- };
- }
- @Test
- public void shouldExportSms() throws Exception {
- frodLogExportJob.parseLogsfromKafka(new OffsettedConsumer() {
- boolean firstTime = true;
- @Override
- public List<Record> poll() {
- List<Record> records = new ArrayList<>();
- if (firstTime) {
- records.add(new TestRecord(new Date().getTime(), "{\"serializedPdu\":\"(DELIVER_SM command_length:0x00000044 command_id:0x00000005 command_status:0x00000000 sequence_number:0x00000001) (body: (service_type:[] source_addr:[ton:0x00 npi:0x01 [79375727135]] destination_addr:[ton:0x01 npi:0x01 [5151]] esm_class:0x00 protocol_id:0x00 priority_flag:0x00 schedule_delivery_time:[] validity_period:[] registered_delivery:0x00 replace_if_present_flag:0x00 data_coding:0x08 sm_default_msg_id:0x00 sm_length:0x0014 short_message:[04230421042204170410041F0420041504220031])) (opts: )\",\"type\":\"PDU\",\"smppLogin\":\"cpapsm\",\"serverSessionId\":925,\"shortMessage\":\"УСТЗАПРЕТ1\",\"sourceAddress\":\"79375727135\",\"destAddress\":\"5151\",\"serverId\":24,\"timeStamp\":1479375524528}"));
- firstTime = false;
- }
- return records;
- }
- @Override
- public void commitPartitions(Map<Integer, Long> offsets) {
- }
- @Override
- public void commitPartitionsMaxOffsets(Collection<Record> records) {
- }
- @Override
- public void pause() {
- }
- @Override
- public void resume() {
- }
- @Override
- public boolean isPaused() {
- return false;
- }
- @Override
- public void close() throws Exception {
- }
- },
- new JschSftpUploader() {
- @Override
- public UploadableFile getRemoteFileOutputStream(SecureConnectionDetails connectionDetails, String remoteFileName, String remotePath) throws BindException {
- return new UploadableFile() {
- @Override
- public OutputStream getOutputStream() throws UploadDataException {
- try {
- Path file = Files.createTempFile("smsExport", "csv");
- return new FileOutputStream(file.toFile());
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
- @Override
- public void rename(String newName) throws RemoteFileRenameException {
- }
- @Override
- public void replace(String newName) throws RemoteFileRenameException {
- }
- @Override
- public void close() throws Exception {
- }
- };
- }
- },
- distributedMultiJobContext);
- }
- public void testSftp() {
- String hostName = "srv-9:22022";
- String username = "cpapsm";
- String password = "PutYourPasswordForHostHere";
- String localFilePath = "/home/mgizatullin/WikidPad_Error.log";
- String remoteFilePath = "/FakeRemotePath/FakeRemoteFile.txt";
- File file = new File(localFilePath);
- if (!file.exists()) {
- throw new RuntimeException("Error. Local file not found");
- }
- StandardFileSystemManager manager = new StandardFileSystemManager();
- try {
- manager.init();
- // Create local file object
- FileObject localFile = manager.resolveFile(file.getAbsolutePath());
- // Create remote file object
- FileObject remoteFile = manager.resolveFile(createConnectionString("srv-9", "cpapsm", "/home/cpapsm/eeee.ttt"), createDefaultOptions("/home/mgizatullin/.ssh/id_rsa", ""));
- // BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream());
- // bufferedOutputStream.write("Hello!!!! bla bla lba\n".getBytes("UTF-8"));
- // bufferedOutputStream.flush();
- // bufferedOutputStream.close();
- /*
- * use createDefaultOptions() in place of fsOptions for all default
- * options - Ashok.
- */
- // Copy local file to sftp server
- remoteFile.copyFrom(localFile, Selectors.SELECT_SELF);
- System.out.println("File upload success");
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- manager.close();
- }
- }
- public void testSftpDownadlo() {
- String hostName = "srv-9:22022";
- String username = "cpapsm";
- String password = "PutYourPasswordForHostHere";
- String localFilePath = "/home/mgizatullin/WikidPad_Error.log";
- String remoteFilePath = "/FakeRemotePath/FakeRemoteFile.txt";
- File file = new File(localFilePath);
- if (!file.exists()) {
- throw new RuntimeException("Error. Local file not found");
- }
- StandardFileSystemManager manager = new StandardFileSystemManager();
- try {
- manager.init();
- // Create local file object
- FileObject localFile = manager.resolveFile(file.getAbsolutePath());
- localFile.createFile();
- // Create remote file object
- FileObject remoteFile = manager.resolveFile(createConnectionString("srv-9", "cpapsm", "WikidPad_Error.log"), createDefaultOptions("/home/mgizatullin/.ssh/id_rsa", ""));
- BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(remoteFile.getContent().getOutputStream());
- bufferedOutputStream.write("Hello!!!! bla bla lba\n".getBytes("UTF-8"));
- bufferedOutputStream.flush();
- bufferedOutputStream.close();
- /*
- * use createDefaultOptions() in place of fsOptions for all default
- * options - Ashok.
- */
- // Copy local file to sftp server
- // remoteFile.copyFrom(localFile, Selectors.SELECT_SELF);
- System.out.println("File upload success");
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- manager.close();
- }
- }
- public void testJSch() {
- Session session = null;
- Channel channel = null;
- try {
- JSch ssh = new JSch();
- ssh.setKnownHosts("/path/of/known_hosts/file");
- session = ssh.getSession("username", "host", 22);
- session.setPassword("password");
- session.connect();
- channel = session.openChannel("sftp");
- channel.connect();
- ChannelSftp sftp = (ChannelSftp) channel;
- sftp.put("/path/of/local/file", "/path/of/ftp/file");
- } catch (JSchException e) {
- e.printStackTrace();
- } catch (SftpException e) {
- e.printStackTrace();
- } finally {
- if (channel != null) {
- channel.disconnect();
- }
- if (session != null) {
- session.disconnect();
- }
- }
- }
- public static String createConnectionString(String hostName, String username, String remoteFilePath) {
- return "sftp://" + username + "@" + hostName + "/" + remoteFilePath;
- }
- /**
- * Method to setup default SFTP config
- *
- * @return the FileSystemOptions object containing the specified
- * configuration options
- * @throws FileSystemException
- */
- public static FileSystemOptions createDefaultOptions() throws FileSystemException {
- // Create SFTP options
- FileSystemOptions opts = new FileSystemOptions();
- // SSH Key checking
- SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(opts, "no");
- /*
- * Using the following line will cause VFS to choose File System's Root
- * as VFS's root. If I wanted to use User's home as VFS's root then set
- * 2nd method parameter to "true"
- */
- // Root directory set to user home
- SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(opts, false);
- // Timeout is count by Milliseconds
- SftpFileSystemConfigBuilder.getInstance().setTimeout(opts, 10000);
- return opts;
- }
- private static FileSystemOptions createDefaultOptions(final String keyPath, final String passphrase) throws FileSystemException {
- //create options for sftp
- FileSystemOptions options = new FileSystemOptions();
- //ssh key
- SftpFileSystemConfigBuilder.getInstance().setStrictHostKeyChecking(options, "no");
- //set root directory to user home
- SftpFileSystemConfigBuilder.getInstance().setUserDirIsRoot(options, true);
- //timeout
- SftpFileSystemConfigBuilder.getInstance().setTimeout(options, 10000);
- if (keyPath != null) {
- // SftpFileSystemConfigBuilder.getInstance().setUserInfo(options, new SftpPassphraseUserInfo(passphrase));
- // SftpFileSystemConfigBuilder.getInstance().setIdentities(options, new File[] { new File(keyPath) });
- SftpFileSystemConfigBuilder.getInstance().setIdentityInfo(options, new IdentityInfo(new File(keyPath)));
- }
- return options;
- }
- public static <T> ZKConfigPropertyHolder<T> mockZKHolder(T value) {
- ZKConfigPropertyHolder<T> zkConfigPropertyHolder = mock(ZKConfigPropertyHolder.class);
- when(zkConfigPropertyHolder.get()).thenReturn(value);
- return zkConfigPropertyHolder;
- }
- class TestRecord implements Record {
- long offset;
- String payload;
- public TestRecord(long offset, String payload) {
- this.offset = offset;
- this.payload = payload;
- }
- @Override
- public String getPayload() {
- return payload;
- }
- @Override
- public int getPartition() {
- return 0;
- }
- @Override
- public long getOffset() {
- return offset;
- }
- @Override
- public long getKey() {
- return 0;
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement