Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- SENDER
- package main
- import(
- "os"
- "encoding/gob"
- "net"
- "fmt"
- "crypto/sha1"
- "bytes"
- "bufio"
- "compress/gzip"
- "sync"
- "io"
- "syscall"
- "strings"
- "golang.org/x/crypto/ssh"
- "golang.org/x/crypto/ssh/terminal"
- "strconv"
- "time"
- )
- // What we need to compress a chunk
// What we need to compress a chunk.
// ChunkData is the unit of work handed to the compression workers: the
// raw bytes of one chunk plus the bookkeeping needed to place it.
type ChunkData struct {
	buffer      []byte // raw (uncompressed) chunk content
	sendSize    int64  // valid bytes in buffer; smaller than chunkSize for the trailing chunk
	chunkSize   int64  // nominal chunk size, used as the file-offset multiplier
	orderNumber int64  // zero-based position of this chunk in the file
}
- // Describes a chunk and body (compressed stuff)
- type Header struct {
- CHUNKSIZE int64
- ISCONTENT bool
- FP [20]byte
- COMPRESSED_SIZE int64
- package main
- import(
- "fmt"
- "bytes"
- "net"
- "strings"
- "encoding/gob"
- "compress/gzip"
- // "io"
- "os"
- )
// File is the transfer header received before any chunks: the source
// file's name, its total size in bytes, and how many chunks follow.
type File struct {
	NAME     string
	SIZE     int64
	CHUNKNUM int64
}
// Chunk is the per-chunk wire header decoded from the sender; the
// compressed body (COMPRESSED_SIZE bytes) is decoded separately right
// after it.
type Chunk struct {
	CHUNKSIZE         int64    // nominal chunk size (offset multiplier)
	ISCONTENT         bool     // true when a compressed body accompanies this header
	FP                [20]byte // SHA-1 fingerprint of the raw chunk, keys the cache
	COMPRESSED_SIZE   int64    // byte count of the compressed body (0 for duplicates)
	DECOMPRESSED_SIZE int64    // raw byte count this chunk expands to
	ORDER             int64    // zero-based chunk index in the file
}
- func main () {
- cache := make(map[[20]byte]bytes.Buffer)
- runClient(cache)
- }
- func runClient(cache map[[20]byte]bytes.Buffer) {
- conn, _ := net.Dial("tcp", "localhost:8080")
- var fileName string
- var fileSize int64
- var chunkNum int64
- var i int64 = int64(0)
- /* Need filename and size */
- for {
- fileName, fileSize, chunkNum = fileInfo(conn)
- break
- }
- /* File to be written to */
- file, err := os.Create("output.dat")
- if err != nil {
- panic(err)
- }
- fmt.Printf("File \"%s\" created. Expected size = %d number of chunks: %d. \n", fileName, fileSize, chunkNum)
- /* Recieve all file content there is */
- fmt.Printf("CHUNKNUM is %d\n", chunkNum)
- decoder := gob.NewDecoder(conn)
- for i < chunkNum{
- i++
- recvFile(cache, conn, file, decoder)
- }
- file.Close()
- conn.Close()
- }
- func fileInfo(conn net.Conn) (string, int64, int64) {
- dec := gob.NewDecoder(conn)
- f := &File{}
- dec.Decode(f)
- tmp1 := f.NAME
- tmp2 := strings.Split(tmp1, ".")
- tmp3 := tmp2[1]
- tmp4 := tmp3 + "_output_" + f.NAME
- return tmp4, f.SIZE, f.CHUNKNUM
- }
- func recvFile(cache map[[20]byte]bytes.Buffer, conn net.Conn, file *os.File, dec *gob.Decoder) {
- /* Used for decoding the header from socket */
- newChunk := &Chunk{}
- dec.Decode(newChunk)
- if newChunk.DECOMPRESSED_SIZE == 0 {
- return
- }
- b := make([]byte, newChunk.COMPRESSED_SIZE)
- dec.Decode(&b)
- /* If the chunk already exists in cache, output content to file.
- * Nothing more to be done
- */
- _, ok := cache[newChunk.FP]
- if(ok == true) {
- offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
- outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
- return
- }
- /* 'buf' is used for the compressed file content.
- * Need to decompress it. Make an entry in cache used for outputting
- * the correct content
- */
- buf := bytes.NewBuffer(b)
- cache[newChunk.FP] = *buf
- offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
- outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
- }
- func decompress(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64) []byte{
- decompressed := make([]byte, size)
- compressed := cache[idx]
- gz, err := gzip.NewReader(&compressed)
- if err != nil {
- panic(err)
- }
- gz.Read(decompressed)
- return decompressed
- }
- func outputToFile(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64, file *os.File, offset int64) {
- cont := decompress(cache, idx, size)
- file.WriteAt(cont, offset)
- }
- DECOMPRESSED_SIZE int64
- ORDER int64
- }
// What is to send.
// Chunk is the wire unit built by the compression workers: the header
// fields plus the compressed BODY (left empty for duplicate chunks).
type Chunk struct {
	CHUNKSIZE         int64    // nominal chunk size (offset multiplier)
	ISCONTENT         bool     // true when BODY carries compressed data
	FP                [20]byte // SHA-1 fingerprint of the raw chunk
	COMPRESSED_SIZE   int64    // len(BODY); 0 for duplicate chunks
	DECOMPRESSED_SIZE int64    // raw byte count this chunk expands to
	ORDER             int64    // zero-based chunk index in the file
	BODY              []byte   // gzip-compressed chunk content
}
// File is the transfer header sent before any chunks: the source file's
// name, its total size in bytes, and how many chunks will follow.
type File struct {
	NAME     string
	SIZE     int64
	CHUNKNUM int64
}
- //8Zaynab8
- func main() {
- /* The only file to use - ever. Also need
- * password and username from user
- */
- path := "/data/inf2202/uniprot_sprot.dat"
- username, password := credentials()
- /* Describes the session */
- sshConfig := &ssh.ClientConfig{
- User: username,
- Auth: []ssh.AuthMethod{
- ssh.Password(password)},
- }
- sshConfig.SetDefaults()
- session := establishConnection(sshConfig)
- /* Size of file in bytes */
- fileSize := getFileSize(session, path)
- session.Close()
- /* Need to re-open it because it can only
- * handle one command per session...
- */
- session = establishConnection(sshConfig)
- /* Reader from the remote file */
- fileReader := getFileStream(session, path)
- /* Connection stuff */
- server, err := net.Listen("tcp", "localhost:8080")
- if err != nil {
- panic(err)
- }
- var chunkSize int64 = 6000
- cache := make(map[[20]byte]bytes.Buffer);
- nChunks := fileSize / chunkSize + 1
- fmt.Printf("chunks to send: %d\n", fileSize / chunkSize + 1);
- connection, err := server.Accept()
- encoder := gob.NewEncoder(connection)
- sendFileName(connection, path, fileSize, nChunks)
- runCompressionEngine(connection, cache, &fileReader, chunkSize, fileSize, nChunks, encoder);
- }
- func sendFileName(connection net.Conn, fileName string, fileSize int64, chunknum int64){
- p := &File{NAME:fileName, SIZE:fileSize, CHUNKNUM:chunknum}
- encoder := gob.NewEncoder(connection)
- encoder.Encode(p)
- }
- /* Maps SHA fingerprints to chunks. The values are compressed chunks.
- */
- func runCompressionEngine(connection net.Conn, cache map[[20]byte]bytes.Buffer, reader *bufio.Reader, chunkSize int64, fileSize int64, nChunks int64, encoder *gob.Encoder) {
- /* Can fit whole file into buffer or we have to iterate
- * over whole file chunk by chunk. We might need to read the trailing chunk
- * which is smaller than chunkSize. THIS OUGHT TO WORK
- */
- var threadNum int = 5
- var k int = 0
- var wg sync.WaitGroup
- var l sync.Mutex
- send_channel := make(chan *Chunk, nChunks)
- comp_channel := make(chan *ChunkData, nChunks)
- var trailingSize, restSize int64
- start := time.Now()
- if fileSize <= chunkSize {
- firstBuf := make([]byte, chunkSize)
- io.ReadAtLeast(reader, firstBuf, int(chunkSize))
- chunk := ChunkData{buffer: firstBuf, sendSize: fileSize, chunkSize: fileSize, orderNumber: 0}
- comp_channel<-&chunk
- } else {
- i := int64(0)
- midBuffer := make([]byte, chunkSize)
- io.ReadAtLeast(reader, midBuffer, int(chunkSize))
- chunk := ChunkData{buffer:midBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- for i < nChunks - 1{
- i++
- restSize = fileSize - (i * chunkSize)
- /* Used for trailing chunk which is
- * smaller than chunkSize. Maybe we don't
- * need lastBuf? Rare case
- */
- if restSize <= chunkSize {
- trailingSize = restSize
- lastBuf := make([]byte, trailingSize)
- io.ReadAtLeast(reader, lastBuf, int(trailingSize))
- chunk := ChunkData{buffer:lastBuf, sendSize: trailingSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- } else{
- mBuffer := make([]byte, chunkSize)
- io.ReadAtLeast(reader, mBuffer, int(chunkSize))
- chunk := ChunkData{buffer:mBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- }
- }
- close(comp_channel)
- //do concurrent mapping and compression
- for k < threadNum {
- wg.Add(1)
- go mapAndSend(cache, connection, comp_channel, send_channel, &wg, &l, encoder)
- k++
- }
- wg.Wait()
- //close(send_channel)
- //send all elements in the sending channel
- sendHeader(connection, send_channel, &l, encoder)
- elapsed := time.Since(start)
- fmt.Printf("time: %s", elapsed)
- }
- }
/* Compresses 'buffer' and maps it to the given 'cache' using 'chunkSize'.
 * Also send the data through the socket.
 *
 * mapAndSend is the worker body: it drains compChan, and for each chunk
 * either emits a fingerprint-only header (cache hit) or compresses the
 * chunk, caches it, and emits header+body. Every emitted Chunk is pushed
 * onto sendChan and then flushed via sendHeader under the shared mutex.
 *
 * NOTE(review): several workers run this concurrently; sendChan is
 * drained inside sendHeader by whichever worker gets there first, so
 * chunk ORDER on the wire is not the file order — the receiver relies
 * on the ORDER field, not arrival order. Confirm this is intended.
 */
func mapAndSend(cache map[[20]byte]bytes.Buffer, connection net.Conn, compChan chan *ChunkData, sendChan chan *Chunk, wg *sync.WaitGroup, l *sync.Mutex, encoder *gob.Encoder) {
	/* Loop through every chunk in the compression channel */
	for chunk := range compChan{
		// Fingerprint is taken over a chunkSize-long copy, so a short
		// trailing chunk is zero-padded before hashing.
		fpInput := make([]byte, chunk.chunkSize);
		copy(fpInput[:], chunk.buffer[:]);
		idx := sha1.Sum(fpInput);
		/* Already in cache. Send just fingerprint */
		l.Lock()
		_, ok := cache[idx];
		if ok {
			l.Unlock()
			cSize := int64(0)
			pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:false, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber}
			//add the new header to the send channel
			sendChan<-&pkg
			sendHeader(connection, sendChan, l, encoder)
		/* Not in cache. Compress buffer, map it using
		 * the fingerpring, send header and compressed buffer
		 */
		} else {
			// Compression happens outside the lock; the cache store
			// re-acquires it briefly.
			l.Unlock()
			b := compressedBuffer(chunk.buffer)
			l.Lock()
			cache[idx] = b
			l.Unlock()
			cSize := int64(b.Len())
			// Copy the compressed bytes so the cached buffer's backing
			// array is not shared with the outgoing BODY.
			body := make([]byte, cSize)
			copy(body, b.Bytes())
			pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:true, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber, BODY:body}
			//add the new header to the send channel
			sendChan<-&pkg
			sendHeader(connection, sendChan, l, encoder)
		}
	}
	wg.Done()
}
- /* Compresses and returns 'buffer' */
- func compressedBuffer(buffer []byte) bytes.Buffer {
- var b bytes.Buffer
- gz := gzip.NewWriter(&b)
- if _, err := gz.Write(buffer); err != nil {
- panic(err)
- }
- if err := gz.Flush(); err != nil {
- panic(err)
- }
- if err := gz.Close(); err != nil {
- panic(err)
- }
- return b
- }
/* Sends 'header' through the socket in an encoded format.
 *
 * sendHeader drains 'channel': for each queued Chunk it encodes a
 * Header (everything but BODY) followed by the body bytes, both under
 * the shared mutex so concurrent workers never interleave on the gob
 * stream. It returns as soon as the channel is observed empty, since
 * the channel is never closed and ranging on would otherwise block
 * forever.
 *
 * NOTE(review): the body is encoded even when COMPRESSED_SIZE is 0
 * (duplicate chunk) — the receiver depends on this to stay in sync.
 */
func sendHeader(connection net.Conn, channel chan *Chunk, l *sync.Mutex, encoder *gob.Encoder){
	//Loop through and send every element in the channel
	for p := range channel{
		l.Lock()
		chunk := &Header{CHUNKSIZE: p.CHUNKSIZE, ISCONTENT: p.ISCONTENT, FP: p.FP, COMPRESSED_SIZE: p.COMPRESSED_SIZE, DECOMPRESSED_SIZE: p.DECOMPRESSED_SIZE, ORDER:p.ORDER}
		// NOTE(review): this allocation is dead — the next line rebinds
		// c to a view of p.BODY.
		c := make([]byte, p.COMPRESSED_SIZE)
		c = p.BODY[:]
		err := encoder.Encode(chunk)
		if err != nil {
			panic(err)
		}
		err = encoder.Encode(c)
		if err != nil {
			panic(err)
		}
		l.Unlock()
		// Bail out once the queue looks empty; len() on a channel is a
		// snapshot, so a racing producer may leave an item for the next
		// caller.
		if len(channel) == 0 {
			return
		}
	}
}
- /* Returns a file stream from the file on a remote server */
- func getFileStream(session *ssh.Session, path string) (bufio.Reader) {
- r, err := session.StdoutPipe()
- if err != nil {
- panic(err)
- }
- err = session.Start("cat " + path)
- if err != nil {
- panic(err)
- }
- return *bufio.NewReader(r)
- }
- /* Returns the filesize from the file located at the SSH server */
- func getFileSize(session *ssh.Session, path string) int64 {
- r, err := session.StdoutPipe()
- if err != nil {
- panic(err)
- }
- /* Get size of file using wc from UNIX */
- err = session.Start("wc -c "+ path)
- if err != nil {
- panic(err)
- }
- reader := bufio.NewReader(r)
- b, err := reader.ReadBytes(' ')
- if err != nil {
- panic(err)
- }
- /* Don't want the trailing whitespace */
- k := string(b[:len(b) - 1])
- /* From string to int64 */
- size, err := strconv.ParseInt(k, 10, 64)
- if err != nil {
- panic(err)
- }
- return size
- }
- /* Establishes a connection to the SSH server, thus making it
- * possible to run commands on it.
- */
- func establishConnection(config *ssh.ClientConfig) *ssh.Session {
- connection, err := ssh.Dial("tcp", "lgserv3.stud.cs.uit.no:22", config)
- if err != nil {
- panic(err)
- } else {
- fmt.Printf("\nSession OK\n")
- }
- session, err := connection.NewSession()
- if err != nil {
- panic(err)
- }
- return session
- }
- /* Prompts for username and password used to SSH session */
- func credentials() (string, string) {
- reader := bufio.NewReader(os.Stdin)
- fmt.Printf("==== COMPRESSION ENGINE ====\n")
- fmt.Print("Username: ")
- username, _ := reader.ReadString('\n')
- fmt.Print("Password: ")
- bytePassword, _ := terminal.ReadPassword(int(syscall.Stdin))
- password := string(bytePassword)
- return strings.TrimSpace(username), strings.TrimSpace(password)
- }
- RECEIVER
- package main
- import(
- "fmt"
- "bytes"
- "net"
- "strings"
- "encoding/gob"
- "compress/gzip"
- // "io"
- "os"
- )
// File is the transfer header received before any chunks: the source
// file's name, its total size in bytes, and how many chunks follow.
type File struct {
	NAME     string
	SIZE     int64
	CHUNKNUM int64
}
// Chunk is the per-chunk wire header decoded from the sender; the
// compressed body (COMPRESSED_SIZE bytes) is decoded separately right
// after it.
type Chunk struct {
	CHUNKSIZE         int64    // nominal chunk size (offset multiplier)
	ISCONTENT         bool     // true when a compressed body accompanies this header
	FP                [20]byte // SHA-1 fingerprint of the raw chunk, keys the cache
	COMPRESSED_SIZE   int64    // byte count of the compressed body (0 for duplicates)
	DECOMPRESSED_SIZE int64    // raw byte count this chunk expands to
	ORDER             int64    // zero-based chunk index in the file
}
- func main () {
- cache := make(map[[20]byte]bytes.Buffer)
- runClient(cache)
- }
- func runClient(cache map[[20]byte]bytes.Buffer) {
- conn, _ := net.Dial("tcp", "localhost:8080")
- var fileName string
- var fileSize int64
- var chunkNum int64
- var i int64 = int64(0)
- /* Need filename and size */
- for {
- fileName, fileSize, chunkNum = fileInfo(conn)
- break
- }
- /* File to be written to */
- file, err := os.Create("output.dat")
- if err != nil {
- panic(err)
- }
- fmt.Printf("File \"%s\" created. Expected size = %d number of chunks: %d. \n", fileName, fileSize, chunkNum)
- /* Recieve all file content there is */
- fmt.Printf("CHUNKNUM is %d\n", chunkNum)
- decoder := gob.NewDecoder(conn)
- for i < chunkNum{
- i++
- recvFile(cache, conn, file, decoder)
- }
- file.Close()
- conn.Close()
- }
- func fileInfo(conn net.Conn) (string, int64, int64) {
- dec := gob.NewDecoder(conn)
- f := &File{}
- dec.Decode(f)
- tmp1 := f.NAME
- tmp2 := strings.Split(tmp1, ".")
- tmp3 := tmp2[1]
- tmp4 := tmp3 + "_output_" + f.NAME
- return tmp4, f.SIZE, f.CHUNKNUM
- }
- func recvFile(cache map[[20]byte]bytes.Buffer, conn net.Conn, file *os.File, dec *gob.Decoder) {
- /* Used for decoding the header from socket */
- newChunk := &Chunk{}
- dec.Decode(newChunk)
- if newChunk.DECOMPRESSED_SIZE == 0 {
- return
- }
- b := make([]byte, newChunk.COMPRESSED_SIZE)
- dec.Decode(&b)
- /* If the chunk already exists in cache, output content to file.
- * Nothing more to be done
- */
- _, ok := cache[newChunk.FP]
- if(ok == true) {
- offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
- outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
- return
- }
- /* 'buf' is used for the compressed file content.
- * Need to decompress it. Make an entry in cache used for outputting
- * the correct content
- */
- buf := bytes.NewBuffer(b)
- cache[newChunk.FP] = *buf
- offset := (newChunk.ORDER * newChunk.CHUNKSIZE)
- outputToFile(cache, newChunk.FP, newChunk.DECOMPRESSED_SIZE, file, offset)
- }
- func decompress(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64) []byte{
- decompressed := make([]byte, size)
- compressed := cache[idx]
- gz, err := gzip.NewReader(&compressed)
- if err != nil {
- panic(err)
- }
- gz.Read(decompressed)
- return decompressed
- }
- func outputToFile(cache map[[20]byte]bytes.Buffer, idx [20]byte, size int64, file *os.File, offset int64) {
- cont := decompress(cache, idx, size)
- file.WriteAt(cont, offset)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement