Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //
- // Mediator.cpp
- // Chess
- //
- // Created by Nevin Flanagan on 4/10/12.
- // Copyright (c) 2012. All rights reserved.
- //
- #include "Mediator.h"
- #include <string.h>
- #include <sys/select.h>
- #include <sys/time.h>
- using namespace std;
- // Transmission()
- // Creates a blank transmission, usually for receiving
- // Anthony Xu
- HostMediator::Transmission::Transmission() {
- memset(data, 0, 256);
- length=0;
- }
- // Transmission(dataLength, someDataBuffer)
- // Copies data into a transmission buffer for sending
- // Anthony Xu
- HostMediator::Transmission::Transmission(unsigned short len, char const bytes[]) {
- if (len > 256) {
- throw string("package size too large!");
- }
- memcpy(this->data, bytes, len);
- memset(this->data + len, 0, 256 - len);
- // store length!
- }
- // HostMediator(hostName, contactPort, processor, errorRate)
- // Creates a new mediator to handle reliable transmissions over a newly created connection
- // Nevin Flanagan
- HostMediator::HostMediator(std::string const& targetAddress, int targetPort,
- Acceptor& localProcessor, float errorRate): partner(*new RemoteHost(targetAddress, targetPort, *new ConnectionManager(this), errorRate)), processor(localProcessor) {
- }
- // HostMediator(partnerHost, myProcessorCallback)
- // Creates a new mediator to handle large, reliable transmissions using connection
- // Nevin Flanagan
- HostMediator::HostMediator(RemoteHost& connection, HostMediator::Acceptor& localProcessor): partner(connection), processor(localProcessor) {
- sendWindowSize = 2;
- receiveWindowSize = 2;
- for (int i = 0; i < sendWindowSize; ++i) {
- sendingBuffers.push_back(new RemoteHost::Transmission());
- }
- receivingWindow.reset(receiveWindowSize);
- sendingID = 0;
- receivingID = 0;
- completedID = 0;
- completedReceipt = 0;
- }
- // mediator.transmit(myTransmission[, false]);
- // Starts sending myTransmission, dispatching an acknowledgement to the acceptor once receipt is confirmed
- // reliable determines whether or not resends are done
- // Nevin Flanagan
- HostMediator& HostMediator::transmit(HostMediator::Transmission const& data, bool reliable) throw (HostMediator::DeadConnection) {
- // add unique ID to queue
- long long currentID = ++sendingID;
- outgoingQueue.push_back(new SendBuffer(sendingID, data));
- // refill all empty send slots
- if (!sendingID) {
- list<unsigned char> openSlots = sendingWindow.missing();
- for (unsigned char* e = openSlots.begin(); e != openSlots.end(); ++e) {
- RemoteHost::Transmission* temporaryFrame = new RemoteHost::Transmission();
- if (*outgoingQueue.begin() >> *temporaryFrame) {
- sendingWindow[*e] = temporaryFrame;
- partner << *temporaryFrame;
- } else {
- delete temporaryFrame;
- break;
- }
- }
- }
- if (reliable) {
- while (completedID != currentID);
- completedID = 0;
- }
- return *this;
- }
- // mediator.transmit(myTransmission, interval[, false]);
- // Starts sending myTransmission, dispatching an acknowledgment to the acceptor once receipt is confirmed. An exception is thrown if the operation can be complete within interval seconds.
- // reliable determines whether or not resends are done
- // Nevin Flanagan
- HostMediator& HostMediator::transmit(HostMediator::Transmission const& data, float timeout, bool reliable)
- throw (HostMediator::DeadConnection, HostMediator::TimeoutExpired) {
- }
- // mediator.collect(blankTransmission)
- // Fills blankTransmission in with the next packet waiting in the mediator.
- // Blocks until a packet is available
- // Anthony Xu
- HostMediator& HostMediator::collect(HostMediator::Transmission& storage) throw (HostMediator::DeadConnection) {
- while (incomingQueue.size() == 0);
- HostMediator::Transmission* packet = *incomingQueue.begin();
- incomingQueue.pop_front();
- memcpy(storage.data, packet->data, storage.length = packet->length);
- return *this;
- }
- // mediator.collect(blankTransmission, interval)
- // Fills blankTransmission in with the next packet waiting in the mediator.
- // Blocks until a packet is available or interval passes.
- // Anthony Xu
- HostMediator& HostMediator::collect(HostMediator::Transmission& storage, float timeout) throw (HostMediator::DeadConnection, HostMediator::TimeoutExpired) {}
- // long val = mediator.checksum(userTransmission)
- // Calculates the four-byte checksum of the complete packet data using an XOR sequence
- // Nevin Flanagan
- unsigned long HostMediator::checksum(HostMediator::Transmission const& source) {
- unsigned long sum = 0;
- for (unsigned short i = 0; i < source.length; ++i) {
- sum ^= static_cast<unsigned long>(source.data[i]) << ((i % 4) * 8);
- }
- return sum;
- }
- // long val = mediator.checksum(internalTransmission)
- // Calculates the three-byte checksum of the frame data using an XOR sequence
- // Nevin Flanagan
- unsigned long HostMediator::checksum(RemoteHost::Transmission const& source) {
- unsigned long sum = 0;
- for (unsigned short i = 0; i < 128; ++i) {
- if (i >= 1 && i <= 3)
- continue;
- sum ^= static_cast<unsigned long>(source.data[i]) << ((i % 3) * 8);
- }
- return sum;
- }
- // nack(fill, validFrame)
- // Populates fill with a NACK frame based on the current state. Unless validFrame is false, fill is specified as the frame that failed.
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::nack(RemoteHost::Transmission& storage, bool valid) {
- char sourceFrame = storage.data[18];
- memset(storage.data, 0, 128);
- storage.data[0] = NACK_LEADER;
- packetID partial = receivingID;
- for (int i = 11; i >= 4; --i) {
- storage.data[i] = partial & 0xFF;
- partial >>= 8;
- }
- char* frameList = storage.data + 12;
- if (valid) {
- *frameList = 0xF0 | sourceFrame >> 4;
- ++frameList;
- }
- if (receivingID) {
- *frameList = incoming->nextSequence() & 0x0F;
- ++frameList;
- }
- list<unsigned char> missingFrames = receivingWindow.missing();
- for (unsigned char* e = missingFrames.begin(); e != missingFrames.end(); ++e) {
- *frameList = *e;
- ++frameList;
- }
- *++frameList = 0;
- unsigned long chk = checksum(storage);
- for (int i = 3; i > 0; --i) { //4 or 3? !!!
- storage.data[i] = chk & 0xFF;
- chk >>= 8;
- }
- return storage;
- }
- // ack(overwriteFrame)
- // Creates a window state update based on the current receiving state
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::ack(RemoteHost::Transmission& complete) {
- memset(complete.data, 0, 128);
- complete.data[0] = ACK_LEADER;
- packetID partial = receivingID;
- if (!partial)
- partial = completedReceipt;
- for (int i = 11; i >= 4; --i) {
- complete.data[i] = partial & 0xFF;
- partial >>= 8;
- }
- char* frameList = complete.data + 12;
- if (receivingID) {
- list<unsigned char> missingFrames = receivingWindow.missing();
- for (unsigned char* e = missingFrames.begin(); e != missingFrames.end(); ++e) {
- *frameList = *e;
- ++frameList;
- }
- }
- *++frameList = 0;
- unsigned long chk = checksum(complete);
- for (int i = 3; i > 0; --i) { //4 or 3? !!!
- complete.data[i] = chk & 0xFF;
- chk >>= 8;
- }
- return complete;
- }
- // ack(completedFrame, index)
- // Creates a window state update acknowledging up through the specified index
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::ack(RemoteHost::Transmission& complete, unsigned char index) {
- memset(complete.data, 0, 128);
- complete.data[0] = ACK_LEADER;
- packetID partial = receivingID;
- for (int i = 11; i >= 4; --i) {
- complete.data[i] = partial & 0xFF;
- partial >>= 8;
- }
- char* frameList = complete.data + 12;
- *frameList = index | 0xF0;
- ++frameList;
- list<unsigned char> missingFrames = receivingWindow.missing();
- for (unsigned char* e = missingFrames.begin(); e != missingFrames.end(); ++e) {
- *frameList = *e;
- ++frameList;
- }
- *++frameList = 0;
- unsigned long chk = checksum(complete);
- for (int i = 3; i > 0; --i) { //4 or 3? !!!
- complete.data[i] = chk & 0xFF;
- chk >>= 8;
- }
- return complete;
- }
- // mediator.processData(data)
- // Attempts to process a new data frame into the receiving buffer
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::processFrame(RemoteHost::Transmission& receipt) {
- packetID receivedPacket = 0;
- for (int i = 5; i < 12; ++i) {
- receivedPacket <<= 8;
- receivedPacket |= receipt.data[i];
- }
- unsigned char receivedFrame = receipt.data[18] >> 4 & 0x0F;
- if (!receivingID) {
- if (receivedPacket == completedReceipt) {
- // ack was lost, re-acknowledge
- return ack(receipt, receivedFrame);
- } else {
- if (!receivingWindow.includes(receivedFrame)) {
- return nack(receipt);
- } else {
- receivingWindow[receivedFrame] = new RemoteHost::Transmission(128, receipt.data);
- receivingID = receivedPacket;
- unsigned short packetLength = 0;
- for (int i = 17; i >= 16; --i) {
- packetLength <<= 8;
- packetLength |= receipt.data[i];
- }
- incoming = new ReceiveBuffer(receivedPacket, packetLength);
- }
- }
- } else {
- if (receivedPacket != receivingID)
- return nack(receipt);
- else
- if (receivingWindow.includes(receivedFrame))
- if (receivingWindow.contains(receivedFrame))
- return *reinterpret_cast<RemoteHost::Transmission*>(NULL);
- else
- receivingWindow[receivedFrame] = new RemoteHost::Transmission(128, receipt.data);
- else
- return nack(receipt);
- }
- RemoteHost::Transmission* nextFrame;
- unsigned char lastExtracted = 0xFF;
- bool completePacket = false;
- while (!completePacket && receivingWindow >> nextFrame) {
- *incoming << *nextFrame;
- lastExtracted = nextFrame->data[18] >> 4;
- completePacket = !(nextFrame->data[18] & 0x0F);
- delete nextFrame;
- }
- receivingWindow.extend(receiveWindowSize);
- if (completePacket) {
- incomingQueue.push_back(new HostMediator::Transmission(incoming->length, incoming->data));
- completedReceipt = receivingID;
- receivingID = 0;
- receivingWindow.reset(receiveWindowSize);
- delete incoming;
- incoming = NULL;
- }
- if (lastExtracted < 16)
- return ack(receipt, lastExtracted);
- return *reinterpret_cast<RemoteHost::Transmission*>(NULL);
- }
- // processACK(ackFrame)
- // Updates the sending window to clear all data leading up to the acknowledged frame
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::processACK(RemoteHost::Transmission& ackFrame) {
- packetID partial = 0;
- for (int i = 5; i < 12; ++i) {
- partial <<= 8;
- partial |= ackFrame.data[i];
- }
- unsigned char acknowledgedFrame = ackFrame.data[12] & 0x0F;
- while (sendingWindow.contains(acknowledgedFrame)) {
- RemoteHost::Transmission* completedFrame;
- sendingWindow >> completedFrame;
- }
- sendingWindow.extend(sendWindowSize);
- list<unsigned char> openSlots = sendingWindow.missing();
- for (unsigned char* e = openSlots.begin(); e != openSlots.end(); ++e) {
- RemoteHost::Transmission* temporaryFrame = new RemoteHost::Transmission();
- if (*outgoingQueue.begin() >> *temporaryFrame) {
- sendingWindow[*e] = temporaryFrame;
- partner << *temporaryFrame;
- } else {
- delete temporaryFrame;
- break;
- }
- }
- openSlots = sendingWindow.missing();
- if (openSlots.size() >= sendWindowSize) {
- SendBuffer* finished = *outgoingQueue.begin();
- outgoingQueue.pop_front();
- delete finished;
- processor(*this, HostMediator::Acceptor::packet_acknowledged);
- }
- if (outgoingQueue.size() > 0) {
- for (unsigned char * e = openSlots.begin(); e != openSlots.end(); ++ e) {
- RemoteHost::Transmission* temporaryFrame = new RemoteHost::Transmission();
- if (*outgoingQueue.begin() >> *temporaryFrame) {
- sendingWindow[*e] = temporaryFrame;
- partner << *temporaryFrame;
- } else {
- delete temporaryFrame;
- break;
- }
- }
- }
- return ackFrame;
- }
- // processNACK(nackFrame)
- // Handles retransmission when negative acknowledgments are received.
- // Nevin Flanagan
- RemoteHost::Transmission& HostMediator::processNACK(RemoteHost::Transmission& nackFrame) {
- packetID partial = 0;
- for (int i = 5; i < 12; ++i) {
- partial <<= 8;
- partial |= nackFrame.data[i];
- }
- unsigned char acknowledgedFrame = nackFrame.data[12] & 0x0F;
- unsigned char earliestAwaited = nackFrame.data[13];
- if (sendingWindow.contains(earliestAwaited))
- partner << *sendingWindow[earliestAwaited];
- return nackFrame;
- }
- // SlidingWindow.reset(size)
- // Leaves the window empty, with size slots starting from 0
- // Nevin Flanagan
- void HostMediator::SlidingWindow::reset(unsigned char windowSize) {
- clear();
- while (--windowSize) {
- push_back(value_type(size(), NULL));
- }
- }
- // SlidingWindow.extend(size)
- // Adds new empty slots in sequence until there are at least size
- // Nevin Flanagan
- void HostMediator::SlidingWindow::extend(unsigned char windowSize) {
- unsigned char lastIndex = rbegin()->first;
- while (size() < windowSize) {
- if (++lastIndex > 15) lastIndex = 1;
- push_back(value_type(lastIndex, NULL));
- }
- }
- // SlidingWIndow.includes(index)
- // returns true if there is a bloc in the window with the specified index,
- // whether or not it contains any content
- // Nevin Flanagan
- // SlidingWindow.contains(index)
- // Returns true if the window includes the specified index, and it has content.
- // Nevin Flanagan
- bool HostMediator::SlidingWindow::contains(unsigned char index) {
- for (iterator e = begin(); e != end(); ++e) {
- if (e->first == index)
- return e->second != NULL;
- }
- return false;
- }
- // SlidingWindow[index]
- // provides easy access to the frame pointer associated with a given index
- // Nevin Flanagan
- RemoteHost::Transmission*& HostMediator::SlidingWindow::operator [](unsigned char index) {
- for (iterator e = begin(); e != end(); ++e) {
- if (e->first == index)
- return e->second;
- }
- throw string("requested element not found in window");
- return NULL;
- }
- // SlidingWindow >> framePtr
- // Removes the front of the list, if it has content, and stores the pointer in framePtr
- // If the first item in the list has no content, returns fals and stores nothing.
- // Nevin Flanagan
- bool HostMediator::SlidingWindow::operator>>(RemoteHost::Transmission*& receiver) {
- RemoteHost::Transmission* next = begin()->second;
- if (next) {
- pop_front();
- receiver = next;
- return true;
- }
- return false;
- }
- // SlidingWindow.missing()
- // Returns a list of spaces reserved in the window but not filled, in order
- // Nevin Flanagan
- list<unsigned char> HostMediator::SlidingWindow::missing() {
- list<unsigned char> emptySlots;
- for (iterator e = begin(); e != end(); ++e) {
- if (!e->second)
- emptySlots.push_back(e->first);
- }
- return emptySlots;
- }
- // SendBuffer(packetID, sourceData)
- // Manages fragmenting the contents of a Transmission into a size suitable for a RemoteHost
- // Anthony Xu
- HostMediator::SendBuffer::SendBuffer(packetID sendingPacket, HostMediator::Transmission const& source): Transmission(source.length, source.data) {
- data=source.data; // Use parent class constructor to copy data instead; see line above
- length=source.length;
- ID = sendingPacket;
- sequenceNumber = 0;
- buffer = this->data; //256 // should be set to this->data, not source.data which belongs to application layer
- }
- // sendBuffer >> outgoingBuffer
- // Extracts the next frame from the buffer
- // Anthony Xu
- bool HostMediator::SendBuffer::operator>>(RemoteHost::Transmission& storage) {
- // this function uses fields length, data, buffer, ID, and sequence number,
- // as well as the static mediator checksum functions, to assemble a frame
- // inside storage.data. It updates the buffer and sequenceNumber fields so
- // that on the next call, it will read new data.
- // fill in the first byte with Mediator::DATA_LEADER
- // Fill in bytes 4-11 with the packet ID
- // Fill in bytes 12-15 with the packet checksum (use existing function)
- // Fill in bytes 16-17 with the total packet length (just use the length field here)
- // Fill in byte 19 with the amount of data being put in the frame (up to 108 bytes)
- // Fill in byte 18 with the sequence information; put the current value of
- // sequenceNumber in the top four bits, then if there is no more data for another
- // frame, put 0 in the bottom four bits; or, if there is more data left, add one to
- // sequenceNumber (skip to 1 if this makes it more than 15) and store that in the
- // bottom four bits instead.
- // Fill in bytes 20-127 with data from buffer, and move buffer up.
- // Finally, use HostMediator::checksum on the frame, and store the result in bytes 1-3.
- if (!buffer)
- return false;
- packetID partial = ID; // partial is not being initialized; set to ID
- bool b_moreData=false;
- storage.data[0] = HostMediator::DATA_LEADER;
- for (int i = 11; i >= 4; --i) {
- storage.data[i] = partial & 0xFF;
- partial >>= 8;
- }
- //copy current package data to make a HostMediator::Transmission
- //it's used for calculate checksum
- // NFF
- // Don't need a new HostMediator::Transmission; a SendBuffer inherits from it
- // Just say checksum(*this). Also, you leak memory here.
- /*
- HostMediator::Transmission* package0;
- package0 = new HostMediator::Transmission(length, data);
- unsigned long chk_pkg = checksum(package0);
- */
- unsigned long chk_pkg = checksum(*this);
- for (int i = 15; i > 11; --i) {
- storage.data[i] = chk_pkg & 0xFF;
- chk_pkg >>= 8; // variable name doesn't match
- }
- unsigned long len0=length;
- for (int i = 17; i > 15; --i) {
- storage.data[i] = len0 & 0xFF;
- len0 >>= 8;
- }
- unsigned long len1=strlen(buffer);
- if (len1>108){
- b_moreData=true;
- len1=108;
- }
- storage.data[19] = len1 & 0xFF;
- unsigned long seq0 = sequenceNumber;
- unsigned long seq1 = 0;
- if (b_moreData)
- seq1 = sequenceNumber+1;
- if (seq1 > 15)
- seq1 = 1;
- storage.data[18] = (seq1 & 0x0F) | (seq0 & 0xF0);
- memcpy(storage.data+20, buffer, len1);
- unsigned long chk_frame = checksum(storage);
- for (int i = 3; i > 0; --i) { //4 or 3? !!! // frame checksum goes in bytes 1, 2, 3; this is correct
- storage.data[i] = chk_frame & 0xFF;
- chk_frame >>= 8; // should be chk_frame
- }
- buffer+=len1;
- sequenceNumber = seq1; // sequenceNumber has to go to 1 if it gets bigger than 15; save seq1 instead of incrementing
- return true;
- }
- // ReceiveBuffer()
- // Initializes a blank buffer to receive packet data into
- // Anthony Xu
- HostMediator::ReceiveBuffer::ReceiveBuffer(packetID receivingPacket, unsigned short declaredLength) {
- memset(data, 0, 256);
- ID = receivingPacket;
- length = declaredLength;
- buffer = data;
- sequenceNumber = 0;
- // sequenceNumber needs to be zero
- }
- // buffer << receivedData
- // Adds data from a received block into the buffer.
- // First verify that the buffer's desired sequence number matches the recevied frame.
- // Anthony Xu
- bool HostMediator::ReceiveBuffer::operator<<(RemoteHost::Transmission const& source) {
- // Get data out of source.data. Make sure the sequence number (top four bits of byte 18)
- // matches the sequenceNumber field of this. If it doesn't, return false. Then, set the
- // sequenceNumber of this object to the bottom four bits of that byte. Get the frame data
- // length from byte 19, and copy that much data from bytes 20-127 into the buffer of this
- // object. Move the buffer up so that the next new frame doesn't overwrite the data we just copied.
- //
- unsigned long checkbytes0, checkbytes1;
- checkbytes0 = source[18] & 0xF0;
- checkbytes1 = checkbytes0 >> 4 & 0x0F;
- if (checkbytes1 != sequenceNumber)
- return false;
- else {
- checkbytes1 = source[18] & 0x0F;
- }
- sequenceNumber = checkbytes1;
- checkbytes0 = source[19] & 0xFF;
- memcpy((void*) (source.data+20), (void*) buffer, checkbytes0);
- buffer+=checkbytes0;
- return true;
- }
- // ConnectionManager(deferredHandler)
- // Creates a new ConnectionManager which will allocate a HostMediator if
- // needed and use it to process incoming connections
- // Nevin Flanagan
- HostMediator::ConnectionManager::ConnectionManager(HostMediator::Acceptor& handler): acceptor(handler), mediator(NULL), allocated(NULL) {}
- // ConnectionManager(linkedMediator)
- // Creates a ConnectionManager to handle an existing HostMediator
- // Nevin Flanagan
- HostMediator::ConnectionManager::ConnectionManager(HostMediator& client): mediator(&client), allocated(NULL), acceptor(*reinterpret_cast<HostMediator::Acceptor*>(NULL)) {}
- // [callback]
- // Processes incoming data from the remote connection to pass ACKs and store incoming packets.
- // Nevin Flanagan
- void HostMediator::ConnectionManager::operator()(RemoteHost& connection, event what) {
- if (!mediator)
- allocated = mediator = new HostMediator(connection, acceptor);
- switch (what) {
- case RemoteHost::Acceptor::connection_lost:
- mediator->processor(*mediator, HostMediator::Acceptor::connection_lost);
- if (allocated) {
- delete mediator;
- delete this;
- }
- break;
- case RemoteHost::Acceptor::data_ready:
- // collect arrived frame
- RemoteHost::Transmission receipt;
- connection >> receipt;
- // check frame integrity
- unsigned long checkval = 0;
- for (int i = 1; i < 4; ++i) {
- checkval <<= 8;
- checkval |= receipt.data[i];
- }
- if (mediator->checksum(receipt) != checkval) {
- mediator->nack(receipt, false);
- connection << receipt;
- return;
- }
- switch (receipt.data[0]) {
- case DATA_LEADER:
- RemoteHost::Transmission& result = mediator->processFrame(receipt);
- if (&result != NULL)
- connection << result;
- return;
- break;
- case ACK_LEADER:
- break;
- case NACK_LEADER:
- break;
- }
- break;
- }
- }
- // ConnectionManager::Constructor(genObject)
- // Creates a manager that can be invoked to create a ConnectionManager that will use genObject as a reference to create new HostMediator::Acceptors
- // Nevin Flanagan
- HostMediator::ConnectionManager::Generator::Generator(HostMediator::Acceptor::Generator& source): generator(source) {}
- // [callback]
- // Calls a Generator to produce a new ConnectionManager object
- // Nevin Flanagan
- RemoteHost::Acceptor& HostMediator::ConnectionManager::Generator::operator()(int socketID) {
- return *new HostMediator::ConnectionManager(generator(*this));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement