Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import io
- import os
- from typing import Dict
- import pandas as pd
- from pypdf import PdfReader
- import pypdfium2 as pdfium # LICENSE OK
- from PIL import Image
- from tqdm import tqdm
# Folder and file locations.
docs_folder = "docs"
output_images_folder = "output_images"
os.makedirs(output_images_folder, exist_ok=True)

# Load the CSVs.
docs_df = pd.read_csv("document_base.csv")
invoices_df = pd.read_csv("invoice_base.csv")

# Plan:
#   * keep only the docs_df rows whose `invoices_result` cell evaluates to a
#     mapping with exactly one item;
#   * turn every page of each PDF into an image;
#   * store the images, one folder per invoice, and keep each folder path.
print('before docs_df:', docs_df.shape)

# NOTE(review): eval() executes whatever expression is stored in the CSV cell.
# If document_base.csv can come from an untrusted source, switch to
# ast.literal_eval (or json.loads) after confirming the cell format.
single_invoice_mask = docs_df['invoices_result'].apply(
    lambda cell: len(eval(cell).items()) == 1
)
docs_df = docs_df[single_invoice_mask]

print('after docs_df:', docs_df.shape)
def _fix_and_read_pdf(
    pdf_bytes: bytes
) -> PdfReader:
    """
    Attempt to fix PDF EOF marker issues and read the content.

    The bytes are rewritten so that they end with exactly one ``%%EOF``
    marker before being handed to ``PdfReader``:

    * every existing ``%%EOF`` is removed and a single one is appended;
    * otherwise, a ``%%`` found within the last 10 bytes is treated as a
      truncated marker and replaced by a complete one;
    * otherwise the marker is simply appended.

    Parameters
    ----------
    pdf_bytes : bytes
        Bytes object of the PDF file.

    Returns
    -------
    PdfReader
        PdfReader object of the fixed PDF file.
    """
    EOF_MARKER = b'%%EOF'

    if EOF_MARKER in pdf_bytes:
        # Remove any early %%EOF and put a single marker at the end of the file.
        pdf_bytes = pdf_bytes.replace(EOF_MARKER, b'') + EOF_MARKER
    else:
        # Look for the last '%%', which may be the start of a cut-off marker.
        last_percent_index = pdf_bytes.rfind(b'%%')
        has_partial_marker = (
            last_percent_index != -1  # rfind returns -1 when '%%' is absent
            and len(pdf_bytes) - last_percent_index < 10
        )
        if has_partial_marker:
            # Keep everything *before* the partial marker. The slice must be
            # [:index]; a negative bound would drop the document body instead.
            pdf_bytes = pdf_bytes[:last_percent_index] + EOF_MARKER
        else:
            # Some files really don't have an EOF marker.
            pdf_bytes += EOF_MARKER

    return PdfReader(io.BytesIO(pdf_bytes))
def convert_pdf_to_images(
    pdf_bytes: bytes
) -> Dict[str, Image.Image]:
    """
    Convert a PDF document to images.

    Parameters
    ----------
    pdf_bytes : bytes
        Bytes object of the PDF file.

    Returns
    -------
    Dict[str, Image.Image]
        Rendered pages keyed by ``"PAGE_<n>"``, numbered from 1 in document
        order. ``None`` is returned when the document cannot be opened or
        rendered; the error is printed rather than raised.
    """
    pdf_document = None
    try:
        # Convert pdf_bytes to a PdfDocument object.
        pdf_document = pdfium.PdfDocument(pdf_bytes)

        # Render each page to a bitmap and convert it to a PIL image.
        # `scale` is pixels per PDF canvas unit; with the usual 1/72 inch
        # unit, 0.5 is roughly 36 DPI (a downscaled render).
        return {
            f"PAGE_{page_number}": page.render(scale=.5).to_pil()
            for page_number, page in enumerate(pdf_document, start=1)
        }
    except Exception as e:
        # Best-effort conversion: report the cause and signal failure.
        print(f'Erro no pdf: {e}')
        return None
    finally:
        # Compare against None instead of relying on truthiness: a document
        # object may evaluate as false (e.g. if it reports zero pages), which
        # would skip the close.
        if pdf_document is not None:
            pdf_document.close()
- import io
# Split the documents into PDFs and plain images. The extension test is
# case-insensitive so that e.g. "X.PDF" is not sent to the image loader.
is_pdf = docs_df['filename'].str.lower().str.endswith('.pdf')
pdf_df = docs_df[is_pdf]
image_df = docs_df[~is_pdf]

# invoice key ("<file stem>_Fatura_1") -> {'images': [...], 'result': {...}}
images = {}

for file_path in pdf_df['filename']:
    try:
        with open(os.path.join(docs_folder, file_path), 'rb') as f:
            original_pdf_bytes = f.read()
        fixed_pdf_reader = _fix_and_read_pdf(original_pdf_bytes)
        fixed_pdf_bytes = fixed_pdf_reader.stream.getvalue()
        file_images = convert_pdf_to_images(fixed_pdf_bytes)
        if file_images is None:
            # The converter reports failure by returning None.
            print(f"Error processing '{file_path}': PDF could not be rendered")
            continue
        # splitext removes only the trailing extension, whatever its case.
        invoice_key = f'{os.path.splitext(file_path)[0]}_Fatura_1'
        images[invoice_key] = {'images': list(file_images.values())}
    except Exception as e:
        # One unreadable document must not abort the whole run.
        print(f"Error processing '{file_path}': {e}")

for file_path in image_df['filename']:
    print(f"Processing '{file_path}'...")
    try:
        file_image = [Image.open(os.path.join(docs_folder, file_path))]
    except OSError as e:
        # Missing or unidentifiable image: skip it, like a failing PDF.
        print(f"Error processing '{file_path}': {e}")
        continue
    images[f'{os.path.splitext(file_path)[0]}_Fatura_1'] = {'images': file_image}
    print(f"Image '{file_path}' loaded")

# Attach the expected field values to every invoice that has images.
for _, row in invoices_df.iterrows():
    invoice_name = row['invoice_name']
    if invoice_name in images:
        images[invoice_name]['result'] = row.drop('invoice_name').to_dict()
import json

finetune_data = []

system_instruction = "You are an AI assistant for invoice fields extraction. You will receive the text of an invoice. You will receive a list of fields to fill in. Think step by step and extract the requested fields, always in text, one by one, with attention to detail"

user_instruction = """You are an image processing language model specialized in extracting invoice details from images. Given an image of an invoice, extract the following fields:
invoice_number: The invoice number.
billing_date: The billing date.
supplier_vat: The supplier's VAT number.
supplier_country: The supplier's country.
supplier_name: The supplier's name.
customer_vat: The customer's VAT number.
customer_country: The customer's country.
customer_address_street: The customer's address street.
customer_address_zip_code: The customer's zip code.
customer_address_city: The customer's city.
total_invoice_without_taxes: The total amount of the invoice without taxes.
total_invoice_with_taxes: The total amount of the invoice with taxes.
invoice_currency: The currency of the invoice.
invoice_description: A description of the invoice.
invoice_type: The type of the invoice.
observation: Any additional observations.
If any field is not present in the image, set its value to null.
Return the extracted data as a JSON object with the keys exactly as listed above.
"""


def _json_ready(value):
    """Return *value* in a form ``json.dumps`` can encode (missing -> None)."""
    # Assumes scalar cells as produced by read_csv; missing cells arrive as NaN.
    if pd.isna(value):
        return None
    # NumPy scalars expose .item() to unwrap to the plain Python value.
    return value.item() if hasattr(value, "item") else value


# Build one chat-style training sample (system / user / assistant) per invoice.
for invoice_name, data in images.items():
    invoice_images = data['images']
    if not invoice_images:
        print(f"Skipping '{invoice_name}' as it has no images")
        continue

    # 'result' is only present when the invoice was found in invoices_df.
    invoice_data = data.get('result')
    if invoice_data is None:
        print(f"Skipping '{invoice_name}' as it has no matching invoice data")
        continue

    imagens = [{"type": "image", "image": image} for image in invoice_images]

    # The prompt asks for a JSON object with null for absent fields, so the
    # target must be a JSON string rather than a raw dict containing NaN.
    expected_answer = json.dumps(
        {field: _json_ready(value) for field, value in invoice_data.items()},
        ensure_ascii=False,
    )

    sample_instruction = [
        {
            "role": "system",
            "content": [{"type": "text", "text": system_instruction}],
        },
        {
            "role": "user",
            "content": imagens + [{"type": "text", "text": user_instruction}],
        },
        {
            "role": "assistant",
            "content": [{"type": "text", "text": expected_answer}],
        },
    ]
    finetune_data.append({'messages': sample_instruction})
- from unsloth import FastVisionModel # FastLanguageModel for LLMs
# Load the 4-bit quantised Qwen2.5-VL checkpoint together with its tokenizer.
model, tokenizer = FastVisionModel.from_pretrained(
    "unsloth/Qwen2.5-VL-7B-Instruct-bnb-4bit",
    load_in_4bit=True,  # 4-bit to reduce memory use; False for 16-bit LoRA
    use_gradient_checkpointing=False,  # True or "unsloth" for long context
)

# LoRA adapter settings: only the language side (attention + MLP) is tuned,
# the vision layers are left untouched.
lora_settings = dict(
    finetune_vision_layers=False,
    finetune_language_layers=True,
    finetune_attention_modules=True,
    finetune_mlp_modules=True,
    r=16,  # larger rank: higher accuracy, but might overfit
    lora_alpha=16,  # recommended alpha == r at least
    lora_dropout=0,
    bias="none",
    random_state=3407,
    use_rslora=False,  # rank-stabilised LoRA is available but disabled
    loftq_config=None,  # LoftQ is available but disabled
    # target_modules="all-linear" is optional; a list can be given if needed
)
model = FastVisionModel.get_peft_model(model, **lora_settings)
- from unsloth import is_bf16_supported
- from unsloth.trainer import UnslothVisionDataCollator
- from trl import SFTTrainer, SFTConfig
# Put the model in training mode before building the trainer.
FastVisionModel.for_training(model)

# Pick the mixed-precision mode once: bf16 when supported, fp16 otherwise.
use_bf16 = is_bf16_supported()

training_args = SFTConfig(
    per_device_train_batch_size=1,
    gradient_accumulation_steps=1,
    warmup_steps=5,
    max_steps=15,
    # num_train_epochs=1,  # set this instead of max_steps for full training runs
    learning_rate=2e-4,
    fp16=not use_bf16,
    bf16=use_bf16,
    logging_steps=1,
    optim="adamw_8bit",
    weight_decay=0.01,
    lr_scheduler_type="linear",
    seed=3407,
    output_dir="outputs",
    report_to="none",  # for Weights and Biases
    # The items below are required for vision finetuning:
    remove_unused_columns=False,
    dataset_text_field="",
    dataset_kwargs={"skip_prepare_dataset": True},
    dataset_num_proc=4,
    max_seq_length=2048,
)

trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    data_collator=UnslothVisionDataCollator(model, tokenizer),  # must use
    train_dataset=finetune_data,
    args=training_args,
)

trainer_stats = trainer.train()

# Save the adapter and the tokenizer locally.
model.save_pretrained("lora_model")
tokenizer.save_pretrained("lora_model")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement