Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <string>
- #include <thread>
- #define PIPE_BUF 64 //bufor wszystkich pipe'ów
- #define PIPE_SIZE 64 //ilość bitów w jednym pipe
- struct process {
- int id;
- int parent = 0;
- };
- class communicationModule {
- private:
- struct pipe_inode_info {
- char data[PIPE_SIZE];
- process sender;
- process receiver;
- bool lock = 0;
- int dataSize = 0;
- };
- struct arrayPipes {
- pipe_inode_info canal;
- };
- arrayPipes streamArr[PIPE_BUF];
- int arrayPos = 0;
- int addToArray(pipe_inode_info f, pipe_inode_info s) {
- if (arrayPos + 1 > PIPE_BUF) {
- std::cout << "BLAD! Przekroczono maksymalny rozmiar bufora!\n";
- return -1;
- }
- arrayPipes p, r;
- p.canal = f;
- r.canal = s;
- streamArr[arrayPos] = p;
- streamArr[arrayPos + 1] = r;
- arrayPos += 2;
- return arrayPos - 1;
- }
- void delFromArray() {
- arrayPos -= 2;
- }
- void returnStream(int p, pipe_inode_info &f, pipe_inode_info &s) {
- f = streamArr[p].canal;
- s = streamArr[p + 1].canal;
- }
- int checkReader(int p, process r) {
- if (streamArr[p].canal.receiver.id == r.id) return 1;
- else return 0;
- }
- int checkWriter(int p, process w) {
- if (streamArr[p].canal.sender.id == w.id) return 1;
- else return 0;
- }
- public:
- ~communicationModule() {
- }
- communicationModule() {
- }
- int pipe(int(&fd)[2], process s, process r) {
- pipe_inode_info node1;
- pipe_inode_info node2;
- for (int n = 0; n < PIPE_SIZE; n++) {
- node1.data[n] = 0;
- node2.data[n] = 0;
- }
- node1.receiver = r;
- node1.sender = s;
- node2.receiver = s;
- node2.sender = r;
- int state = addToArray(node1, node2);
- int state2 = addToArray(node2, node1);
- if (state == -1 || state2 == -1) {
- std::cout << "BLAD! ENFILE - brak miejsca w tablicy!\n";
- return -1; //ENFILE (brakuje miejsca w systemowej tablica plikow)
- }
- fd[0] = state;
- fd[1] = state2;
- return 0;
- }
- int read(int fd, std::string* buf, int count, process reader) {
- std::cout << "Proces " << reader.id << " odczytuje " << count << " znakow\n";
- while (streamArr[fd].canal.data[0] == 0) { std::this_thread::sleep_for(std::chrono::seconds(1)); }
- switch (checkReader(fd, reader)) {
- case 1: {
- int &dataSize = streamArr[fd].canal.dataSize;
- if (count <= dataSize) {
- while (streamArr[fd].canal.data[0] != 0 && count > 0) {
- *buf += streamArr[fd].canal.data[0];
- for (int n = 0; n < PIPE_SIZE - 1; n++) {
- streamArr[fd].canal.data[n] = streamArr[fd].canal.data[n + 1];
- }
- count--;
- dataSize--;
- }
- for (int n = dataSize; n < PIPE_SIZE; n++) {
- streamArr[fd].canal.data[n] = 0;
- }
- return buf->size();
- }
- else if (count > dataSize) {
- while (streamArr[fd].canal.data[0] != 0 && count > 0) {
- *buf += streamArr[fd].canal.data[0];
- for (int n = 0; n < PIPE_SIZE - 1; n++) {
- streamArr[fd].canal.data[n] = streamArr[fd].canal.data[n + 1];
- }
- count--;
- dataSize--;
- }
- for (int n = dataSize; n < PIPE_SIZE; n++) {
- streamArr[fd].canal.data[n] = 0;
- }
- return buf->size();
- }
- }
- case 0: { std::cout << "BLAD! Brak uprawnien do zapisywanie w potoku!\n"; }
- }
- //else error
- }
- int write(int fd, std::string buf, process writer) {
- if (streamArr[fd].canal.lock == 1) {
- std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Blokada\n";
- }
- std::cout << "Proces " << writer.id << " wysyla komunikat o tresci " << buf << "\n";
- if (streamArr[fd].canal.sender.id == NULL) {
- std::cout << "BLAD! EBADF - fd nie jest deskryptorem\n";
- return -1;
- } //EBADF (fd nie jest deskryptorem)
- if (streamArr[fd].canal.receiver.id == NULL) {
- std::cout << "BLAD! EPIPE - brak czytelnikow\n";
- return -1;
- } //EPIPE (nie ma czytelnikow i lacze jest zablokowane lub nie ma w nim miejsca) /+ wysłanie sygnału SIGPIPE do procesu
- if (streamArr[fd].canal.lock == 1) {
- std::cout << "BLAD! EPIPE - lacze jest zablokowane\n";
- return -1;
- } //EPIPE (nie ma czytelnikow i lacze jest zablokowane lub nie ma w nim miejsca)
- int insertCount = 0;
- switch (checkWriter(fd, writer)) {
- case 1: {
- streamArr[fd].canal.lock = 1;
- int &dataSize = streamArr[fd].canal.dataSize;
- if (buf.size() <= PIPE_SIZE - dataSize) {
- for (int n = dataSize, m = 0; n < PIPE_SIZE, m < buf.size(); n++, m++) {
- streamArr[fd].canal.data[n] = buf[m];
- dataSize++;
- insertCount++;
- }
- streamArr[fd].canal.lock = 0;
- return insertCount;
- }
- else if (buf.size() > PIPE_SIZE - dataSize) {
- //if (spacLeft == 0) zwrocenie bledu EAGAIN
- int m = 0;
- for (int n = dataSize, m = 0; n < PIPE_SIZE, m < buf.size(); n++, m++) {
- streamArr[fd].canal.data[n] = buf[m];
- insertCount++;
- }
- dataSize = PIPE_SIZE;
- streamArr[fd].canal.lock = 0;
- //zamrożenie procesu
- return insertCount;
- }
- streamArr[fd].canal.lock = 0;
- }
- case 0: {
- std::cout << "BLAD! EINVAL - fd nie jest dekstryptorem, do ktorego mozna pisac\n";
- return -1; //EINVAL (fd nie jest deskryptorem, do ktorego mozna pisac)
- }
- }
- }
- void printPipe(int fd) {
- std::cout << streamArr[fd].canal.receiver.id << " < ";
- for (int n = 0; n < PIPE_SIZE; n++) {
- if (streamArr[fd].canal.data[n] == 0) std::cout << '-';
- else std::cout << streamArr[fd].canal.data[n];
- }
- std::cout << " < " << streamArr[fd].canal.sender.id << '\n';
- }
- bool checkMessage(int fd) {
- if (streamArr[fd].canal.data[0] == 0) return 0;
- return 1;
- }
- };
- void readPipe(communicationModule* cm, int fd, std::string* bufor, process p) {
- while (1) {
- if (cm->checkMessage(fd)) {
- cm->read(fd, bufor, 8, p);
- std::cout << "Proces " << p.id << " odczytal wiadomosc. Obecny bufor: " << *bufor << "\n";
- }
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- }
- int main() {
- communicationModule cm;
- process p1, p2;
- p1.id = 1;
- p2.id = 2;
- p2.parent = 1;
- int fd[2];
- std::string buf;
- std::string* b = &buf;
- cm.pipe(fd, p1, p2);
- cm.write(fd[1], "test", p1);
- cm.printPipe(fd[1]);
- cm.read(fd[1], b, 4, p2);
- getchar();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement