Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import(
- "os"
- "encoding/gob"
- "net"
- "fmt"
- "crypto/sha1"
- "bytes"
- "bufio"
- "compress/gzip"
- "sync"
- "io"
- "syscall"
- "strings"
- "golang.org/x/crypto/ssh"
- "golang.org/x/crypto/ssh/terminal"
- "strconv"
- )
/* UncompressedChunkData is one raw slice of the input file, queued for a
 * compression worker before any hashing or compression has happened.
 */
type UncompressedChunkData struct {
	buffer      []byte /* raw bytes read from the file for this chunk */
	sendSize    int64  /* number of payload bytes; forwarded as DECOMPRESSED_SIZE */
	chunkSize   int64  /* nominal chunk size; forwarded as CHUNKSIZE and used as the fingerprint input length */
	orderNumber int64  /* zero-based position of the chunk within the file; forwarded as ORDER */
}
/* TempChunk is the header that is gob-encoded onto the connection for every
 * chunk; the chunk body is encoded separately right after it. The field
 * names are part of the wire format.
 */
type TempChunk struct {
	CHUNKSIZE         int64    /* nominal chunk size used by the sender */
	ISCONTENT         bool     /* true when a compressed body accompanies this header */
	FP                [20]byte /* SHA-1 fingerprint of the chunk */
	COMPRESSED_SIZE   int64    /* length of the compressed body, 0 when none is sent */
	DECOMPRESSED_SIZE int64    /* number of payload bytes the chunk represents */
	ORDER             int64    /* zero-based position of the chunk within the file */
}
/* Chunk is the unit placed on the send channel: the header fields that end
 * up in a TempChunk plus the compressed body (nil when only the fingerprint
 * is transmitted).
 */
type Chunk struct {
	CHUNKSIZE         int64    /* nominal chunk size used by the sender */
	ISCONTENT         bool     /* true when BODY carries compressed data */
	FP                [20]byte /* SHA-1 fingerprint of the chunk */
	COMPRESSED_SIZE   int64    /* length of BODY, 0 when no body is sent */
	DECOMPRESSED_SIZE int64    /* number of payload bytes the chunk represents */
	ORDER             int64    /* zero-based position of the chunk within the file */
	BODY              []byte   /* compressed chunk data */
}
/* File is the first message gob-encoded onto the connection; it announces
 * the file that the following chunks belong to.
 */
type File struct {
	NAME     string /* path of the file being transferred */
	SIZE     int64  /* file size in bytes */
	CHUNKNUM int64  /* number of chunks that will follow */
}
- func main() {
- /* The only file to use - ever. Also need
- * password and username from user
- */
- path := "/data/inf2202/uniprot_sprot.dat"
- username, password := credentials()
- /* Describes the session */
- sshConfig := &ssh.ClientConfig{
- User: username,
- Auth: []ssh.AuthMethod{
- ssh.Password(password)},
- }
- sshConfig.SetDefaults()
- session := establishConnection(sshConfig)
- /* Size of file in bytes */
- fileSize := getFileSize(session, path)
- session.Close()
- /* Need to re-open it because it can only
- * handle one command per session...
- */
- session = establishConnection(sshConfig)
- /* Reader from the remote file */
- fileReader := getFileStream(session, path)
- /* Connection stuff */
- server, err := net.Listen("tcp", "localhost:8080")
- if err != nil {
- panic(err)
- }
- var chunkSize int64 = 8000
- cache := make(map[[20]byte]bytes.Buffer);
- nChunks := fileSize / chunkSize + 1
- fmt.Printf("chunks to send: %d\n", fileSize / chunkSize + 1);
- connection, err := server.Accept()
- encoder := gob.NewEncoder(connection)
- sendFileName(connection, path, fileSize, nChunks, encoder)
- runCompressionEngine(connection, cache, &fileReader, chunkSize, fileSize, nChunks, encoder);
- }
- func sendFileName(connection net.Conn, fileName string, fileSize int64, chunknum int64, encoder *gob.Encoder){
- p := &File{NAME:fileName, SIZE:fileSize, CHUNKNUM:chunknum}
- encoder.Encode(p)
- }
- /* Maps SHA fingerprints to chunks. The values are compressed chunks.
- */
- func runCompressionEngine(connection net.Conn, cache map[[20]byte]bytes.Buffer, reader *bufio.Reader, chunkSize int64, fileSize int64, nChunks int64, encoder *gob.Encoder) {
- var threadNum int = 8
- var k int = 0
- var wg sync.WaitGroup
- var l sync.Mutex
- send_channel := make(chan *Chunk, nChunks)
- comp_channel := make(chan *UncompressedChunkData, nChunks)
- var trailingSize, restSize int64
- /* Whole file can fit into chunk */
- if fileSize <= chunkSize {
- firstBuf := make([]byte, chunkSize)
- io.ReadAtLeast(reader, firstBuf, int(chunkSize))
- chunk := UncompressedChunkData{buffer: firstBuf, sendSize: fileSize, chunkSize: fileSize, orderNumber: 0}
- comp_channel<-&chunk
- /* We have to iterate
- * over whole file chunk by chunk. We might need to read the trailing chunk
- * which is smaller than chunkSize.
- */
- } else {
- i := int64(0)
- midBuffer := make([]byte, chunkSize)
- io.ReadAtLeast(reader, midBuffer, int(chunkSize))
- chunk := UncompressedChunkData{buffer:midBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- for i < nChunks - 1{
- i++
- restSize = fileSize - (i * chunkSize)
- /* Used for trailing chunk which is
- * smaller than chunkSize. Maybe we don't
- * need lastBuf? Rare case
- */
- if restSize <= chunkSize {
- trailingSize = restSize
- lastBuf := make([]byte, trailingSize)
- io.ReadAtLeast(reader, lastBuf, int(trailingSize))
- chunk := UncompressedChunkData{buffer:lastBuf, sendSize: trailingSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- } else{
- mBuffer := make([]byte, chunkSize)
- io.ReadAtLeast(reader, mBuffer, int(chunkSize))
- chunk := UncompressedChunkData{buffer:mBuffer, sendSize: chunkSize, chunkSize: chunkSize, orderNumber: i}
- comp_channel<-&chunk
- }
- }
- close(comp_channel)
- /* Concurrent mapping and compression
- */
- for k < threadNum {
- wg.Add(1)
- go mapAndSend(cache, connection, comp_channel, send_channel, &wg, &l, encoder)
- k++
- }
- wg.Wait()
- close(send_channel)
- sendChunk(connection, send_channel, &l, encoder)
- }
- }
- /* Compresses 'buffer' and maps it to the given 'cache' using 'chunkSize'.
- * Also send the data through the socket.
- */
- func mapAndSend(cache map[[20]byte]bytes.Buffer, connection net.Conn, compChan chan *UncompressedChunkData, sendChan chan *Chunk, wg *sync.WaitGroup, l *sync.Mutex, encoder *gob.Encoder) {
- /* Loop through every chunk in the compression channel */
- for chunk := range compChan{
- fpInput := make([]byte, chunk.chunkSize);
- copy(fpInput[:], chunk.buffer[:]);
- idx := sha1.Sum(fpInput);
- /* Critical section */
- l.Lock()
- _, ok := cache[idx];
- /* Already in cache. Send just fingerprint */
- if ok {
- l.Unlock()
- cSize := int64(0)
- pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:false, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber}
- /* Add the new header to the send channel */
- sendChan<-&pkg
- sendChunk(connection, sendChan, l, encoder)
- /* Not in cache. Compress buffer, map it using
- * the fingerpring, send header and compressed buffer
- */
- } else {
- l.Unlock()
- b := compressedBuffer(chunk.buffer)
- l.Lock()
- cache[idx] = b
- l.Unlock()
- cSize := int64(b.Len())
- body := make([]byte, cSize)
- copy(body, b.Bytes())
- pkg := Chunk{CHUNKSIZE: chunk.chunkSize, ISCONTENT:true, FP:idx, COMPRESSED_SIZE:cSize, DECOMPRESSED_SIZE:chunk.sendSize, ORDER:chunk.orderNumber, BODY:body}
- /* Add the new header to the send channel */
- sendChan<-&pkg
- sendChunk(connection, sendChan, l, encoder)
- }
- }
- wg.Done()
- }
/* compressedBuffer gzip-compresses 'buffer' and returns the complete gzip
 * stream (header, data and footer). An empty or nil 'buffer' yields a valid
 * stream that decompresses to nothing. Panics if compression fails.
 */
func compressedBuffer(buffer []byte) bytes.Buffer {
	var b bytes.Buffer
	gz := gzip.NewWriter(&b)
	if _, err := gz.Write(buffer); err != nil {
		panic(err)
	}
	/* Close already flushes pending data before writing the footer; an
	 * extra Flush in front of it would only add an empty sync block to
	 * every compressed chunk.
	 */
	if err := gz.Close(); err != nil {
		panic(err)
	}
	return b
}
- /* Sends 'header' through the socket in an encoded format */
- func sendChunk(connection net.Conn, channel chan *Chunk, l *sync.Mutex, encoder *gob.Encoder){
- //Loop through and send every element in the channel
- for p := range channel{
- l.Lock()
- chunk := &TempChunk{CHUNKSIZE: p.CHUNKSIZE, ISCONTENT: p.ISCONTENT, FP: p.FP, COMPRESSED_SIZE: p.COMPRESSED_SIZE, DECOMPRESSED_SIZE: p.DECOMPRESSED_SIZE, ORDER:p.ORDER}
- c := make([]byte, p.COMPRESSED_SIZE)
- c = p.BODY[:]
- err := encoder.Encode(chunk)
- if err != nil {
- panic(err)
- }
- err = encoder.Encode(c)
- if err != nil {
- panic(err)
- }
- l.Unlock()
- if len(channel) == 0 {
- return
- }
- }
- }
- /* Returns a file stream from the file on a remote server */
- func getFileStream(session *ssh.Session, path string) (bufio.Reader) {
- r, err := session.StdoutPipe()
- if err != nil {
- panic(err)
- }
- err = session.Start("cat " + path)
- if err != nil {
- panic(err)
- }
- return *bufio.NewReader(r)
- }
- /* Returns the filesize from the file located at the SSH server */
- func getFileSize(session *ssh.Session, path string) int64 {
- r, err := session.StdoutPipe()
- if err != nil {
- panic(err)
- }
- /* Get size of file using wc from UNIX */
- err = session.Start("wc -c "+ path)
- if err != nil {
- panic(err)
- }
- reader := bufio.NewReader(r)
- b, err := reader.ReadBytes(' ')
- if err != nil {
- panic(err)
- }
- /* Don't want the trailing whitespace */
- k := string(b[:len(b) - 1])
- /* From string to int64 */
- size, err := strconv.ParseInt(k, 10, 64)
- if err != nil {
- panic(err)
- }
- return size
- }
- /* Establishes a connection to the SSH server, thus making it
- * possible to run commands on it.
- */
- func establishConnection(config *ssh.ClientConfig) *ssh.Session {
- connection, err := ssh.Dial("tcp", "lgserv3.stud.cs.uit.no:22", config)
- if err != nil {
- panic(err)
- }
- session, err := connection.NewSession()
- if err != nil {
- panic(err)
- }
- return session
- }
- /* Prompts for username and password used to SSH session */
- func credentials() (string, string) {
- reader := bufio.NewReader(os.Stdin)
- fmt.Printf("==== COMPRESSION ENGINE ====\n")
- fmt.Print("Username: ")
- username, _ := reader.ReadString('\n')
- fmt.Print("Password: ")
- bytePassword, _ := terminal.ReadPassword(int(syscall.Stdin))
- password := string(bytePassword)
- return strings.TrimSpace(username), strings.TrimSpace(password)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement