commit e86a67a123ebe6a217fcc89405984985e008c935
Author: Burathar
Date:   Mon Apr 6 17:28:42 2020 +0200

    initial version

diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..4d9aa3f
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,54 @@
+"""Extract tables from KPN mobile invoice pdfs and write them to csv files."""
+import os
+
+from .reader import reader
+from .formatter import formatter
+from .writer import writer
+
+table_names = [
+    'Bellen binnen Nederland',
+    'Bellen in het buitenland',
+    'Bellen naar het buitenland',
+    'Ontvangen gesprekken in het buitenland',
+    'Servicenummers - dienst',
+    'Servicenummers - verkeer',
+    'Servicenrs btw-vrij - dienst',
+    'Servicenrs btw-vrij - verkeer',
+    'Sms-berichten',
+    'Sms-en in het buitenland',
+]
+first_column_names = ['Gekozen nummer', 'Land en nummer', 'Bestemming', 'Land en bestemming']
+
+# NOTE: from here on `writer` and `reader` name these shared instances, not
+# the imported classes; extract() below works on the instances.
+writer = writer()
+reader = reader(writer, table_names, first_column_names)
+
+
+def _is_pdf(name):
+    """Return True when name ends in '.pdf', ignoring case."""
+    return name.lower().endswith('.pdf')
+
+
+def extract(path, save_dir=None):
+    """Read one pdf invoice, or every pdf in a folder, and write its tables as csv.
+
+    path: a pdf file, or a directory whose pdf files are processed in sorted order.
+    save_dir: directory for the csv files; defaults to the working directory
+        at the time of the call.
+    """
+    # Resolve the default here: `save_dir=os.getcwd()` in the signature would
+    # be evaluated once at import time and ignore later directory changes.
+    writer.set_save_dir(os.getcwd() if save_dir is None else save_dir)
+    if _is_pdf(path):
+        pdf_paths = [path]
+    elif os.path.isfile(path):
+        print('please parse a pdf file or folder with pdf files.')
+        return
+    else:
+        pdf_paths = [os.path.join(path, filename)
+                     for filename in sorted(os.listdir(path))
+                     if _is_pdf(filename)]
+    for pdf_path in pdf_paths:
+        reader.load(pdf_path)
+        reader.read_pdf()
\ No newline at end of file
diff --git a/formatter.py b/formatter.py
new file mode 100644
index 0000000..23cc6f9
--- /dev/null
+++ b/formatter.py
@@ -0,0 +1,72 @@
+import locale
+import threading
+
+from re import search
+from datetime import datetime
+from contextlib import contextmanager
+
+class formatter:
+    """Static helpers that clean up table cells and turn rows into csv lines."""
+
+    # source setlocale = https://stackoverflow.com/questions/18593661/how-do-i-strftime-a-date-object-in-a-different-locale
+    # The locale is process-wide state, so switching it is serialised by this lock.
+    LOCALE_LOCK = threading.Lock()
+
+    @staticmethod
+    @contextmanager
+    def setlocale(name):
+        """Switch LC_ALL to name for the duration of the with-block, then restore it."""
+        with formatter.LOCALE_LOCK:
+            saved = locale.setlocale(locale.LC_ALL)
+            try:
+                yield locale.setlocale(locale.LC_ALL, name)
+            finally:
+                locale.setlocale(locale.LC_ALL, saved)
+
+    @staticmethod
+    def date_time_merger(line, invoice_date):
+        """Replace the separate date and time cells of a row by one timestamp.
+
+        line: list of cell strings. The result keeps line[0] and line[3:], so
+            the date and time cells are presumably at index 1 and 2.
+        invoice_date: invoice date as text, parsed with '%d %B %Y' in the
+            nl_NL.UTF-8 locale.
+
+        Returns [line[0], 'YYYY-MM-DD HH:MM:SS'] + line[3:], or the unchanged
+        line (after printing a warning) when no date or time cell is found.
+        """
+        with formatter.setlocale('nl_NL.UTF-8'):
+            invoice_datetime = datetime.strptime(invoice_date, '%d %B %Y')
+        # Rows carry no year: a January invoice gets the previous year, others their own.
+        invoice_year = invoice_datetime.year - 1 if invoice_datetime.month == 1 else invoice_datetime.year
+        date = [x for x in line if search(r"\d{2} \w{3}", x)]
+        time = [x for x in line if search(r"\d{2}:\d{2}", x)]
+        if len(date) < 1 or len(time) < 1:
+            print("Warning: Could not find date and time in {0}".format(line))
+            return line
+        # Parse the year together with the day and month: without a year
+        # strptime assumes 1900, which is not a leap year, so '29 feb' would
+        # raise before the year could be corrected.
+        datetime_string = '{0} {1} {2}'.format(date[0], time[0], invoice_year)
+        with formatter.setlocale('nl_NL.UTF-8'):
+            linedate = datetime.strptime(datetime_string, '%d %b %H:%M %Y')
+        datestring = linedate.strftime('%Y-%m-%d %H:%M:%S')
+        return [line[0], datestring] + line[3:]
+
+    @staticmethod
+    def remove_prefix(element):
+        """Drop a three character prefix plus space from element.text, in place.
+
+        Texts shorter than five characters, or without a space at index 3,
+        are left untouched.
+        """
+        element_text = element.text
+        if len(element_text) < 5:
+            return
+        if element_text[3] == ' ':
+            element.text = element_text[4:]
+
+    @staticmethod
+    def concat_line(line):
+        """Join the cells of line with ';' and terminate the result with a newline."""
+        return ';'.join(line) + '\n'
\ No newline at end of file
diff --git a/reader.py b/reader.py
new file mode 100644
index 0000000..11bb58e
--- /dev/null
+++ b/reader.py
@@ -0,0 +1,197 @@
+import os
+
+import pdfquery
+from lxml import etree
+from .formatter import formatter
+from .writer import writer
+
+class reader:
+    """Reads table data from KPN mobile invoices"""
+
+    def __init__(self, writer, table_names, first_column_names):
+        """Keep the output writer, the table titles to extract and the accepted header labels."""
+        self.writer = writer
+        self.first_column_names = first_column_names
+        self.requested_tables = table_names
+
+    def load(self, file):
+        """Open the pdf at path file with pdfquery and load it."""
+        self.file = file
+        # Show only the file name; basename follows the platform's separator rules.
+        print('Loading %s' % os.path.basename(self.file))
+        self.pdf = pdfquery.PDFQuery(self.file)
+        self.pdf.load()
+
+    def write_tree_to_xml(self):
+        """Dump the parsed pdf tree as pretty-printed xml to 'xmltree.xml'."""
+        with open('xmltree.xml','wb') as f:
+            f.write(etree.tostring(self.pdf.tree, pretty_print=True))
+
+    @staticmethod
+    def get_coordinates(query):
+        """Return the x0 and y0 attributes of an element as {'x': float, 'y': float}."""
+        assert 'x0' in query.attrib and 'y0' in query.attrib, 'querry doesn\'t contain coordinates'
+        return { 'x' : float(query.attrib['x0']), 'y' : float(query.attrib['y0'])}
+
+    @staticmethod
+    def get_pageid(element) -> int:
+        """Return the pageid of the LTPage ancestor of element.
+
+        Raises ValueError when there is no LTPage ancestor or it has no pageid.
+        """
+        # Test against None explicitly instead of relying on element truthiness.
+        page = next(element.iterancestors('LTPage'), None)
+        pageid = '' if page is None else page.attrib.get('pageid', '')
+        if pageid == '': raise ValueError('element doesn\'t have a ancestor with pageid')
+        return int(pageid)
+
+    def text_line_in_box_in_page(self, x0, y0, x1, y1, pageid, textbox = False):
+        """Return the text lines (text boxes when textbox is True) on page pageid
+        that fit entirely inside the bounding box (x0, y0, x1, y1)."""
+        tag = 'LTTextBoxHorizontal' if textbox else 'LTTextLineHorizontal'
+        candidates = self.pdf.pq('%s:in_bbox("%s, %s, %s, %s")' % (tag, x0, y0, x1, y1))
+        return [candidate for candidate in candidates if reader.get_pageid(candidate) == pageid]
+
+    def _row_cells(self, coords, pageid_number, textbox = False):
+        """Return the elements in the row band that starts 1 unit right of coords."""
+        return self.text_line_in_box_in_page(coords['x']+1, coords['y']-2, coords['x']+300, coords['y'] + 8, pageid_number, textbox)
+
+    def fix_boxvalue(self, line_results, coords, pageid_number):
+        """Fill empty text lines of a row from the text boxes found in the same band.
+
+        Walks line_results in order. An entry with empty text is replaced, in
+        place, by the current text box when that box has text; the next box
+        then becomes current. Returns line_results.
+        """
+        line_box_results = self._row_cells(coords, pageid_number, True)
+        line_box_index = 0
+        for i, line_result in enumerate(line_results):
+            if line_box_index >= len(line_box_results): break
+            if line_result.text == '' and line_box_results[line_box_index].text != '':
+                line_results[i] = line_box_results[line_box_index]
+                line_box_index += 1
+        return line_results
+
+    def read_table_line(self, first_element, header = False) -> str:
+        """Read the table row that starts with first_element.
+
+        The prefix of first_element is removed, the cells to its right are
+        collected and all cells are ordered by their x0 coordinate.
+
+        With header=True the joined row is also appended to self.line_buffer;
+        otherwise its date and time cells are merged first. Returns the row
+        as one ';'-separated line.
+        """
+        formatter.remove_prefix(first_element)
+        coords = reader.get_coordinates(first_element)
+        pageid_number = reader.get_pageid(first_element)
+        line_results = self._row_cells(coords, pageid_number)
+        if '' in [x.text for x in line_results]:
+            line_results = self.fix_boxvalue(line_results , coords, pageid_number)
+        line_elements = sorted([first_element] + line_results, key=lambda x: float(x.attrib['x0']))
+        line = [element.text.strip() for element in line_elements]
+        if header:
+            self.line_buffer.append(formatter.concat_line(line))
+        else:
+            line = formatter.date_time_merger(line, self.invoice_date)
+        return formatter.concat_line(line)
+
+    def read_table_part(self, header):
+        """Read consecutive rows below header until no further row label is found.
+
+        Every row is printed and appended to self.line_buffer. Returns the
+        first-column element of the last row read, or None when no row was read.
+        """
+        coords = reader.get_coordinates(header)
+        pageid = reader.get_pageid(header)
+        last_label = None
+        while True:
+            # The next row label must fit in the band just below the current element.
+            label_number = self.text_line_in_box_in_page(coords['x'], coords['y']-10, coords['x']+80, coords['y'], pageid)
+            if len(label_number) == 0: break
+            last_label = label_number[0]
+            line = self.read_table_line(last_label)
+            print(line[:-1])
+            self.line_buffer.append(line)
+            coords = reader.get_coordinates(last_label)
+        return last_label
+
+    def check_for_continuation(self, last_label, header_text):
+        """Find the column header under which the table of last_label continues.
+
+        Returns None when a text box starting with 'Totaal' lies directly
+        below last_label, or when no continuation matches. A 'vervolg' label
+        matches when it is on the same page at x >= 300 if last_label is at
+        x < 300, or else on the next page at x <= 300, and the line below it
+        equals header_text; that line's element is returned.
+        """
+        pageid_last_label = reader.get_pageid(last_label)
+        coords_last_label = reader.get_coordinates(last_label)
+        label_totaal = self.text_line_in_box_in_page(coords_last_label['x'] - 1, coords_last_label['y']-15, coords_last_label['x']+165, coords_last_label['y'], pageid_last_label, True)
+        if len(label_totaal) != 0 and label_totaal[0].text.startswith('Totaal'):
+            return None
+        for label_vervolg in self.pdf.pq('LTTextLineHorizontal:contains("vervolg")'):
+            coords = reader.get_coordinates(label_vervolg)
+            # Only labels in the band 700 < y < 715 are considered.
+            if not 700 < coords['y'] < 715: continue
+            label_vervolg_page = reader.get_pageid(label_vervolg)
+            if coords_last_label['x'] < 300:
+                if label_vervolg_page != pageid_last_label: continue
+                if coords['x'] < 300: continue
+            else:
+                if label_vervolg_page != pageid_last_label + 1: continue
+                if coords['x'] > 300: continue
+            label_header2 = self.text_line_in_box_in_page(coords['x'], coords['y']-12, coords['x']+150, coords['y'], label_vervolg_page)
+            # A 'vervolg' label with no line below it cannot start a continuation.
+            if len(label_header2) == 0: continue
+            if label_header2[0].text.strip() != header_text: continue
+            return label_header2[0]
+        return None
+
+    def read_table(self, label_header):
+        """Read the table under label_header, following its continuations."""
+        header_text = label_header.text.strip()
+        label_start = label_header
+        while label_start is not None:
+            last_label = self.read_table_part(label_start)
+            if last_label is None: break
+            label_start = self.check_for_continuation(last_label, header_text)
+
+    def read_table_data(self, table_name):
+        """Extract the first table titled table_name and hand it to the writer.
+
+        A title only counts when the line directly below it is one of
+        self.first_column_names; the header row and all data rows are then
+        buffered and written under table_name.
+        """
+        for label_table_name in self.pdf.pq('LTTextLineHorizontal:contains("%s")' % table_name):
+            self.line_buffer = []
+            table_name_coords = reader.get_coordinates(label_table_name)
+            label_header = self.text_line_in_box_in_page(table_name_coords['x'], table_name_coords['y']-10, table_name_coords['x']+75, table_name_coords['y'], reader.get_pageid(label_table_name))
+            if len(label_header) == 0 : continue
+            if label_header[0].text.strip() in self.first_column_names:
+                self.read_table_line(label_header[0], True)
+                self.read_table(label_header[0])
+                self.writer.write_buffer_to_file(self.line_buffer, table_name)
+                break
+
+    def get_invoice_date(self):
+        """Return the text of the first line in the fixed date box on the page with pageid 1.
+
+        Raises ValueError when the box contains no text line.
+        """
+        date = self.text_line_in_box_in_page(440, 597, 519, 608, 1)
+        if len(date) == 0:
+            raise ValueError('no invoice date found in %s' % self.file)
+        return date[0].text.strip()
+
+    def read_pdf(self):
+        """Read the invoice date and extract every requested table of the loaded pdf."""
+        print('Reading %s' % os.path.basename(self.file))
+        self.invoice_date = self.get_invoice_date()
+        for table_name in self.requested_tables:
+            self.read_table_data(table_name)
+
+
+    # in_bbox("x0,y0,x1,y1"): Matches only elements that fit entirely within the given bbox.
+    # overlaps_bbox("x0,y0,x1,y1"): Matches any elements that overlap the given bbox.
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..a22c5fb
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,13 @@
+# Packaging metadata for the invoice_extractor package.
+from setuptools import setup
+
+setup(
+    name='invoice_extractor',
+    version='1.0',
+    description='KPN invoice extractor',
+    long_description='A small python package to read, clean, and export table data from KPN mobile phone invoices.',
+    packages=['invoice_extractor'],
+    author='Marijn Kuypers',
+    author_email='',
+    zip_safe=False,
+)
\ No newline at end of file
diff --git a/writer.py b/writer.py
new file mode 100644
index 0000000..0904bf2
--- /dev/null
+++ b/writer.py
@@ -0,0 +1,34 @@
+from pathlib import Path, PurePath
+from os import makedirs, remove, path
+
+class writer:
+    """Appends text to csv files inside a configurable save directory."""
+
+    def __init__(self):
+        # Default save directory: the parent of the directory holding this file.
+        self.save_dir = PurePath(Path(__file__).parent.absolute(), '..')
+
+    def set_save_dir(self, new_save_dir):
+        """Use new_save_dir for all files written from now on."""
+        self.save_dir = new_save_dir
+
+    def _csv_target(self, filename):
+        """Create the save directory when needed and return the path of filename + '.csv' in it."""
+        makedirs(self.save_dir, exist_ok=True)
+        return PurePath(self.save_dir, filename + '.csv')
+
+    def append_to_file(self, line, filename):
+        """Append the string line to the csv file called filename."""
+        target = self._csv_target(filename)
+        with open(target, 'a', encoding='utf-8') as handle:
+            handle.write(line)
+
+    def write_buffer_to_file(self, linearray, filename):
+        """Append the lines of linearray to the csv file called filename.
+
+        The first entry of linearray is left out when the file already exists.
+        """
+        target = self._csv_target(filename)
+        skip = int(path.isfile(target))
+        with open(target, 'a', encoding='utf-8') as handle:
+            handle.writelines(linearray[skip:])