Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- Fantastic v4.1 – Enterprise Policy Manager – PySide6
- Finalized script with Standard "First Match Wins" logic and fully robust IP/Domain match checking.
- """
- import sys, json, uuid, redis, math
- from PySide6.QtWidgets import (
- QApplication, QMainWindow, QWidget, QListWidget, QListWidgetItem,
- QPushButton, QLabel, QLineEdit, QTextEdit, QCheckBox,
- QVBoxLayout, QHBoxLayout, QSplitter, QDialog, QFormLayout,
- QDialogButtonBox, QMessageBox, QInputDialog, QProgressDialog
- )
- from PySide6.QtCore import Qt, QThread, Signal, QCoreApplication
- from PySide6.QtGui import QColor, QPalette
- # -----------------------------
- # Redis Backend
- # -----------------------------
class PolicyStore:
    """Thin persistence layer that keeps policy domains in Redis.

    Layout used in Redis:

    * ``policy:domain_order`` - JSON list of domain names, highest priority
      first.
    * ``domain:<name>`` - JSON object ``{"domain": <name>, "items": [...]}``.

    ``self.r`` is ``None`` while disconnected; every accessor then returns an
    empty result (or does nothing) instead of raising.
    """

    ORDER_KEY = "policy:domain_order"
    DOMAIN_PREFIX = "domain:"

    def __init__(self):
        self.r = None

    def connect(self, host, port):
        """Connect to Redis and make sure the priority list exists.

        Args:
            host: Redis host name or address.
            port: Redis port; anything ``int()`` accepts.

        Raises:
            ValueError: if ``port`` is not numeric.
            redis.exceptions.RedisError: if the server cannot be reached.
        """
        # Drop any previous client first so a failed attempt leaves the store
        # in the "disconnected" state rather than holding a dead client.
        self.r = None
        client = redis.Redis(host=host, port=int(port), decode_responses=True)
        client.ping()
        client.setnx(self.ORDER_KEY, json.dumps(["*"]))
        self.r = client

    def domain_order(self):
        """Return the stored priority list (empty when unset or disconnected)."""
        if not self.r:
            return []
        raw = self.r.get(self.ORDER_KEY)
        return list(json.loads(raw)) if raw else []

    def save_domain_order(self, order):
        """Persist ``order`` (a list of domain names) as the priority list."""
        if not self.r:
            return
        self.r.set(self.ORDER_KEY, json.dumps(order))

    def load_domain(self, domain):
        """Return the stored document for ``domain``.

        The result always has an ``items`` list, and every item has an ``id``
        (a fresh UUID is assigned in memory when the stored item lacks one).
        An unknown domain yields an empty document.
        """
        empty = {"domain": domain, "items": []}
        if not self.r:
            return empty
        raw = self.r.get(f"{self.DOMAIN_PREFIX}{domain}")
        if not raw:
            return empty
        data = json.loads(raw)
        # Callers index data["items"] directly, so guarantee the key exists.
        for item in data.setdefault("items", []):
            if "id" not in item:
                item["id"] = str(uuid.uuid4())
        return data

    def save_domain(self, domain_json):
        """Store ``domain_json`` under the key derived from its ``domain`` field."""
        if not self.r:
            return
        key = f"{self.DOMAIN_PREFIX}{domain_json['domain']}"
        self.r.set(key, json.dumps(domain_json))

    def delete_domain(self, domain):
        """Remove the stored document of ``domain`` (the order list is untouched)."""
        if not self.r:
            return
        self.r.delete(f"{self.DOMAIN_PREFIX}{domain}")

    def list_domains(self):
        """Return every stored domain name, prioritised ones first.

        Domains named in the priority list come first, in that order and
        without duplicates; stored domains missing from the list follow in
        alphabetical order so the result is deterministic.
        """
        if not self.r:
            return []
        existing = {
            key.split(":", 1)[1]
            for key in self.r.scan_iter(f"{self.DOMAIN_PREFIX}*")
        }
        # dict.fromkeys removes duplicate order entries, keeping the first.
        domains = [d for d in dict.fromkeys(self.domain_order()) if d in existing]
        domains.extend(sorted(existing.difference(domains)))
        return domains
- # -----------------------------
- # ASYNCHRONOUS WORKER
- # -----------------------------
class RedisWorker(QThread):
    """Background thread that performs one read operation on the store.

    Supported ``operation`` values are ``"list_domains"`` (result delivered
    through ``domains_loaded``) and ``"load_domain"`` (``data`` is the domain
    name, result delivered through ``domain_data_loaded``). Any other value
    makes ``run`` finish without emitting anything. Failures are reported as
    text through ``error``.
    """

    domains_loaded = Signal(list)
    domain_data_loaded = Signal(dict)
    error = Signal(str)

    def __init__(self, store, operation, data=None, parent=None):
        super().__init__(parent)
        self.store, self.operation, self.data = store, operation, data

    def run(self):
        """Execute the requested operation and emit its result or an error."""
        requested = self.operation
        try:
            if requested == "list_domains":
                self.domains_loaded.emit(self.store.list_domains())
            elif requested == "load_domain":
                self.domain_data_loaded.emit(self.store.load_domain(self.data))
        except redis.exceptions.ConnectionError:
            self.error.emit("Redis connection error. Please check host/port.")
        except Exception as exc:
            # Everything else is forwarded to the GUI as a generic message.
            self.error.emit(f"Worker Error: {exc}")
- # -----------------------------
- # Domain Item Dialog
- # -----------------------------
class DomainItemDialog(QDialog):
    """Modal form for creating or editing a single policy rule.

    Args:
        item: existing rule dict to edit, or ``None`` for a new rule. The
            dict is only read, never modified.
        parent: optional parent widget.

    List-valued fields (``source_net``, ``applies_on``, ``users``) are edited
    as comma-separated text.
    """

    def __init__(self, item=None, parent=None):
        super().__init__(parent)
        self.setWindowTitle("Edit Domain Item")
        self.item = item or {}

        # NOTE: kept as an attribute for backward compatibility even though
        # the name shadows QWidget.layout() on this instance.
        self.layout = QFormLayout(self)

        self.action_edit = QLineEdit(str(self.item.get("action", "allow")))
        self.source_edit = QLineEdit(self._joined("source_net"))
        self.applies_edit = QLineEdit(self._joined("applies_on"))
        self.type_edit = QLineEdit(str(self.item.get("policy_type", "custom")))
        # QTextEdit(text) would interpret the text as HTML and mangle any
        # description containing '<' or '&'; load it as plain text instead.
        self.desc_edit = QTextEdit()
        self.desc_edit.setPlainText(str(self.item.get("description") or ""))
        self.path_edit = QLineEdit(str(self.item.get("path", "/")))
        self.wildcard_chk = QCheckBox()
        self.wildcard_chk.setChecked(bool(self.item.get("wildcard", False)))
        self.user_edit = QLineEdit(self._joined("users"))

        rows = (
            ("Action", self.action_edit),
            ("Source Net", self.source_edit),
            ("Applies On", self.applies_edit),
            ("Policy Type", self.type_edit),
            ("Description", self.desc_edit),
            ("Path", self.path_edit),
            ("Wildcard", self.wildcard_chk),
            ("Users (LDAP)", self.user_edit),
        )
        for label, widget in rows:
            self.layout.addRow(label, widget)

        buttons = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        buttons.accepted.connect(self.accept)
        buttons.rejected.connect(self.reject)
        self.layout.addWidget(buttons)

    def _joined(self, key):
        """Return the list stored under ``key`` as comma-separated text."""
        values = self.item.get(key) or []
        return ",".join(str(value) for value in values)

    @staticmethod
    def _split(text):
        """Split comma-separated ``text`` into trimmed, non-empty entries."""
        return [part.strip() for part in text.split(",") if part.strip()]

    def get_data(self):
        """Return the rule described by the current form contents.

        The ``id`` of the edited item is preserved; a new rule receives a
        fresh UUID.
        """
        return {
            "id": self.item.get("id", str(uuid.uuid4())),
            "action": self.action_edit.text().strip(),
            "source_net": self._split(self.source_edit.text()),
            "applies_on": self._split(self.applies_edit.text()),
            "policy_type": self.type_edit.text().strip(),
            "description": self.desc_edit.toPlainText().strip(),
            "path": self.path_edit.text().strip(),
            "wildcard": self.wildcard_chk.isChecked(),
            "users": self._split(self.user_edit.text()),
        }
- # -----------------------------
- # GUI
- # -----------------------------
class PolicyGUI(QMainWindow):
    """Main window: domain list, rule list and a policy simulator.

    Domains are evaluated in the order stored by ``PolicyStore`` (top of the
    list first); rules inside a domain are evaluated top to bottom. The
    simulator applies "first match wins" across all prioritised domains.
    """

    PAGE_SIZE = 50

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Fantastic v4.1 – Enterprise Policy Manager (Final)")
        self.resize(1500, 950)
        self.store = PolicyStore()
        self.current_domain = None   # document of the domain being edited
        self.domain_page = 0
        self.item_page = 0
        self.all_domains_list = []
        self.worker = None           # running RedisWorker, if any
        self.progress = None         # lazily created QProgressDialog
        self.init_ui()

    def init_ui(self):
        """Build the widget tree and wire up every signal."""
        splitter = QSplitter(Qt.Horizontal)
        self.setCentralWidget(splitter)
        left = QWidget()
        column = QVBoxLayout(left)

        # Redis connection row
        conn = QHBoxLayout()
        self.host = QLineEdit("127.0.0.1")
        self.port = QLineEdit("6379")
        btn_conn = QPushButton("Connect")
        btn_conn.clicked.connect(self.redis_connect)
        self.conn_indicator = QLabel()
        self.conn_indicator.setFixedSize(16, 16)
        self.update_conn_indicator(False)
        btn_refresh = QPushButton("Refresh Redis")
        btn_refresh.clicked.connect(self.reload_all)
        conn.addWidget(QLabel("Redis"))
        conn.addWidget(self.host)
        conn.addWidget(self.port)
        conn.addWidget(btn_conn)
        conn.addWidget(btn_refresh)
        conn.addWidget(self.conn_indicator)
        column.addLayout(conn)

        # Domain list
        column.addWidget(QLabel("Domains (Top is highest priority)"))
        self.domain_list = QListWidget()
        self.domain_list.itemClicked.connect(self.load_domain)
        column.addWidget(self.domain_list)
        domain_buttons = QHBoxLayout()
        # These four modify the store and are followed by a list refresh.
        refreshing_ops = [
            ("Add", self.add_domain), ("Edit", self.edit_domain),
            ("Delete", self.del_domain), ("Clone", self.clone_domain),
        ]
        for text, func in refreshing_ops:
            button = QPushButton(text)
            # func is bound as a default argument to avoid late binding.
            button.clicked.connect(
                lambda checked, func=func: self.run_async_operation(func))
            domain_buttons.addWidget(button)
        for text, func in [("▲", lambda: self.move_domain(-1)),
                           ("▼", lambda: self.move_domain(1))]:
            button = QPushButton(text)
            button.clicked.connect(func)
            domain_buttons.addWidget(button)
        column.addLayout(domain_buttons)

        # Rules of the selected domain
        column.addWidget(QLabel("Selected Domain Rules (Top is highest priority within domain)"))
        self.item_list = QListWidget()
        column.addWidget(self.item_list)
        item_buttons = QHBoxLayout()
        item_ops = [
            ("Add", self.add_item), ("Edit", self.edit_item), ("Delete", self.del_item),
            ("Clone", self.clone_item), ("▲", lambda: self.move_item(-1)),
            ("▼", lambda: self.move_item(1)),
        ]
        for text, func in item_ops:
            button = QPushButton(text)
            button.clicked.connect(func)
            item_buttons.addWidget(button)
        column.addLayout(item_buttons)

        # Paging labels
        self.domain_page_label = QLabel("Domain Page: 1")
        self.item_page_label = QLabel("Items Page: 1")
        column.addWidget(self.domain_page_label)
        column.addWidget(self.item_page_label)

        # Simulator controls
        column.addWidget(QLabel("Policy Simulator (Evaluates ALL Domains by Order)"))
        sim_fields = QHBoxLayout()
        self.user_sim = QLineEdit()
        self.user_sim.setPlaceholderText("User")
        self.ip_sim = QLineEdit()
        self.ip_sim.setPlaceholderText("IP Address")
        self.domain_sim = QLineEdit()
        self.domain_sim.setPlaceholderText("Domain/Path (e.g., visa.com)")
        sim_fields.addWidget(self.user_sim)
        sim_fields.addWidget(self.ip_sim)
        sim_fields.addWidget(self.domain_sim)
        column.addLayout(sim_fields)
        btn_sim = QPushButton("Run Simulator")
        btn_sim.clicked.connect(self.run_simulator)
        column.addWidget(btn_sim)

        # Apply changes
        self.btn_apply = QPushButton("Apply Changes")
        self.btn_apply.clicked.connect(self.apply_changes)
        column.addWidget(self.btn_apply)
        splitter.addWidget(left)

    # -----------------------------
    # Helper Functions
    # -----------------------------
    def _require_connection(self):
        """Return True when connected; otherwise warn the user and return False."""
        if self.store.r:
            return True
        QMessageBox.warning(self, "Connection Required", "Please connect to Redis first.")
        return False

    def _item_index(self, item_id):
        """Return the index of the rule with ``item_id`` in the loaded domain, or None."""
        if not self.current_domain:
            return None
        for index, entry in enumerate(self.current_domain.get('items', [])):
            if entry.get('id') == item_id:
                return index
        return None

    def start_worker(self, operation, data=None):
        """Run ``operation`` on a background ``RedisWorker`` behind a progress dialog.

        Does nothing (apart from a warning) while another worker is running
        or while disconnected.
        """
        if self.worker and self.worker.isRunning():
            QMessageBox.warning(self, "Busy", "Please wait for the current operation to finish.")
            return
        if not self._require_connection():
            return
        if self.progress is None:
            self.progress = QProgressDialog("Connecting to Redis...", "Cancel", 0, 0, self)
            self.progress.setWindowModality(Qt.WindowModal)
            self.progress.setAutoClose(True)
            self.progress.setWindowTitle("Redis Operation")
        self.progress.setLabelText(f"Running {operation}...")
        self.progress.show()
        self.worker = RedisWorker(self.store, operation, data, parent=self)
        self.worker.domains_loaded.connect(self.handle_domains_loaded)
        self.worker.domain_data_loaded.connect(self.handle_domain_data_loaded)
        self.worker.error.connect(self.handle_worker_error)
        self.worker.finished.connect(self.progress.hide)
        self.worker.start()

    def run_async_operation(self, func):
        """Call ``func`` and then refresh the domain list in the background.

        ``func`` may return ``False`` to signal that the user cancelled; the
        refresh is skipped in that case. Exceptions are shown in an error
        box, except those whose message contains "cancelled".
        """
        try:
            if func() is False:
                return
            self.start_worker("list_domains")
        except Exception as e:
            if "cancelled" not in str(e):
                QMessageBox.critical(self, "Operation Error", str(e))
            if self.progress:
                self.progress.hide()

    def handle_worker_error(self, message):
        """Hide the progress dialog, flag the connection as bad and show ``message``."""
        if self.progress:
            self.progress.hide()
        self.update_conn_indicator(False)
        QMessageBox.critical(self, "Redis Worker Error", message)

    def handle_domains_loaded(self, domains_list):
        """Store the freshly loaded domain names and redraw the list."""
        self.all_domains_list = domains_list
        self.reload_domains()

    def handle_domain_data_loaded(self, domain_data):
        """Make ``domain_data`` the domain being edited and show its rules."""
        # The rule handlers index 'items' directly, so make sure it exists.
        domain_data.setdefault('items', [])
        self.current_domain = domain_data
        self.item_page = 0
        self.reload_items()

    def redis_connect(self):
        """Connect using the host/port fields and load the domain list."""
        try:
            self.store.connect(self.host.text(), self.port.text())
            self.update_conn_indicator(True)
            self.start_worker("list_domains")
            QMessageBox.information(self, "Redis", "Connected successfully")
        except Exception as e:
            self.update_conn_indicator(False)
            QMessageBox.critical(self, "Redis", str(e))

    def update_conn_indicator(self, status):
        """Paint the indicator green when ``status`` is truthy, red otherwise."""
        palette = self.conn_indicator.palette()
        palette.setColor(QPalette.Window, QColor("green" if status else "red"))
        self.conn_indicator.setAutoFillBackground(True)
        self.conn_indicator.setPalette(palette)
        self.conn_indicator.show()

    def reload_all(self):
        """Re-read the domain list from Redis."""
        if self._require_connection():
            self.start_worker("list_domains")

    def reload_domains(self):
        """Redraw the current page of the domain list and its page label."""
        self.domain_list.clear()
        all_domains = self.all_domains_list
        start = self.domain_page * self.PAGE_SIZE
        for name in all_domains[start:start + self.PAGE_SIZE]:
            self.domain_list.addItem(name)
        total_pages = max(1, math.ceil(len(all_domains) / self.PAGE_SIZE))
        self.domain_page_label.setText(f"Domain Page: {self.domain_page+1}/{total_pages}")

    def reload_items(self):
        """Redraw the current page of rules of the loaded domain."""
        self.item_list.clear()
        if not self.current_domain:
            return
        items = self.current_domain['items']
        start = self.item_page * self.PAGE_SIZE
        for itm in items[start:start + self.PAGE_SIZE]:
            # .get() keeps rules written by other tools (missing optional
            # keys) from breaking the whole list.
            sources = ','.join(itm.get('source_net') or [])
            description = (itm.get('description') or '')[:30]
            row = QListWidgetItem(
                f"{itm.get('action', '')} | {sources} | "
                f"wildcard:{itm.get('wildcard', False)} | {description}...")
            row.setData(Qt.UserRole, itm.get('id'))
            self.item_list.addItem(row)
        total_pages = max(1, math.ceil(len(items) / self.PAGE_SIZE))
        self.item_page_label.setText(
            f"Items Page: {self.item_page+1}/{total_pages} (Total: {len(items)})")

    def add_domain(self):
        """Ask for a name and create an empty domain at the lowest priority."""
        if not self._require_connection():
            return
        name, ok = QInputDialog.getText(self, "Add Domain", "Domain:")
        # Strip before testing so a whitespace-only entry is rejected.
        name = name.strip() if ok and name else ""
        if not name:
            return
        if name in self.store.list_domains():
            QMessageBox.warning(self, "Exists", "Domain already exists.")
            return
        order = self.store.domain_order()
        if name not in order:   # e.g. "*" is pre-seeded in the order list
            order.append(name)
            self.store.save_domain_order(order)
        self.store.save_domain({"domain": name, "items": []})

    def edit_domain(self):
        """Rename the selected domain, keeping its rules and priority slot."""
        if not self._require_connection():
            return
        sel = self.domain_list.currentItem()
        if not sel:
            return
        old = sel.text()
        new, ok = QInputDialog.getText(self, "Edit Domain", "Domain:", text=old)
        new = new.strip() if ok and new else ""
        if not new or new == old:
            return
        if new in self.store.list_domains():
            # Renaming onto an existing domain would silently destroy it.
            QMessageBox.warning(self, "Exists", "Domain already exists.")
            return
        data = self.store.load_domain(old)
        data["domain"] = new
        # Write the new key before removing the old one so a failure in
        # between cannot lose the rules.
        self.store.save_domain(data)
        self.store.delete_domain(old)
        order = self.store.domain_order()
        if old in order:
            order[order.index(old)] = new
            # Drop a second occurrence if 'new' was already listed.
            self.store.save_domain_order(list(dict.fromkeys(order)))
        if self.current_domain and self.current_domain.get("domain") == old:
            # Later saves must go to the new key, not recreate the old one.
            self.current_domain["domain"] = new

    def del_domain(self):
        """Delete the selected domain after confirmation.

        Returns:
            ``False`` when the user declines the confirmation, ``None``
            otherwise.
        """
        if not self._require_connection():
            return
        sel = self.domain_list.currentItem()
        if not sel:
            return
        domain = sel.text()
        reply = QMessageBox.question(
            self, 'Confirm Delete',
            f"Are you sure you want to delete the domain: '{domain}' and ALL its policies?",
            QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            return False
        self.store.delete_domain(domain)
        order = self.store.domain_order()
        if domain in order:
            order.remove(domain)
            self.store.save_domain_order(order)
        if self.current_domain and self.current_domain.get("domain") == domain:
            self.current_domain = None
            self.reload_items()   # clear the now-stale rule list

    def clone_domain(self):
        """Copy the selected domain under a new, unused ``<name>_clone`` name."""
        if not self._require_connection():
            return
        sel = self.domain_list.currentItem()
        if not sel:
            return
        domain_name = sel.text()
        taken = set(self.store.list_domains())
        new_name = f"{domain_name}_clone"
        suffix = 2
        while new_name in taken:   # never overwrite an earlier clone
            new_name = f"{domain_name}_clone{suffix}"
            suffix += 1
        data = self.store.load_domain(domain_name)
        data["domain"] = new_name
        self.store.save_domain(data)
        order = self.store.domain_order()
        if new_name not in order:
            order.append(new_name)
            self.store.save_domain_order(order)

    def move_domain(self, d):
        """Move the selected domain ``d`` places in the priority order (-1 = up)."""
        if not self._require_connection():
            return
        current_item = self.domain_list.currentItem()
        if not current_item:
            return
        # Read the name now: reload_domains() clears the list, after which
        # the QListWidgetItem must not be touched any more.
        name = current_item.text()
        order = self.store.domain_order()
        try:
            index = order.index(name)
        except ValueError:
            QMessageBox.warning(self, "Error", "Domain not found in the priority order list. Please refresh the GUI.")
            return
        target = index + d
        if not 0 <= target < len(order):
            return
        order[index], order[target] = order[target], order[index]
        self.store.save_domain_order(order)
        self.all_domains_list = self.store.list_domains()
        self.reload_domains()
        # Re-select the moved domain by name in the rebuilt list.
        for row in range(self.domain_list.count()):
            if self.domain_list.item(row).text() == name:
                self.domain_list.setCurrentRow(row)
                break

    def load_domain(self, item):
        """Load the rules of the clicked domain list entry in the background."""
        if not self._require_connection():
            return
        self.start_worker("load_domain", item.text())

    def add_item(self):
        """Open the rule dialog and append the new rule to the loaded domain."""
        if not self.current_domain:
            return
        dlg = DomainItemDialog(parent=self)
        if dlg.exec():
            self.current_domain['items'].append(dlg.get_data())
            self.store.save_domain(self.current_domain)
            self.reload_items()

    def edit_item(self):
        """Edit the selected rule in place."""
        sel = self.item_list.currentItem()
        if not sel or not self.current_domain:
            return
        index = self._item_index(sel.data(Qt.UserRole))
        if index is None:
            QMessageBox.warning(self, "Error", "Selected item not found.")
            return
        items = self.current_domain['items']
        dlg = DomainItemDialog(item=items[index], parent=self)
        if dlg.exec():
            items[index] = dlg.get_data()
            self.store.save_domain(self.current_domain)
            self.reload_items()

    def del_item(self):
        """Delete the selected rule after confirmation."""
        sel = self.item_list.currentItem()
        if not sel or not self.current_domain:
            return
        iid = sel.data(Qt.UserRole)
        reply = QMessageBox.question(
            self, 'Confirm Delete',
            "Are you sure you want to delete the selected policy rule?",
            QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply == QMessageBox.Yes:
            self.current_domain['items'] = [
                i for i in self.current_domain['items'] if i.get('id') != iid]
            self.store.save_domain(self.current_domain)
            self.reload_items()

    def clone_item(self):
        """Append a copy of the selected rule (new id, " (clone)" suffix)."""
        import copy

        sel = self.item_list.currentItem()
        if not sel or not self.current_domain:
            return
        index = self._item_index(sel.data(Qt.UserRole))
        if index is None:
            QMessageBox.warning(self, "Error", "Selected item not found.")
            return
        items = self.current_domain['items']
        # Deep copy so the clone does not share its list fields with the
        # original rule.
        new_item = copy.deepcopy(items[index])
        new_item['id'] = str(uuid.uuid4())
        new_item['description'] = (new_item.get('description') or '') + " (clone)"
        items.append(new_item)
        self.store.save_domain(self.current_domain)
        self.reload_items()

    def move_item(self, d):
        """Move the selected rule ``d`` places within its domain (-1 = up)."""
        if not self.current_domain:
            return
        row = self.item_list.currentRow()
        if row < 0:
            # Nothing selected. Without this guard row -1 would swap the
            # last and first rules.
            return
        items = self.current_domain['items']
        page_start = self.item_page * self.PAGE_SIZE
        source = page_start + row   # list rows are relative to the page
        target = source + d
        if not (0 <= source < len(items) and 0 <= target < len(items)):
            return
        items[source], items[target] = items[target], items[source]
        self.store.save_domain(self.current_domain)
        self.reload_items()
        new_row = target - page_start
        if 0 <= new_row < self.item_list.count():
            self.item_list.setCurrentRow(new_row)

    def apply_changes(self):
        """Write the loaded domain back to Redis."""
        if self.current_domain:
            self.store.save_domain(self.current_domain)
            QMessageBox.information(self, "Apply", "Changes applied for domain: " + self.current_domain['domain'])
        else:
            QMessageBox.warning(self, "Apply", "No domain selected")

    # -----------------------------
    # Multi-Domain Simulator (Standard "First Match Wins" Logic)
    # -----------------------------
    @staticmethod
    def _ip_matches(ip, networks):
        """Return True when ``ip`` equals an entry of ``networks`` or lies in one.

        Entries may be literal strings (compared exactly), single addresses
        or CIDR networks. Entries that are not valid addresses/networks can
        only match by exact string comparison.
        """
        import ipaddress

        if ip in networks:
            return True
        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            return False
        for entry in networks:
            try:
                if address in ipaddress.ip_network(entry, strict=False):
                    return True
            except (ValueError, TypeError):
                continue
        return False

    def check_rule_match(self, item, user, ip, request_domain):
        """Return True when ``item`` applies to the simulated request.

        Args:
            item: rule dict; must carry a ``domain`` key naming the policy
                domain the rule belongs to (``"*"`` is the catch-all).
            user: simulated user name; empty skips the user filters.
            ip: simulated source address; empty skips the network filter.
            request_domain: requested ``host[/path]``.

        Raises:
            KeyError: if ``item`` has no ``domain`` key.

        The user, network and domain/path filters must all pass. A rule path
        of ``"/"`` (or empty) means "no path restriction".
        """
        # 1. User filters ("-" in applies_on means "not restricted").
        applies = item.get("applies_on", [])
        users = item.get("users", [])
        if applies and applies != ["-"] and user and user not in applies:
            return False
        if users and user and user not in users:
            return False

        # 2. Source network filter.
        networks = item.get('source_net')
        if networks and ip and not self._ip_matches(ip, networks):
            return False

        # 3. Domain / path filter.
        rule_path = (item.get('path') or '/').strip("/")   # "" for "/"
        wildcard = item.get('wildcard', False)
        request_host = request_domain.split('/', 1)[0].strip()
        request = request_domain.strip().strip('/')
        policy_domain = item['domain']

        if policy_domain != '*':
            # A rule stored under one domain never matches another host.
            if request_host != policy_domain:
                return False
            # The rule path may be written with the domain ("ntm.com/login")
            # or relative to it ("/login", "/"); accept both spellings.
            if rule_path == policy_domain or rule_path.startswith(policy_domain + "/"):
                qualified = rule_path
            elif rule_path:
                qualified = f"{policy_domain}/{rule_path}"
            else:
                qualified = policy_domain
            candidates = (rule_path, qualified)
            if wildcard:
                return request.startswith(candidates)
            return request in candidates

        # Catch-all domain: only a non-empty path acts as a filter.
        if not rule_path:
            return True
        if wildcard:
            # Suffix match, e.g. a rule path of ".exe".
            return request.endswith(rule_path)
        return request == rule_path

    def run_simulator(self):
        """Evaluate the simulator inputs against all prioritised domains.

        Domains are visited in stored priority order and rules top to
        bottom; the first matching rule decides. Without a match the result
        is "deny". The outcome is shown in a message box.
        """
        if not self._require_connection():
            return
        user = self.user_sim.text().strip()
        ip = self.ip_sim.text().strip()
        domain_path = self.domain_sim.text().strip().strip("/")
        if not domain_path:
            QMessageBox.warning(self, "Simulator", "Please enter a Domain/Path to simulate (e.g., visa.com).")
            return

        matched_rule = None
        final_action = "deny"
        for domain_name in self.store.domain_order():
            domain_data = self.store.load_domain(domain_name)
            for item in domain_data.get('items', []):
                # check_rule_match needs to know which domain owns the rule.
                candidate = dict(item, domain=domain_name)
                if self.check_rule_match(candidate, user, ip, domain_path):
                    final_action = item.get('action', 'deny')
                    matched_rule = {
                        "domain": domain_name,
                        "description": item.get('description', '(no description)'),
                        "action": final_action,
                    }
                    break
            if matched_rule:
                break   # first match wins across domains as well

        if matched_rule:
            msg = "Policy Evaluation Complete (First Match Wins):\n\n"
            msg += f"MATCH FOUND in Domain: {matched_rule['domain']}\n"
            msg += f"Rule Description: {matched_rule['description']}\n"
            msg += f"\nFinal Decision: **{final_action.upper()}**"
        else:
            # No rule matched, so the default decision applies.
            msg = "No matching rules found across all prioritized domains.\n\n"
            msg += "The request was not explicitly allowed by any rule, including the domain:*** catch-all.\n"
            msg += "Final Decision: **DENY** (Default Explicit Deny)\n\n"
            msg += f"***TIP:*** Double-check that all simulator inputs (User: '{user}', IP: '{ip}', Domain: '{domain_path}') precisely match a policy rule filter."
        QMessageBox.information(self, "Simulator Result", msg)
- # -----------------------------
- # Run
- # -----------------------------
if __name__ == "__main__":
    # Create the Qt application, show the main window and hand control to
    # the event loop; its return code becomes the process exit status.
    application = QApplication(sys.argv)
    main_window = PolicyGUI()
    main_window.show()
    sys.exit(application.exec())
Advertisement
Add Comment
Please, Sign In to add comment