Advertisement
Guest User

Untitled

a guest
Oct 11th, 2016
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.05 KB | None | 0 0
  1. package main
  2.  
  3. import(
  4. "os"
  5. "encoding/gob"
  6. "net"
  7. "fmt"
  8. "crypto/sha1"
  9. "bytes"
  10. "bufio"
  11. "compress/gzip"
  12. "sync"
  13. "io"
  14. "syscall"
  15. "strings"
  16. "golang.org/x/crypto/ssh"
  17. "golang.org/x/crypto/ssh/terminal"
  18. "strconv"
  19. )
  20.  
  /* UncompressedChunkData is one raw piece of the input file, queued for the
   * compression workers before anything has been hashed or compressed.
   */
  type UncompressedChunkData struct {
  	/* Raw bytes read from the file for this chunk. */
  	buffer []byte

  	/* sendSize:    number of payload bytes in this chunk; reported to the
   	 *              receiver as DECOMPRESSED_SIZE.
  	 * chunkSize:   nominal chunk size; reported as CHUNKSIZE and used as the
  	 *              length of the fingerprint input.
  	 * orderNumber: zero-based position of the chunk within the file.
  	 */
  	sendSize, chunkSize, orderNumber int64
  }
  28.  
  /* TempChunk is the per-chunk header that goes over the wire. It mirrors
   * Chunk without the body; the body is encoded as a separate value right
   * after this header.
   */
  type TempChunk struct {
  	/* Nominal chunk size used by the sender. */
  	CHUNKSIZE int64
  	/* True when compressed content follows, false when only the
  	 * fingerprint of an already-sent chunk is transmitted.
  	 */
  	ISCONTENT bool
  	/* SHA-1 fingerprint identifying the chunk content. */
  	FP [20]byte
  	/* COMPRESSED_SIZE:   length of the compressed body (0 without content).
  	 * DECOMPRESSED_SIZE: number of payload bytes the chunk represents.
  	 * ORDER:             zero-based position of the chunk within the file.
  	 */
  	COMPRESSED_SIZE, DECOMPRESSED_SIZE, ORDER int64
  }
  38.  
  /* Chunk is what the workers put on the send channel: the same header
   * fields as TempChunk plus the (possibly empty) compressed body.
   */
  type Chunk struct {
  	/* Nominal chunk size used by the sender. */
  	CHUNKSIZE int64
  	/* True when BODY carries compressed content, false when the chunk was
  	 * already known and only its fingerprint is sent.
  	 */
  	ISCONTENT bool
  	/* SHA-1 fingerprint identifying the chunk content. */
  	FP [20]byte
  	/* COMPRESSED_SIZE:   length of BODY (0 without content).
  	 * DECOMPRESSED_SIZE: number of payload bytes the chunk represents.
  	 * ORDER:             zero-based position of the chunk within the file.
  	 */
  	COMPRESSED_SIZE, DECOMPRESSED_SIZE, ORDER int64
  	/* Compressed chunk content; left nil for fingerprint-only chunks. */
  	BODY []byte
  }
  49.  
  50.  
  /* File is the announcement sent once, before any chunk, so the receiver
   * knows what is coming.
   */
  type File struct {
  	/* Path of the file being transferred. */
  	NAME string
  	/* SIZE:     size of the file in bytes.
  	 * CHUNKNUM: number of chunks that will follow.
  	 */
  	SIZE, CHUNKNUM int64
  }
  56.  
  57. func main() {
  58.  
  59. /* The only file to use - ever. Also need
  60. * password and username from user
  61. */
  62. path := "/data/inf2202/uniprot_sprot.dat"
  63. username, password := credentials()
  64.  
  65. /* Describes the session */
  66. sshConfig := &ssh.ClientConfig{
  67. User: username,
  68. Auth: []ssh.AuthMethod{
  69. ssh.Password(password)},
  70. }
  71. sshConfig.SetDefaults()
  72.  
  73. session := establishConnection(sshConfig)
  74.  
  75. /* Size of file in bytes */
  76. fileSize := getFileSize(session, path)
  77. session.Close()
  78.  
  79. /* Need to re-open it because it can only
  80. * handle one command per session...
  81. */
  82. session = establishConnection(sshConfig)
  83.  
  84. /* Reader from the remote file */
  85. fileReader := getFileStream(session, path)
  86.  
  87. /* Connection stuff */
  88. server, err := net.Listen("tcp", "localhost:8080")
  89. if err != nil {
  90. panic(err)
  91. }
  92. var chunkSize int64 = 8000
  93. cache := make(map[[20]byte]bytes.Buffer);
  94. nChunks := fileSize / chunkSize + 1
  95.  
  96. fmt.Printf("chunks to send: %d\n", fileSize / chunkSize + 1);
  97.  
  98. connection, err := server.Accept()
  99. encoder := gob.NewEncoder(connection)
  100. sendFileName(connection, path, fileSize, nChunks, encoder)
  101. runCompressionEngine(connection, cache, &fileReader, chunkSize, fileSize, nChunks, encoder);
  102. }
  103.  
  104.  
  105. func sendFileName(connection net.Conn, fileName string, fileSize int64, chunknum int64, encoder *gob.Encoder){
  106. p := &File{NAME:fileName, SIZE:fileSize, CHUNKNUM:chunknum}
  107. encoder.Encode(p)
  108.  
  109. }
  110.  
  111.  
  112. /* Maps SHA fingerprints to chunks. The values are compressed chunks.
  113. */
  114. func runCompressionEngine(connection net.Conn, cache map[[20]byte]bytes.Buffer, reader *bufio.Reader, chunkSize int64, fileSize int64, nChunks int64, encoder *gob.Encoder) {
  115. var threadNum int = 8
  116. var k int = 0
  117. var wg sync.WaitGroup
  118. var l sync.Mutex
  119. send_channel := make(chan *Chunk, nChunks)
  120. comp_channel := make(chan *UncompressedChunkData, nChunks)
  121. var trailingSize, restSize int64
  122.  
  123.  
  124. /* Whole file can fit into chunk */
  125. if fileSize <= chunkSize {
  126. firstBuf := make([]byte, chunkSize)
  127. io.ReadAtLeast(reader, firstBuf, int(chunkSize))
  128. chunk := UncompressedChunkData{buffer: firstBuf, sendSize: fileSize, chunkSize: fileSize, orderNumber: 0}
  129. comp_channel<-&chunk
  130.  
  131. /* We have to iterate
  132. * over whole file chunk by chunk. We might need to read the trailing chunk
  133. * which is smaller than chunkSize.
  134. */
  135. } else {
  136. i := int64(0)
  137. midBuffer := make([]byte, chunkSize)
  138. io.ReadAtLeast(reader, midBuffer, int(chunkSize))
  139. chunk := UncompressedChunkData{buffer:midBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
  140. comp_channel<-&chunk
  141.  
  142. for i < nChunks - 1{
  143. i++
  144. restSize = fileSize - (i * chunkSize)
  145.  
  146. /* Used for trailing chunk which is
  147. * smaller than chunkSize. Maybe we don't
  148. * need lastBuf? Rare case
  149. */
  150. if restSize <= chunkSize {
  151. trailingSize = restSize
  152. lastBuf := make([]byte, trailingSize)
  153. io.ReadAtLeast(reader, lastBuf, int(trailingSize))
  154. chunk := UncompressedChunkData{buffer:lastBuf, sendSize: trailingSize, chunkSize: chunkSize, orderNumber: i}
  155. comp_channel<-&chunk
  156. } else{
  157. mBuffer := make([]byte, chunkSize)
  158. io.ReadAtLeast(reader, mBuffer, int(chunkSize))
  159. chunk := UncompressedChunkData{buffer:mBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
  160. comp_channel<-&chunk
  161. }
  162. }
  163.  
  164. close(comp_channel)
  165.  
  166. /* Concurrent mapping and compression
  167. */
  168. for k < threadNum {
  169. wg.Add(1)
  170. go mapAndSend(cache, connection, comp_channel, send_channel, &wg, &l, encoder)
  171. k++
  172. }
  173.  
  174. wg.Wait()
  175.  
  176. close(send_channel)
  177. sendChunk(connection, send_channel, &l, encoder)
  178. }
  179. }
  180.  
  181.  
  182. /* Compresses 'buffer' and maps it to the given 'cache' using 'chunkSize'.
  183. * Also send the data through the socket.
  184. */
  185. func mapAndSend(cache map[[20]byte]bytes.Buffer, connection net.Conn, compChan chan *UncompressedChunkData, sendChan chan *Chunk, wg *sync.WaitGroup, l *sync.Mutex, encoder *gob.Encoder) {
  186.  
  187. /* Loop through every chunk in the compression channel */
  188.  
  189. for chunk := range compChan{
  190.  
  191. fpInput := make([]byte, chunk.chunkSize);
  192. copy(fpInput[:], chunk.buffer[:]);
  193. idx := sha1.Sum(fpInput);
  194.  
  195. /* Critical section */
  196. l.Lock()
  197. _, ok := cache[idx];
  198.  
  199. /* Already in cache. Send just fingerprint */
  200. if ok {
  201. l.Unlock()
  202. cSize := int64(0)
  203. pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:false, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber}
  204.  
  205. /* Add the new header to the send channel */
  206. sendChan<-&pkg
  207. sendChunk(connection, sendChan, l, encoder)
  208.  
  209. /* Not in cache. Compress buffer, map it using
  210. * the fingerpring, send header and compressed buffer
  211. */
  212. } else {
  213. l.Unlock()
  214. b := compressedBuffer(chunk.buffer)
  215. l.Lock()
  216. cache[idx] = b
  217. l.Unlock()
  218. cSize := int64(b.Len())
  219. body := make([]byte, cSize)
  220. copy(body, b.Bytes())
  221.  
  222. pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:true, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber, BODY:body}
  223.  
  224. /* Add the new header to the send channel */
  225. sendChan<-&pkg
  226. sendChunk(connection, sendChan, l, encoder)
  227. }
  228. }
  229. wg.Done()
  230. }
  231.  
  232.  
  /* compressedBuffer gzips 'buffer' and returns the buffer holding the
   * complete gzip stream. Panics if any stage of the compression fails.
   */
  func compressedBuffer(buffer []byte) bytes.Buffer {
  	var out bytes.Buffer
  	zw := gzip.NewWriter(&out)

  	/* Write, flush and close, in that order; the first failure aborts */
  	stages := []func() error{
  		func() error {
  			_, writeErr := zw.Write(buffer)
  			return writeErr
  		},
  		zw.Flush,
  		zw.Close,
  	}
  	for _, stage := range stages {
  		if stageErr := stage(); stageErr != nil {
  			panic(stageErr)
  		}
  	}

  	return out
  }
  249.  
  250.  
  251. /* Sends 'header' through the socket in an encoded format */
  252. func sendChunk(connection net.Conn, channel chan *Chunk, l *sync.Mutex, encoder *gob.Encoder){
  253.  
  254. //Loop through and send every element in the channel
  255. for p := range channel{
  256. l.Lock()
  257. chunk := &TempChunk{CHUNKSIZE: p.CHUNKSIZE, ISCONTENT: p.ISCONTENT, FP: p.FP, COMPRESSED_SIZE: p.COMPRESSED_SIZE, DECOMPRESSED_SIZE: p.DECOMPRESSED_SIZE, ORDER:p.ORDER}
  258. c := make([]byte, p.COMPRESSED_SIZE)
  259. c = p.BODY[:]
  260.  
  261. err := encoder.Encode(chunk)
  262. if err != nil {
  263. panic(err)
  264. }
  265.  
  266. err = encoder.Encode(c)
  267. if err != nil {
  268. panic(err)
  269. }
  270.  
  271. l.Unlock()
  272. if len(channel) == 0 {
  273. return
  274. }
  275. }
  276. }
  277.  
  278.  
  279.  
  280.  
  281.  
  282. /* Returns a file stream from the file on a remote server */
  283. func getFileStream(session *ssh.Session, path string) (bufio.Reader) {
  284. r, err := session.StdoutPipe()
  285. if err != nil {
  286. panic(err)
  287. }
  288.  
  289. err = session.Start("cat " + path)
  290. if err != nil {
  291. panic(err)
  292. }
  293.  
  294. return *bufio.NewReader(r)
  295. }
  296.  
  297.  
  298. /* Returns the filesize from the file located at the SSH server */
  299. func getFileSize(session *ssh.Session, path string) int64 {
  300. r, err := session.StdoutPipe()
  301. if err != nil {
  302. panic(err)
  303. }
  304.  
  305. /* Get size of file using wc from UNIX */
  306. err = session.Start("wc -c "+ path)
  307. if err != nil {
  308. panic(err)
  309. }
  310.  
  311. reader := bufio.NewReader(r)
  312. b, err := reader.ReadBytes(' ')
  313. if err != nil {
  314. panic(err)
  315. }
  316.  
  317. /* Don't want the trailing whitespace */
  318. k := string(b[:len(b) - 1])
  319.  
  320. /* From string to int64 */
  321. size, err := strconv.ParseInt(k, 10, 64)
  322. if err != nil {
  323. panic(err)
  324. }
  325.  
  326. return size
  327. }
  328.  
  329. /* Establishes a connection to the SSH server, thus making it
  330. * possible to run commands on it.
  331. */
  332. func establishConnection(config *ssh.ClientConfig) *ssh.Session {
  333. connection, err := ssh.Dial("tcp", "lgserv3.stud.cs.uit.no:22", config)
  334. if err != nil {
  335. panic(err)
  336. }
  337.  
  338. session, err := connection.NewSession()
  339. if err != nil {
  340. panic(err)
  341. }
  342.  
  343. return session
  344. }
  345.  
  346.  
  347. /* Prompts for username and password used to SSH session */
  348. func credentials() (string, string) {
  349. reader := bufio.NewReader(os.Stdin)
  350.  
  351. fmt.Printf("==== COMPRESSION ENGINE ====\n")
  352. fmt.Print("Username: ")
  353. username, _ := reader.ReadString('\n')
  354.  
  355. fmt.Print("Password: ")
  356. bytePassword, _ := terminal.ReadPassword(int(syscall.Stdin))
  357.  
  358. password := string(bytePassword)
  359.  
  360. return strings.TrimSpace(username), strings.TrimSpace(password)
  361. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement