Find rows in a column

Use Tesseract to separate columns into rows.

In [1]:
import cv2
import pandas as pd
import numpy as np
try:
    from PIL import Image
except ImportError:
    import Image
import pytesseract
from statistics import mean
import math
import statistics
import re
import os
import tempfile
from fuzzywuzzy import fuzz
from tqdm.auto import tqdm
from pathlib import Path
In [2]:
# These OCR image preprocessing steps are based on https://stackoverflow.com/a/43493383
# I don't really understand why this particular combination of filters works, but it does seem to improve OCR results

BINARY_THRESHOLD = 200

def process_image_for_ocr(file_path):
    # TODO : Implement using opencv
    temp_filename = set_image_dpi(file_path)
    im_new = remove_noise_and_smooth(temp_filename)
    return im_new


def set_image_dpi(file_path):
    im = Image.open(file_path)
    #length_x, width_y = im.size
    #factor = max(1, int(IMAGE_SIZE / length_x))
    #size = factor * length_x, factor * width_y
    # size = (1800, 1800)
    #im_resized = im.resize(size, Image.ANTIALIAS)
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
    temp_filename = temp_file.name
    im.save(temp_filename, dpi=(300, 300))
    return temp_filename


def image_smoothening(img):
    ret1, th1 = cv2.threshold(img, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY)
    ret2, th2 = cv2.threshold(th1, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    blur = cv2.GaussianBlur(th2, (1, 1), 0)
    ret3, th3 = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return th3


def remove_noise_and_smooth(file_name):
    img = cv2.imread(file_name, 0)
    filtered = cv2.adaptiveThreshold(img.astype(np.uint8), 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 41, 3)
    kernel = np.ones((1, 1), np.uint8)
    opening = cv2.morphologyEx(filtered, cv2.MORPH_OPEN, kernel)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel)
    img = image_smoothening(img)
    or_image = cv2.bitwise_or(img, closing)
    return or_image
In [3]:
def get_ocr(image_path):
    temp_img = process_image_for_ocr(image_path)
    df = pytesseract.image_to_data(temp_img,  config='--psm 4 --oem 1 -l eng', output_type=pytesseract.Output.DATAFRAME)
    return df

"""
def find_col_width(df):
    for confidence in reversed(range(60, 110, 10)):
        for heading in ['buyers', 'closing', 'quotations']:
            for word in df.loc[(df['level'] == 5) & (df['top'] < 100)].sort_values(by='top', ascending=False).itertuples():
                # print(word.text.lower())
                if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
                    # print(word)
                    # print(fuzz.ratio('buyers', word.text.lower()))
                    if word.left > 250:
                        return word.left
    return None
"""

def find_col_width(df):
    candidates = []
    for confidence in reversed(range(80, 110, 10)):
        for heading in ['buyers', 'closing', 'quotations']:
            for word in df.loc[(df['level'] == 5) & (df['left'] < 1500)].sort_values(by='top').itertuples():
                # print(word.text.lower())
                if len(str(word.text)) > 5 and fuzz.partial_ratio(heading, word.text.lower()) >= confidence:
                    # print(word)
                    # print(fuzz.ratio('buyers', word.text.lower()))
                    if word.left > 625:
                        candidates.append(word.left)
    try:
        lowest = sorted(candidates)[0] - 10
    except IndexError:
        lowest = None
    return lowest

def save_row(img, words, word_top, word_height, word_left, col_width, width, row_file):
    new_img = img.copy()
    top = words['top'].min()
    height = words['height'].max()
    left = 0
    if word_height:
        # print(top, top+height, word_top+word_height)
        cv2.line(new_img, (word_left, word_top + word_height), (col_width, word_top + word_height), (255,0,0), 1)
    row = new_img[max(0, word_top - 20):word_top + word_height + 20, left: word_left + width]
    cv2.imwrite(str(row_file), row)

def find_text(vol_dir, image_path, save_markup=False):
    col_data = []
    vol_id = re.search(r'(AU NBAC N193-\d+)', str(image_path)).group(1)
    # vol_id = 'AU NBAC N193-001'
    # print(vol_id)
    image_name = os.path.basename(image_path)
    page_id, col_id = re.search(r'N193-\d+_(\d+)\.*-col-(\d+).jpg', image_name).groups()
    page_id = int(page_id)
    col_id = int(col_id)
    # print(page_id, col_id)
    # output_path = Path(vol_dir, 'rows')
    output_path = Path(f'/Volumes/bigdata/mydata/stockexchange/rows/{vol_id}')
    output_path.mkdir(parents=True, exist_ok=True)
    #rowcol_path = Path(output_path, 'cols')
    #rowcol_path.mkdir(parents=True, exist_ok=True)
    df = get_ocr(image_path)
    img = cv2.imread(str(image_path))
    h, w = img.shape[:2]
    col_width = find_col_width(df)
    if not col_width:
        # col_width = 400
        col_width = 1000
    # print(col_width)
    if save_markup:
        new_img = np.zeros((h, w * 2, 3), np.uint8)
        new_img[:] = (255, 255, 255)
        new_img[0:h,0:w] = img
        cv2.line(new_img,(col_width, 0),(col_width, h),(0,0,255),1)
        ft = cv2.freetype.createFreeType2()
        ft.loadFontData(fontFileName='/Library/Fonts/Arial Unicode.ttf', id=0)
    row_id = 0
    for para, lines in df.loc[(df['level'] == 5)].groupby(by=['block_num', 'par_num']):
        for line, words in lines.groupby(by=['line_num']):
            left = 10000
            right = 0
            top = 10000
            height = 0
            heights = []
            tops = []
            name_parts = []
            for word in words.itertuples():
                # Make sure it's not just nonsense
                cleaned_word = re.sub(r'[^&%\(\)\"\w\s\/\-]', '', str(word.text))
                if cleaned_word and not cleaned_word.isspace() and (word.left + word.width) < (col_width + 20):
                    name_parts.append(str(word.text))
                    if word.left < left:
                        left = word.left
                    if word.top < top:
                        top = word.top
                    if word.height > height:
                        height = word.height
                    if word.left + word.width > right:
                        right = word.left + word.width
                    tops.append(word.top)
                    heights.append(word.height)
                    height = int(round(statistics.mean(heights)))
                    top = int(round(statistics.mean(tops)))
                    # row_file = Path(output_path, f'{image_name[:-4].replace(".", "")}-row-{row_id}.jpg')
                    # save_row(img, words, top, height, left, col_width, w, row_file)
            if name_parts:
                name_string = ' '.join(name_parts).replace('”', '"').replace('»', '"').replace('’', "'")
                # print(name_string)
                # Removes non-word characters & most punctuation
                cleaned_name = re.sub(r'[^&%\(\)\"\w\s\/\-\']', '', name_string)
                # OCR seems to turn dots into these words (and perhaps others)
                cleaned_name = re.sub(r'\s*\b[on,an,we,ee,oe,os,vs,\s]+\s*$', '', cleaned_name).strip()
                if cleaned_name and not cleaned_name.isspace():
                    # print(left, top, height, cleaned_name)
                    col_data.append({'vol_id': vol_id, 'page_id': page_id, 'col_id': col_id, 'row_id': row_id, 'text': cleaned_name, 'left': left, 'top': top, 'height': height, 'right': right })
                    if save_markup:
                        cv2.line(new_img,(0, top+height),(w * 2, top+height),(255,0,0),2)
                        ft.putText(new_img, cleaned_name, (w + 20, top + height), fontHeight=40, color=(0,  0, 0), thickness=-1, line_type=cv2.LINE_AA, bottomLeftOrigin=True)
                    row_file = Path(output_path, f'{image_name[:-4].replace(".", "")}-row-{row_id}.jpg')
                    save_row(img, words, top, height, left, col_width, w, row_file)
                    row_id += 1
    if save_markup:
        markup_img = Path(rowcol_path, f'{image_name[:-4]}-rows.jpg')
        cv2.imwrite(str(markup_img), new_img)
    return col_data

Process directories

In [ ]:
input_path = Path('/Volumes/bigdata/mydata/stockexchange/processed')
start_vol = 103

for vol_dir in tqdm([d for d in input_path.glob('*') if d.is_dir()], desc='Directories'):
    # print(img_dir)
    output_path = Path(vol_dir, 'rows')
    rowcol_path = Path(output_path, 'cols')
    rowcol_path.mkdir(parents=True, exist_ok=True)
    vol_num = int(re.search(r'(\d+)$', vol_dir.name).group(1))
    vol_data = []
    columns_dir = Path(vol_dir, 'columns')
    if vol_num >= start_vol:
        for img_name in tqdm([i for i in columns_dir.glob('*.jpg')]):
        # print(img_name)
            vol_data += find_text(vol_dir, img_name, save_markup=True)
        df_text = pd.DataFrame(vol_data).sort_values(by=['vol_id', 'page_id', 'col_id', 'row_id'])
        df_text.to_csv(f'vol-{vol_num}-text.csv')
In [ ]: