Advertisement
Guest User

Untitled

a guest
Oct 11th, 2016
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.37 KB | None | 0 0
  1. SENDER
  2.  
  3.  
  4. package main
  5.  
  6. import(
  7. "os"
  8. "encoding/gob"
  9. "net"
  10. "fmt"
  11. "crypto/sha1"
  12. "bytes"
  13. "bufio"
  14. "compress/gzip"
  15. "sync"
  16. "io"
  17. "syscall"
  18. "strings"
  19. "golang.org/x/crypto/ssh"
  20. "golang.org/x/crypto/ssh/terminal"
  21. "strconv"
  22. "time"
  23. )
  24.  
// ChunkData is what we need to compress one chunk: the raw bytes of the
// chunk plus the sizing/ordering metadata the pipeline carries with it.
type ChunkData struct {
	buffer      []byte // raw file content for this chunk
	sendSize    int64  // count of valid bytes in buffer (trailing chunk may be shorter than chunkSize)
	chunkSize   int64  // nominal chunk size; the receiver writes at ORDER*CHUNKSIZE
	orderNumber int64  // zero-based index of this chunk within the file
}
  32.  
  33. // Describes a chunk and body (compressed stuff)
  34. type Header struct {
  35. CHUNKSIZE int64
  36. ISCONTENT bool
  37. FP [20]byte
  38. COMPRESSED_SIZE int64
  39. package main
  40. import(
  41. "fmt"
  42. "bytes"
  43. "net"
  44. "strings"
  45. "encoding/gob"
  46. "compress/gzip"
  47. // "io"
  48. "os"
  49. )
  50.  
  51. type File struct {
  52. NAME string
  53. SIZE int64
  54. CHUNKNUM int64
  55. }
  56.  
  57. type Chunk struct {
  58. CHUNKSIZE int64
  59. ISCONTENT bool
  60. FP [20]byte
  61. COMPRESSED_SIZE int64
  62. DECOMPRESSED_SIZE int64
  63. ORDER int64
  64. }
  65.  
  66. func main () {
  67. cache := make(map[[20]byte]bytes.Buffer)
  68. runClient(cache)
  69. }
  70.  
  71.  
  72. func runClient(cache map[[20]byte]bytes.Buffer) {
  73. conn, _ := net.Dial("tcp", "localhost:8080")
  74. var fileName string
  75. var fileSize int64
  76. var chunkNum int64
  77. var i int64 = int64(0)
  78. /* Need filename and size */
  79. for {
  80. fileName, fileSize, chunkNum = fileInfo(conn)
  81. break
  82. }
  83.  
  84. /* File to be written to */
  85. file, err := os.Create("output.dat")
  86. if err != nil {
  87. panic(err)
  88. }
  89. fmt.Printf("File \"%s\" created. Expected size = %d number of chunks: %d. \n", fileName, fileSize, chunkNum)
  90.  
  91. /* Recieve all file content there is */
  92. fmt.Printf("CHUNKNUM is %d\n", chunkNum)
  93.  
  94. decoder := gob.NewDecoder(conn)
  95.  
  96. for i < chunkNum{
  97. i++
  98. recvFile(cache, conn, file, decoder)
  99. }
  100.  
  101. file.Close()
  102. conn.Close()
  103. }
  104.  
  105.  
  106. func fileInfo(conn net.Conn) (string, int64, int64) {
  107. dec := gob.NewDecoder(conn)
  108. f := &File{}
  109. dec.Decode(f)
  110. tmp1 := f.NAME
  111. tmp2 := strings.Split(tmp1, ".")
  112. tmp3 := tmp2[1]
  113. tmp4 := tmp3 + "_output_" + f.NAME
  114. return tmp4, f.SIZE, f.CHUNKNUM
  115. }
  116.  
  117.  
  118. func recvFile(cache map[[20]byte]bytes.Buffer, conn net.Conn, file *os.File, dec *gob.Decoder) {
  119. /* Used for decoding the header from socket */
  120. newChunk := &Chunk{}
  121.  
  122. dec.Decode(newChunk)
  123. if newChunk.DECOMPRESSED_SIZE == 0 {
  124. return
  125. }
  126.  
  127. b := make([]byte, newChunk.COMPRESSED_SIZE)
  128. dec.Decode(&b)
  129.  
  130. /* If the chunk already exists in cache, output content to file.
  131. * Nothing more to be done
  132. */
  133. _, ok := cache[newChunk.FP]
  134. if(ok == true) {
  135. offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
  136. outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
  137. return
  138. }
  139. /* 'buf' is used for the compressed file content.
  140. * Need to decompress it. Make an entry in cache used for outputting
  141. * the correct content
  142. */
  143. buf := bytes.NewBuffer(b)
  144. cache[newChunk.FP] = *buf
  145. offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
  146. outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
  147. }
  148.  
  149.  
  150. func decompress(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64) []byte{
  151. decompressed := make([]byte, size)
  152. compressed := cache[idx]
  153.  
  154. gz, err := gzip.NewReader(&compressed)
  155. if err != nil {
  156. panic(err)
  157. }
  158. gz.Read(decompressed)
  159. return decompressed
  160. }
  161.  
  162.  
  163. func outputToFile(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64, file *os.File, offset int64) {
  164. cont := decompress(cache, idx, size)
  165. file.WriteAt(cont, offset)
  166. }
  167.  
  168. DECOMPRESSED_SIZE int64
  169. ORDER int64
  170. }
  171.  
// Chunk is what is to send for one chunk: the Header fields plus, when
// ISCONTENT is true, the gzip-compressed body bytes.
type Chunk struct {
	CHUNKSIZE         int64    // nominal chunk size; receiver writes at ORDER*CHUNKSIZE
	ISCONTENT         bool     // true when BODY carries compressed data; false when only the fingerprint is sent
	FP                [20]byte // SHA-1 fingerprint of the raw chunk, the dedup cache key
	COMPRESSED_SIZE   int64    // len(BODY) after gzip; 0 when ISCONTENT is false
	DECOMPRESSED_SIZE int64    // number of raw bytes this chunk expands to
	ORDER             int64    // zero-based chunk index within the file
	BODY              []byte   // gzip-compressed chunk content (empty on cache hits)
}
  182.  
  183.  
// File announces the transfer to the receiver: the source file's name,
// its total size in bytes, and how many chunks will follow.
type File struct {
	NAME     string
	SIZE     int64
	CHUNKNUM int64
}
  189.  
  190. //8Zaynab8
  191.  
  192. func main() {
  193.  
  194. /* The only file to use - ever. Also need
  195. * password and username from user
  196. */
  197. path := "/data/inf2202/uniprot_sprot.dat"
  198. username, password := credentials()
  199.  
  200. /* Describes the session */
  201. sshConfig := &ssh.ClientConfig{
  202. User: username,
  203. Auth: []ssh.AuthMethod{
  204. ssh.Password(password)},
  205. }
  206. sshConfig.SetDefaults()
  207.  
  208. session := establishConnection(sshConfig)
  209.  
  210. /* Size of file in bytes */
  211. fileSize := getFileSize(session, path)
  212. session.Close()
  213.  
  214. /* Need to re-open it because it can only
  215. * handle one command per session...
  216. */
  217. session = establishConnection(sshConfig)
  218.  
  219. /* Reader from the remote file */
  220. fileReader := getFileStream(session, path)
  221.  
  222. /* Connection stuff */
  223. server, err := net.Listen("tcp", "localhost:8080")
  224. if err != nil {
  225. panic(err)
  226. }
  227. var chunkSize int64 = 6000
  228. cache := make(map[[20]byte]bytes.Buffer);
  229. nChunks := fileSize / chunkSize + 1
  230.  
  231. fmt.Printf("chunks to send: %d\n", fileSize / chunkSize + 1);
  232.  
  233. connection, err := server.Accept()
  234. encoder := gob.NewEncoder(connection)
  235. sendFileName(connection, path, fileSize, nChunks)
  236. runCompressionEngine(connection, cache, &fileReader, chunkSize, fileSize, nChunks, encoder);
  237. }
  238.  
  239.  
  240. func sendFileName(connection net.Conn, fileName string, fileSize int64, chunknum int64){
  241. p := &File{NAME:fileName, SIZE:fileSize, CHUNKNUM:chunknum}
  242. encoder := gob.NewEncoder(connection)
  243. encoder.Encode(p)
  244.  
  245. }
  246.  
  247.  
  248. /* Maps SHA fingerprints to chunks. The values are compressed chunks.
  249. */
  250. func runCompressionEngine(connection net.Conn, cache map[[20]byte]bytes.Buffer, reader *bufio.Reader, chunkSize int64, fileSize int64, nChunks int64, encoder *gob.Encoder) {
  251. /* Can fit whole file into buffer or we have to iterate
  252. * over whole file chunk by chunk. We might need to read the trailing chunk
  253. * which is smaller than chunkSize. THIS OUGHT TO WORK
  254. */
  255. var threadNum int = 5
  256. var k int = 0
  257.  
  258. var wg sync.WaitGroup
  259. var l sync.Mutex
  260. send_channel := make(chan *Chunk, nChunks)
  261. comp_channel := make(chan *ChunkData, nChunks)
  262. var trailingSize, restSize int64
  263.  
  264. start := time.Now()
  265.  
  266. if fileSize <= chunkSize {
  267. firstBuf := make([]byte, chunkSize)
  268. io.ReadAtLeast(reader, firstBuf, int(chunkSize))
  269. chunk := ChunkData{buffer: firstBuf, sendSize: fileSize, chunkSize: fileSize, orderNumber: 0}
  270. comp_channel<-&chunk
  271.  
  272. } else {
  273. i := int64(0)
  274. midBuffer := make([]byte, chunkSize)
  275. io.ReadAtLeast(reader, midBuffer, int(chunkSize))
  276. chunk := ChunkData{buffer:midBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
  277. comp_channel<-&chunk
  278.  
  279. for i < nChunks - 1{
  280. i++
  281. restSize = fileSize - (i * chunkSize)
  282.  
  283. /* Used for trailing chunk which is
  284. * smaller than chunkSize. Maybe we don't
  285. * need lastBuf? Rare case
  286. */
  287. if restSize <= chunkSize {
  288. trailingSize = restSize
  289. lastBuf := make([]byte, trailingSize)
  290. io.ReadAtLeast(reader, lastBuf, int(trailingSize))
  291. chunk := ChunkData{buffer:lastBuf, sendSize: trailingSize, chunkSize: chunkSize, orderNumber: i}
  292. comp_channel<-&chunk
  293. } else{
  294. mBuffer := make([]byte, chunkSize)
  295. io.ReadAtLeast(reader, mBuffer, int(chunkSize))
  296. chunk := ChunkData{buffer:mBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
  297. comp_channel<-&chunk
  298. }
  299. }
  300.  
  301. close(comp_channel)
  302.  
  303. //do concurrent mapping and compression
  304. for k < threadNum {
  305. wg.Add(1)
  306. go mapAndSend(cache, connection, comp_channel, send_channel, &wg, &l, encoder)
  307. k++
  308. }
  309.  
  310. wg.Wait()
  311. //close(send_channel)
  312. //send all elements in the sending channel
  313.  
  314. sendHeader(connection, send_channel, &l, encoder)
  315. elapsed := time.Since(start)
  316. fmt.Printf("time: %s", elapsed)
  317. }
  318. }
  319.  
  320.  
/* Compresses 'buffer' and maps it to the given 'cache' using 'chunkSize'.
 * Also send the data through the socket.
 *
 * Worker goroutine: drains compChan, fingerprints each chunk with SHA-1,
 * compresses unseen chunks into the shared cache, queues a Chunk on
 * sendChan, and immediately calls sendHeader — so workers both produce
 * and drain the send queue.
 *
 * NOTE(review): the cache membership check and the later insert are two
 * separate critical sections, so two workers can compress the same
 * fingerprint concurrently (wasted work; last insert wins). Presumably
 * deliberate to keep the lock out of the compression — confirm.
 */
func mapAndSend(cache map[[20]byte]bytes.Buffer, connection net.Conn, compChan chan *ChunkData, sendChan chan *Chunk, wg *sync.WaitGroup, l *sync.Mutex, encoder *gob.Encoder) {

	/* Loop through every chunk in the compression channel */
	for chunk := range compChan {

		// Fingerprint a copy of the raw bytes, padded/truncated to chunkSize.
		fpInput := make([]byte, chunk.chunkSize);
		copy(fpInput[:], chunk.buffer[:]);
		idx := sha1.Sum(fpInput);

		/* Already in cache. Send just fingerprint */
		l.Lock()
		_, ok := cache[idx];
		if ok {
			l.Unlock()
			cSize := int64(0)
			// Cache hit: metadata only, ISCONTENT=false, no BODY.
			pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:false, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber}

			//add the new header to the send channel
			sendChan<-&pkg
			sendHeader(connection, sendChan, l, encoder)

		/* Not in cache. Compress buffer, map it using
		 * the fingerprint, send header and compressed buffer
		 */
		} else {
			l.Unlock()
			b := compressedBuffer(chunk.buffer)
			l.Lock()
			cache[idx] = b
			l.Unlock()
			cSize := int64(b.Len())
			// Copy the compressed bytes so BODY does not alias the cached buffer.
			body := make([]byte, cSize)
			copy(body, b.Bytes())

			pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:true, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber, BODY:body}

			//add the new header to the send channel
			sendChan<-&pkg
			sendHeader(connection, sendChan, l, encoder)
		}
	}
	wg.Done()
}
  368.  
  369.  
  370. /* Compresses and returns 'buffer' */
  371. func compressedBuffer(buffer []byte) bytes.Buffer {
  372. var b bytes.Buffer
  373. gz := gzip.NewWriter(&b)
  374.  
  375. if _, err := gz.Write(buffer); err != nil {
  376. panic(err)
  377. }
  378. if err := gz.Flush(); err != nil {
  379. panic(err)
  380. }
  381. if err := gz.Close(); err != nil {
  382. panic(err)
  383. }
  384. return b
  385. }
  386.  
  387.  
  388. /* Sends 'header' through the socket in an encoded format */
  389. func sendHeader(connection net.Conn, channel chan *Chunk, l *sync.Mutex, encoder *gob.Encoder){
  390.  
  391. //Loop through and send every element in the channel
  392. for p := range channel{
  393. l.Lock()
  394. chunk := &Header{CHUNKSIZE: p.CHUNKSIZE, ISCONTENT: p.ISCONTENT, FP: p.FP, COMPRESSED_SIZE: p.COMPRESSED_SIZE, DECOMPRESSED_SIZE: p.DECOMPRESSED_SIZE, ORDER:p.ORDER}
  395. c := make([]byte, p.COMPRESSED_SIZE)
  396. c = p.BODY[:]
  397.  
  398. err := encoder.Encode(chunk)
  399. if err != nil {
  400. panic(err)
  401. }
  402.  
  403. err = encoder.Encode(c)
  404. if err != nil {
  405. panic(err)
  406. }
  407.  
  408. l.Unlock()
  409. if len(channel) == 0 {
  410. return
  411. }
  412. }
  413. }
  414.  
  415.  
  416.  
  417.  
  418.  
  419. /* Returns a file stream from the file on a remote server */
  420. func getFileStream(session *ssh.Session, path string) (bufio.Reader) {
  421. r, err := session.StdoutPipe()
  422. if err != nil {
  423. panic(err)
  424. }
  425.  
  426. err = session.Start("cat " + path)
  427. if err != nil {
  428. panic(err)
  429. }
  430.  
  431. return *bufio.NewReader(r)
  432. }
  433.  
  434.  
  435. /* Returns the filesize from the file located at the SSH server */
  436. func getFileSize(session *ssh.Session, path string) int64 {
  437. r, err := session.StdoutPipe()
  438. if err != nil {
  439. panic(err)
  440. }
  441.  
  442. /* Get size of file using wc from UNIX */
  443. err = session.Start("wc -c "+ path)
  444. if err != nil {
  445. panic(err)
  446. }
  447.  
  448. reader := bufio.NewReader(r)
  449. b, err := reader.ReadBytes(' ')
  450. if err != nil {
  451. panic(err)
  452. }
  453.  
  454. /* Don't want the trailing whitespace */
  455. k := string(b[:len(b) - 1])
  456.  
  457. /* From string to int64 */
  458. size, err := strconv.ParseInt(k, 10, 64)
  459. if err != nil {
  460. panic(err)
  461. }
  462.  
  463. return size
  464. }
  465.  
  466. /* Establishes a connection to the SSH server, thus making it
  467. * possible to run commands on it.
  468. */
  469. func establishConnection(config *ssh.ClientConfig) *ssh.Session {
  470. connection, err := ssh.Dial("tcp", "lgserv3.stud.cs.uit.no:22", config)
  471. if err != nil {
  472. panic(err)
  473. } else {
  474. fmt.Printf("\nSession OK\n")
  475. }
  476.  
  477. session, err := connection.NewSession()
  478. if err != nil {
  479. panic(err)
  480. }
  481.  
  482. return session
  483. }
  484.  
  485.  
  486. /* Prompts for username and password used to SSH session */
  487. func credentials() (string, string) {
  488. reader := bufio.NewReader(os.Stdin)
  489.  
  490. fmt.Printf("==== COMPRESSION ENGINE ====\n")
  491. fmt.Print("Username: ")
  492. username, _ := reader.ReadString('\n')
  493.  
  494. fmt.Print("Password: ")
  495. bytePassword, _ := terminal.ReadPassword(int(syscall.Stdin))
  496.  
  497. password := string(bytePassword)
  498.  
  499. return strings.TrimSpace(username), strings.TrimSpace(password)
  500. }
  501.  
  502.  
  503. RECEIVER
  504.  
  505.  
  506. package main
  507. import(
  508. "fmt"
  509. "bytes"
  510. "net"
  511. "strings"
  512. "encoding/gob"
  513. "compress/gzip"
  514. // "io"
  515. "os"
  516. )
  517.  
// File is the transfer announcement the sender transmits first:
// source file name, total size in bytes, and number of chunks to expect.
type File struct {
	NAME     string
	SIZE     int64
	CHUNKNUM int64
}
  523.  
// Chunk mirrors the sender's per-chunk header: metadata decoded from
// the socket. The compressed body is decoded separately right after.
type Chunk struct {
	CHUNKSIZE         int64    // nominal chunk size; ORDER*CHUNKSIZE is the file write offset
	ISCONTENT         bool     // true when a compressed body follows; false for a pure cache hit
	FP                [20]byte // SHA-1 fingerprint of the raw chunk (dedup cache key)
	COMPRESSED_SIZE   int64    // size of the gzip body in bytes
	DECOMPRESSED_SIZE int64    // size of the chunk after decompression
	ORDER             int64    // zero-based chunk index
}
  532.  
  533. func main () {
  534. cache := make(map[[20]byte]bytes.Buffer)
  535. runClient(cache)
  536. }
  537.  
  538.  
  539. func runClient(cache map[[20]byte]bytes.Buffer) {
  540. conn, _ := net.Dial("tcp", "localhost:8080")
  541. var fileName string
  542. var fileSize int64
  543. var chunkNum int64
  544. var i int64 = int64(0)
  545. /* Need filename and size */
  546. for {
  547. fileName, fileSize, chunkNum = fileInfo(conn)
  548. break
  549. }
  550.  
  551. /* File to be written to */
  552. file, err := os.Create("output.dat")
  553. if err != nil {
  554. panic(err)
  555. }
  556. fmt.Printf("File \"%s\" created. Expected size = %d number of chunks: %d. \n", fileName, fileSize, chunkNum)
  557.  
  558. /* Recieve all file content there is */
  559. fmt.Printf("CHUNKNUM is %d\n", chunkNum)
  560.  
  561. decoder := gob.NewDecoder(conn)
  562.  
  563. for i < chunkNum{
  564. i++
  565. recvFile(cache, conn, file, decoder)
  566. }
  567.  
  568. file.Close()
  569. conn.Close()
  570. }
  571.  
  572.  
  573. func fileInfo(conn net.Conn) (string, int64, int64) {
  574. dec := gob.NewDecoder(conn)
  575. f := &File{}
  576. dec.Decode(f)
  577. tmp1 := f.NAME
  578. tmp2 := strings.Split(tmp1, ".")
  579. tmp3 := tmp2[1]
  580. tmp4 := tmp3 + "_output_" + f.NAME
  581. return tmp4, f.SIZE, f.CHUNKNUM
  582. }
  583.  
  584.  
// recvFile decodes one chunk (header, then body) from the socket and
// writes its decompressed content into 'file' at ORDER*CHUNKSIZE.
// Chunks whose fingerprint is already cached reuse the cached
// compressed bytes instead of the (empty) body on the wire.
func recvFile(cache map[[20]byte]bytes.Buffer, conn net.Conn, file *os.File, dec *gob.Decoder) {
	/* Used for decoding the header from socket */
	newChunk := &Chunk{}

	dec.Decode(newChunk) // NOTE(review): decode error ignored; a broken stream yields a zero Chunk and an early return below
	if newChunk.DECOMPRESSED_SIZE == 0 {
		return
	}

	// A body value always follows the header, even on cache hits
	// (then it is empty) — the sender encodes both unconditionally.
	b := make([]byte, newChunk.COMPRESSED_SIZE)
	dec.Decode(&b)

	/* If the chunk already exists in cache, output content to file.
	 * Nothing more to be done
	 */
	_, ok := cache[newChunk.FP]
	if(ok == true) {
		offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
		outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
		return
	}
	/* 'buf' is used for the compressed file content.
	 * Need to decompress it. Make an entry in cache used for outputting
	 * the correct content
	 */
	buf := bytes.NewBuffer(b)
	cache[newChunk.FP] = *buf
	offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
	outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
}
  615.  
  616.  
  617. func decompress(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64) []byte{
  618. decompressed := make([]byte, size)
  619. compressed := cache[idx]
  620.  
  621. gz, err := gzip.NewReader(&compressed)
  622. if err != nil {
  623. panic(err)
  624. }
  625. gz.Read(decompressed)
  626. return decompressed
  627. }
  628.  
  629.  
  630. func outputToFile(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64, file *os.File, offset int64) {
  631. cont := decompress(cache, idx, size)
  632. file.WriteAt(cont, offset)
  633. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement