#!/usr/bin/env python3
import os
import time
import math
import argparse
import logging
import subprocess
from pathlib import Path

import numpy as np
import cv2
import torch
from PIL import Image
from scipy.fftpack import dct, idct
from scipy.ndimage import gaussian_filter
from skimage.color import rgb2lab, lab2rgb
import pywt
import torchvision
from torchvision.models import resnet50, ResNet50_Weights
from torchvision import transforms

logging.basicConfig(level=logging.DEBUG, format='%(message)s')
#Clip extreme values to the given low/high percentiles
def clip_percentile(value, low, high):
    value = value - value.mean() #shift mean to 0
    low_bound = np.percentile(value, low)
    high_bound = np.percentile(value, high)
    value = np.clip(value, low_bound, high_bound)
    return value, low_bound, high_bound

#Clip value extremes (percentile / 100-percentile), then
#center the mean at 0.0 and linearly scale the range to [-1.0, 1.0]
def normalize_max(value, percentile):
    value, lo, hi = clip_percentile(value, percentile, 100.0 - percentile)
    max_value = max(abs(lo), abs(hi))
    if not max_value:
        return np.zeros_like(value)
    value = value / max_value #scale max magnitude to 1.0
    return value

#Scale value by strength such that strength=0 zeroes it, strength=1
#leaves it unchanged, and strength=max_scale saturates it to max_scale
def lerp_factor(value, strength, max_scale):
    if strength > 1.0:
        lerp_value = strength / max_scale
        value = value * (1.0 - lerp_value) + max_scale * lerp_value
    else:
        value = value * strength #avoid in-place mutation of the caller's array
    return value
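#Worked example (hypothetical numbers, a sketch of the scaling behavior):
#with max_scale=255 and strength=2.0, lerp_value = 2/255, so an input of
#24.0 maps to 24*(1 - 2/255) + 255*(2/255) ~= 25.8; with strength=0.5 the
#input is simply halved to 12.0.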
class Image_protector:
    def __init__(self):
        self.supported_formats = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
        #Load pre-trained ResNet50 model
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torchvision.models.resnet50(weights=ResNet50_Weights.DEFAULT).to(self.device)
        self.model.eval()
        #Define image preprocessing (standard ImageNet mean/std for the pretrained weights)
        self.resize_size = (256, 256)
        self.preprocess = transforms.Compose([
            transforms.Resize(self.resize_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
    #TODO: Create presets for different levels of protection
    #Apply protections to image
    def protect_image(self, image_path, output_dir='protected_images', intensity=1.0):
        #Base mix strengths on a 0-255 scale, scaled by intensity
        shift_strength = 2.0
        dct_strength = 24.0
        wavelet_strength = 8.0
        fourier_strength = 20.0
        perturbation_strength = 4.0
        arr_intensity = np.array([shift_strength, dct_strength, wavelet_strength, fourier_strength, perturbation_strength])
        shift_strength, dct_strength, wavelet_strength, fourier_strength, perturbation_strength = lerp_factor(arr_intensity, intensity, 255.0)
        print("Mix strengths:\nshift %.1f\ndct %.1f\nwave %.1f\nfft %.1f\nperturb %.1f" % (shift_strength, dct_strength, wavelet_strength, fourier_strength, perturbation_strength))
        try:
            file_extension = os.path.splitext(image_path)[1].lower()
            if file_extension not in self.supported_formats:
                return f"Unsupported file format: {file_extension}"
            #Make output folder
            os.makedirs(output_dir, exist_ok=True)
            logging.debug(f"Processing image: {image_path}")
            #Normalize pixel layout
            with Image.open(image_path) as img:
                image = np.array(img)
            if len(image.shape) == 2: #grayscale to RGB
                image = np.stack((image,) * 3, axis=-1)
            elif image.shape[2] == 4: #remove alpha
                image = image[:, :, :3]
            #Convert to BGR (OpenCV channel order)
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            #Apply protections
            protected_image = image
            protected_image = self.apply_color_shift(protected_image, strength=shift_strength)
            protected_image = self.apply_noise_tf(protected_image, col=0, strength=dct_strength)
            protected_image = self.apply_noise_tf(protected_image, col=1, strength=wavelet_strength)
            protected_image = self.apply_noise_tf(protected_image, col=2, strength=fourier_strength)
            protected_image = self.apply_perturbation(protected_image, strength=perturbation_strength, repeat_count=5, is_adversarial=1)
            #Save to file
            final_image_path = os.path.join(output_dir, f'p_{os.path.basename(image_path)}')
            pil_image = Image.fromarray(cv2.cvtColor(protected_image, cv2.COLOR_BGR2RGB))
            if file_extension in ['.jpg', '.jpeg']:
                pil_image.save(final_image_path, quality=95)
            else: #png and other formats
                pil_image.save(final_image_path)
            logging.debug(f"\nSaved protected image with embedded info: {final_image_path}")
            return f"Image processing complete. Protected image saved as {final_image_path}"
        except Exception as e:
            logging.error(f"Error processing image: {str(e)}", exc_info=True)
            return f"Error processing image {image_path}: {str(e)}"
    ### Generate top-class features and mix them with the original image ###
    #is_adversarial=0 increases top class probs
    #is_adversarial=1 reduces top class probs
    def apply_perturbation(self, image, strength, repeat_count, is_adversarial=0):
        debug_string = "adversarial" if is_adversarial else "reinforcing"
        logging.debug("\nApplying %s perturbation", debug_string)
        orig_image = image.astype(np.float32) / 255.0
        perturbed_image = image #original image mixed with accumulated perturbation
        accumulate = 0.0
        #Accumulate gradients of the model's top-class score
        for i in range(repeat_count):
            logging.debug("Tensor pass No. %s/%s", i + 1, repeat_count)
            #Prepare image (the model expects RGB; the working image is BGR)
            pil_image = Image.fromarray(cv2.cvtColor(perturbed_image, cv2.COLOR_BGR2RGB))
            img_tensor = self.preprocess(pil_image).unsqueeze(0).to(self.device)
            img_tensor = img_tensor.requires_grad_(True)
            #Feed image to the model and get its top class
            output = self.model(img_tensor)
            probs = torch.nn.functional.softmax(output, dim=1)
            top_class = probs.argmax(dim=1)
            loss = torch.nn.functional.log_softmax(output, dim=1)[0, top_class.item()]
            loss.backward()
            #Gradient of the top-class log-probability w.r.t. the input
            perturbation = img_tensor.grad.data
            perturbation *= -1.0 if is_adversarial else 1.0 #invert to subtract the detected features
            perturbation = torch.nn.functional.interpolate(
                perturbation, size=image.shape[:2], mode='bicubic', align_corners=False
            ).squeeze().permute(1, 2, 0).cpu().numpy()
            accumulate += perturbation[:, :, ::-1] #flip the RGB gradient back to BGR
            #Mix accumulated perturbations with the original image
            new_delta = normalize_max(accumulate, 0.2)
            new_delta *= (strength / 255.0)
            perturbed_image = np.clip(orig_image + new_delta, 0.0, 1.0) * 255.0
            perturbed_image = np.round(perturbed_image).astype(np.uint8)
        logging.info("Lum delta: %f", (perturbed_image / 255.0).mean() - orig_image.mean())
        return perturbed_image
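    #This loop is, in effect, an iterated gradient attack in the spirit of
    #FGSM/PGD: each pass nudges the image along (or against) the gradient of
    #the current top-class log-probability, then re-quantizes to uint8.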
    ### Add noise to one color channel in a transform domain ###
    def apply_noise_tf(self, image, col, strength):
        tf_list = ["dct", "wavelet", "fourier"]
        tf_channel = ["blue", "green", "red"]
        logging.debug("Applying %s watermark to %s channel", tf_list[col], tf_channel[col])
        col_channel = image[:, :, col].astype(float)
        #Transform the channel
        if tf_list[col] == "dct":
            transform = dct(dct(col_channel.T, norm='ortho').T, norm='ortho')
        elif tf_list[col] == "wavelet":
            transform, (cH, cV, cD) = pywt.dwt2(col_channel, 'haar')
        else: #fft
            transform = np.fft.fft2(col_channel, norm='ortho')
        #Add the watermark noise
        noise_map = np.random.normal(0, 1, transform.shape)
        transform += noise_map
        #Inverse transform
        if tf_list[col] == "dct":
            new_channel = idct(idct(transform.T, norm='ortho').T, norm='ortho')
        elif tf_list[col] == "wavelet":
            new_channel = pywt.idwt2((transform, (cH, cV, cD)), 'haar')
            new_channel = new_channel[:col_channel.shape[0], :col_channel.shape[1]] #idwt2 pads odd sizes
        else: #fft
            new_channel = np.fft.ifft2(transform, norm='ortho').real
        #Add the smoothed, normalized delta to the original channel
        new_delta = new_channel - col_channel
        new_delta = gaussian_filter(new_delta, sigma=2.0)
        new_delta = new_delta - new_delta.mean() #center to zero
        max_abs = np.max(np.abs(new_delta))
        if max_abs:
            new_delta = new_delta / max_abs #normalize to [-1, 1]
        col_channel += new_delta * strength
        image[:, :, col] = np.clip(col_channel, 0, 255).astype(np.uint8)
        return image
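    #Sketch of the mapping as wired above: col=0 puts DCT-domain noise in the
    #blue channel, col=1 wavelet-domain noise in green, col=2 Fourier-domain
    #noise in red; e.g. self.apply_noise_tf(image, col=0, strength=24.0).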
    ### Split hue and chroma into bins and shift each bin by a random amount ###
    #strength = 0 to 255
    def apply_color_shift(self, image, strength=1.0):
        logging.debug("Applying color shift")
        strength = strength / 255.0 * 180.0 #map 0-255 to 0-180 degrees
        #rgb2lab expects RGB order; the working image is BGR
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        lab = rgb2lab(rgb)
        L, a, b = lab[..., 0], lab[..., 1], lab[..., 2]
        C = np.sqrt(a**2 + b**2)
        H = np.arctan2(b, a) #radians in [-pi, pi]
        hue_rads = strength * math.pi / 360.0
        chroma_shift = strength / 1.8 #range 0 to 100
        num_bins_H = 16
        num_bins_C = 16
        C_max = max(1e-12, np.max(C))
        #Compute bin indices using uniform bins
        H_idx = np.floor((H + np.pi) / (2 * np.pi) * num_bins_H).astype(np.int32)
        C_idx = np.floor(C / C_max * num_bins_C).astype(np.int32)
        #Clamp indices
        H_idx = np.clip(H_idx, 0, num_bins_H - 1)
        C_idx = np.clip(C_idx, 0, num_bins_C - 1)
        #Generate random jitter offsets per bin (fixed for the image)
        hue_jitter_offsets = np.random.uniform(-hue_rads, hue_rads, num_bins_H)
        chroma_jitter_offsets = np.random.uniform(-chroma_shift, chroma_shift, num_bins_C)
        #Apply jitter
        H_new = H + hue_jitter_offsets[H_idx]
        C_new = C + chroma_jitter_offsets[C_idx]
        a_new = C_new * np.cos(H_new)
        b_new = C_new * np.sin(H_new)
        lab_mod = np.stack([L, a_new, b_new], axis=-1)
        jittered_image = np.round(lab2rgb(lab_mod) * 255.0)
        jittered_image = np.clip(jittered_image, 0, 255).astype(np.uint8)
        return cv2.cvtColor(jittered_image, cv2.COLOR_RGB2BGR) #back to BGR
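    #Because the jitter is constant within each hue/chroma bin, pixels of
    #similar color move together, so the shift reads as a subtle grade
    #rather than per-pixel noise.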
    def batch_protect_image(self, image_paths, output_dir='./', **kwargs):
        os.makedirs(output_dir, exist_ok=True)
        total_images = len(image_paths)
        results = []
        for i, image_path in enumerate(image_paths):
            result = self.protect_image(image_path, output_dir, **kwargs)
            results.append(result)
            yield (i + 1) / total_images #yield progress
        return results
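    #Note: because this is a generator, the returned `results` list is only
    #reachable as StopIteration.value when iterating manually; the __main__
    #block below just consumes the yielded progress fractions.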
#Copy metadata so it is unchanged (requires the exiftool CLI)
def copy_metadata(original_path, protected_path):
    subprocess.run([
        "exiftool",
        "-TagsFromFile", str(original_path),
        "-all:all",
        "-unsafe",
        "-icc_profile",
        "-o", str(protected_path) + ".tmp",
        str(protected_path)
    ], check=True)
    #Then move the tmp file back over the original
    os.replace(str(protected_path) + ".tmp", str(protected_path))
    print(f"Metadata copied from {original_path.name} to {protected_path.name}")
#AI classification test helper (currently unused by __main__)
def classify_image(image_path, model, preprocess, device):
    img = Image.open(image_path).convert('RGB')
    img_t = preprocess(img).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(img_t)
    return torch.nn.functional.softmax(outputs, dim=1).cpu().numpy()
def predict_image(image_path):
    #Load the model once; the weights object also carries the class names
    weights = ResNet50_Weights.DEFAULT
    model = resnet50(weights=weights)
    model.eval()
    imagenet_classes = weights.meta["categories"]
    #Preprocess image
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])
    image = Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0)
    #Run model
    with torch.no_grad():
        output = model(input_tensor)
    probs = torch.nn.functional.softmax(output[0], dim=0)
    top_prob, top_class = torch.topk(probs, 5)
    #Print top-5 predictions
    for prob, cls_idx in zip(top_prob, top_class):
        print(f"{imagenet_classes[cls_idx]}: {prob.item():.4f}")
if __name__ == "__main__":
    np.random.seed(int(time.time()))
    parser = argparse.ArgumentParser()
    parser.add_argument("images", nargs="+", help="Image file paths")
    parser.add_argument("--intensity", type=float, default=1.0, help="Intensity value (default: 1.0)")
    parser.add_argument("--output", type=str, default="./", help="Output path (default: \"./\")")
    args = parser.parse_args()
    image_paths = args.images
    #Protected files land in the output folder, prefixed with "p_"
    protected_folder = Path(args.output)
    output_paths = []
    for img_path in image_paths:
        p = Path(img_path)
        output_paths.append(protected_folder / f"p_{p.name}")
    protector = Image_protector()
    #Process batch
    for progress, input_path in zip(protector.batch_protect_image(image_paths, output_dir=args.output, intensity=args.intensity), image_paths):
        print(f"Progress: {progress * 100:.2f}% - Processing {input_path}")
    #Compare classification probabilities before and after protection
    for input_path, output_path in zip(image_paths, output_paths):
        print(f"\nOriginal image classification prediction\n{input_path}")
        predict_image(input_path)
        print(f"\nProtected image classification prediction\n{output_path}")
        predict_image(output_path)
        print("")
    #Copy original metadata
    for original_path, protected_path in zip(image_paths, output_paths):
        copy_metadata(Path(original_path), Path(protected_path))
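#Example invocation (hypothetical filenames; the script name is whatever this
#file is saved as):
#  python3 protect_images.py photo1.jpg photo2.png --intensity 1.5 --output protected_images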