Test for columns

Test an individual image and display the results

In [1]:
# If running locally need to set up Cloudstor client to download images
# DON'T RUN THIS ON SWAN (or you'll get an error because webdav is not installed)
import webdav.client as wc
from credentials import * # Storing my CloudStor credentials in another file
# Set the connection options. CLOUDSTOR_USER and CLOUDSTOR_PW are stored in a separate credentials file.
options = {
    'webdav_hostname': 'https://cloudstor.aarnet.edu.au',
    'webdav_login':    CLOUDSTOR_USER,
    'webdav_password': CLOUDSTOR_PW,
    'webdav_root': '/plus/remote.php/webdav/'
}
# Ok let's initiate the client.
client = wc.Client(options)
In [2]:
# If on SWAN you might need to run this to install OpenCV
!pip install --user opencv-python 
Requirement already satisfied: opencv-python in /Users/tim/mycode/stock-exchange/lib/python3.7/site-packages (4.1.0.25)
Requirement already satisfied: numpy>=1.14.5 in /Users/tim/mycode/stock-exchange/lib/python3.7/site-packages (from opencv-python) (1.16.2)
You are using pip version 19.0.3, however version 19.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
In [2]:
# COPIED FROM detect_columns
# MODIFIED FOR SWAN USE

import numpy as np
import cv2
import math
import statistics
import os
import pandas as pd
import time

# import pytesseract
from statistics import mean
import re

# Added
from IPython.display import display, HTML
import ipywidgets as widgets

def find_lines(img):
    gray = cv2.cvtColor(img,cv2.COLOR_BGR2GRAY)
    #th = cv2.adaptiveThreshold(gray,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY,5,2)
    retval, th = cv2.threshold(gray,125,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
    cv2.imwrite('data/th.jpg',th)
    kernel = np.ones((5,5),np.uint8)
    median = cv2.medianBlur(th, 11)
    # cv2.imwrite('data/median.jpg',median)
    #eroded = cv2.erode(median, kernel, iterations=1)
    #opened = cv2.morphologyEx(median, cv2.MORPH_OPEN, kernel)
    #th = cv2.adaptiveThreshold(gray,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY,11,2)
    opened = cv2.morphologyEx(median, cv2.MORPH_OPEN, kernel, iterations=1)
    # cv2.imwrite('data/opened.jpg',opened)
    #v = np.median(median)
    #sigma = 0.33
    #lower = int(max(0, (1.0 - sigma) * v))
    #upper = int(min(255, (1.0 + sigma) * v))
    #edges = cv2.Canny(median, lower, upper)
    edges = cv2.Canny(opened,50,150,apertureSize=3)
    # cv2.imwrite('data/edges.jpg',edges)
    lines = cv2.HoughLinesP(image=edges,rho=1,theta=np.pi/180, threshold=200,lines=np.array([]), minLineLength=200,maxLineGap=100)
    return lines

def find_header(img):
    (h, w) = img.shape[:2]
    points = []
    cropped = img[0:round(h/4), 0:round(w-(w/5))]
    results = pytesseract.image_to_data(cropped, output_type=pytesseract.Output.DICT)
    for index, word in enumerate(results['text']):
        if re.search(r'Shares|Quotations|Buyers|Sellers|Business|Done', word, flags=re.IGNORECASE):
            # y = results['top'][index]
            points.append(results['top'][index])
    #y = round(mean(points))
    try:
        y = sorted(points)[0]
    except IndexError:
        y = 0
    return y
    
def check_for_skew(lines):
    angles = []
    # lines = find_lines(img)
    for line in lines:
        # print(line)
        for x1,y1,x2,y2 in line:
            if abs(y1 - y2) > 200 and x1 > 300:
                if y2 > y1:
                    radians = math.atan2((y2 - y1), (x2 - x1))
                else:
                    radians = math.atan2((y1 - y2), (x1 - x2))
                degrees = math.degrees(radians)
                angles.append(degrees)
    #print(angles)
    # print(statistics.median(angles))
    angle = statistics.median(angles) - 90
    return angle

def deskew(img, angle):
    (h, w) = img.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h),flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
    return rotated

def add_grid(img):
    h, w = img.shape[:2]
    for x in range(0, w, 100):
        cv2.line(img,(x,0),(x,h),(255,0,0),1) 
    for y in range(0, h, 100):
        cv2.line(img,(0,y),(w,y),(255,0,0),1)
    return img

def find_columns(lines, h, w):
    x_values = []
    for line in lines:
        for x1,y1,x2,y2 in line:
            top = y1 if y1 < y2 else y2
            first = x1 if x1 < x2 else x2
            if abs(x1 - x2) < 10 and top < (h - 600):
                x_values.append(first)
    x_values = sorted(x_values)
    # print(x_values)
    clusters = []
    start = 0
    distance = 10
    cluster = []
    for x in x_values:
        if x < start + distance:
            cluster.append(x)
        else:
            if cluster:
                clusters.append(cluster)
            cluster = [x]
        start = x
    clusters.append(cluster)
    columns = []
    start = 0
    gutter = 0
    # print(clusters)
    for cluster in clusters:
        if cluster and cluster[0] < 600:
            if cluster[0] < 50:
                gutter = 0
            else:
                gutter = cluster[0] - 50
            start = gutter
        else:
            for width in reversed(range(900, 1200, 100)):
                if cluster and cluster[0] > start + width and cluster[0] < (w - 600) and (cluster[0] - start) < 2000:
                    columns.append(mean(cluster))
                    start = cluster[-1]
                    break
    return (gutter, columns)

def resize(img, h, w):
    scale = 5000 / float(w)
    resized = cv2.resize(img, None, fx=scale, fy=scale, interpolation = cv2.INTER_AREA)
    return resized

def save_header(img, header, w, image_name, output_dir):
    # numpy slicing
    # roi = im[y1:y2, x1:x2]
    header_dir = os.path.join(output_dir, 'headers')
    header_img = img[0:header+20, 0:w]
    cv2.imwrite('{}/{}-header.jpg'.format(header_dir, image_name[:-4]), header_img)
    
def save_columns(img, columns, header, h, image_name, output_dir):
    col_dir = os.path.join(output_dir, 'columns')
    for index, column in enumerate(columns):
        try:
            next_col = columns[index+1]
        except IndexError:
            pass
        else:
            if column > 20:
                this_col = column - 20
            else:
                this_col = column
            col_img = img[header-20:h, this_col:next_col]
            cv2.imwrite('{}/{}-col-{}.jpg'.format(col_dir, image_name[:-4], index+1), col_img)

# THIS FUNCTION HAS BEEN MODIFIED
def process_image(image_name, image_path, output_dir='test', markup=False, grid=False):
    img = cv2.imread(image_path)
    # This is just to weed out dodgy images
    try:
        h, w = img.shape[:2]
    except AttributeError:
        print('Not a valid image')
    else:
        if w > 5000:
            img = resize(img, h, w)
            h, w = img.shape[:2]
        lines = find_lines(img)
        angle = check_for_skew(lines)
        if angle != 0.0:
            img = deskew(img, angle)
            lines = find_lines(img)
        gutter, columns = find_columns(lines, h, w)
        # Header detection needs Tesseract 3.05 or greater, SWAN has 3.04
        # header = find_header(img)
        if grid:
            img = add_grid(img)
        if markup:
            cv2.line(img,(gutter,0),(gutter,h),(0,255,0),3)
            for column in columns:
                cv2.line(img,(column,0),(column,h),(0,0,255),3)
            # cv2.line(img,(0, header),(w, header),(255,0,0),3)
            # This has been changed to save as a generic name
            cv2.imwrite('{}/{}'.format(output_dir, image_name), img)
            # Display as HTML
            if set_filename:
                display(HTML('<img src="{}/{}?{}">'.format(output_dir, image_name, time.time())))
            else:
                with out:
                    display(HTML('<img src="{}/{}?{}">'.format(output_dir, image_name, time.time())))
        else:
            save_header(img, header, w, image_name, output_dir)
            columns = [gutter] + columns + [w] 
            save_columns(img, columns, header, h, image_name, output_dir)
In [3]:
def download_image(image):
    client.download_sync(remote_path=image['path'], local_path='test/test.jpg')  

def test_image(b):
    if b:
        out.clear_output()
    df = pd.read_csv('files.csv')
    if set_filename:
        image_name = set_filename
    else:
        image_name = filename.value
    if not image_name:
        image = df.sample(1).iloc[0]
    else:
        image = df.loc[df['name'] == image_name].iloc[0]
    print(image)
    image_path = os.path.join(os.sep, 'webdav', image['path'])
    if not os.path.exists(image_path):
        download_image(image)
        image_path = 'test/test.jpg'
    process_image('test-cols.jpg', image_path, markup=True)     
In [6]:
# Widgets don't work on SWAN at the moment
# So insert and image name below and then run the this and the next cell
set_filename = ''
In [8]:
if set_filename:
    test_image(None)
else:
    out = widgets.Output()
    filename = widgets.Text(
            value='',
            placeholder='Enter image filename',
            description='Filename:',
            disabled=False
        )

    detect = widgets.Button(
        description='Detect columns',
        disabled=False,
        button_style='primary', # 'success', 'info', 'warning', 'danger' or ''
        tooltip='Click me',
        icon='check'
    )

    detect.on_click(test_image)

    display(widgets.VBox([widgets.HTML('<p>Leave box blank for a random image</p>'), widgets.HBox([filename, detect]), out]))
directory                                    AU NBAC N193-102/
name                                         N193-102_0259.tif
path         Shared/ANU-Library/Sydney Stock Exchange 1901-...
Name: 32839, dtype: object
In [ ]: