LoginSignup
5
3

More than 3 years have passed since last update.

字幕から文字抽出してみた(OpenCV:GoogleCloudVisionAPI編)

Posted at

Motive

字幕から文字抽出してみた(OpenCV:tesseract-ocr編)の予告通りにGoogleCloudVisionAPIを使って字幕の文字を抽出してみたいと思います。

Method

まずGoogleCloudVisionAPIを使うにはgoogle cloud consoleにてアカウント登録してAPIキーを取得する必要があります。方法としては、Cloud Vision APIの使い方まとめ (サンプルコード付き)に書いてあるので参照してください。

import requests
import json
import base64
import cv2
import sys

if __name__ == "__main__":
    KEY = "--- your api key ---"
    url = 'https://vision.googleapis.com/v1/images:annotate?key='
    api_url = url + KEY
    # 画像読み込み
    img_file_path = sys.argv[1]
    mat = cv2.imread(img_file_path)
    # 字幕表示部分のみ
    roi = mat[435:600, :]

    # openCV -> base64
    result, dst_data = cv2.imencode('.png', roi)
    img_content = base64.b64encode(dst_data)

    # リクエストBody作成
    req_body = json.dumps({
        'requests': [{
            'image': {
                'content': img_content.decode('utf-8')
            },
            'features': [{
                'type': 'DOCUMENT_TEXT_DETECTION'
            }]
        }]
    })

    # リクエスト発行
    res = requests.post(api_url, data=req_body)
    # リクエストから画像情報取得
    res_json = res.json()['responses']
    if 0 < len(res_json[0]):
        textobj = res_json[0]['textAnnotations'][0]
        print("".join(textobj["description"].strip().split("\n")))

apiをリクエストするときは画像をbase64に文字列化してjsonを設計するのですが、


src = cv2.imread("image_path")
result, dst_data = cv2.imencode('.png', src)
img_content = base64.b64encode(dst_data)

でopenCVからの変換ができます。

また、このまま最低限に逐次処理でAPIを呼び出しつつ文字抽出をすればいいのですが、asyncio を使って並列処理をすれば処理スピードが上がります。 asyncioを使う理由としてはmultiprocessing.Poolよりはコルーチン内でAPIのレスポンス処理をブロックしていて安定しているためです。


async def main_image_process(src):
    #文字認識しやすいように加工
    gray_frame = pre_process(src.content)
    #テロップが出そうなところだけトリミング
    roi = gray_frame[435:600, :]
    #テキストを抽出
    text = await extractTelopText(roi)
    await asyncio.sleep(2)
    dst = await createFooterTelop(src.content)
    dst = await addJapaneseTelop(dst, text, 20, cap_height + telop_height - 30)
    dst = await addASCIITelop(dst, str(src.timestamp) + "[sec]", cap_width - 250, cap_height + telop_height - 10, color=(0,255,0))
    return MovieFrame(src.id, dst, src.timestamp)

if __name__ == "__main__":
    r = []
    loop = asyncio.get_event_loop()
    try:
        r = loop.run_until_complete(
            asyncio.gather(*[main_image_process(f) for f in frames])
        )
    finally:
        loop.close()

Python3.7以降だともっとシンプルに記述できるみたいですが、まだ安定しておらずcentOSのデフォルトパッケージがPython3.6だったので3.6ベースで書いています。

Development

コード全体です。

import sys
import cv2
import io
import os
import numpy as np
import base64
import json
import requests
import asyncio
from PIL import Image, ImageDraw, ImageFont
from collections import namedtuple
import time

MovieFrame = namedtuple("MovieFrame", ["id", "content", "timestamp"])
telop_height = 50
cap_width = 1
cap_height = 1

def pre_process(src):
    kernel = np.ones((3,3),np.uint8)
    gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)
    o_ret, o_dst = cv2.threshold(gray, 0, 255, cv2.THRESH_OTSU)
    dst = cv2.morphologyEx(o_dst, cv2.MORPH_OPEN, kernel)
    dst = cv2.bitwise_not(dst)
    dst = cv2.cvtColor(dst, cv2.COLOR_GRAY2BGR)
    return dst

async def extractTelopText(src):
    KEY = "--- your api key ---"
    url = 'https://vision.googleapis.com/v1/images:annotate?key='
    api_url = url + KEY

    message = ""
    result, dst_data = cv2.imencode('.png', src)
    img_content = base64.b64encode(dst_data)
    # リクエストBody作成
    req_body = json.dumps({
        'requests': [{
            'image': {
                'content': img_content.decode('utf-8')
            },
            'features': [{
                'type': 'DOCUMENT_TEXT_DETECTION'
            }]
        }]
    })
    # リクエスト発行
    res = requests.post(api_url, data=req_body)
    # リクエストから画像情報取得
    res_json = res.json()['responses']
    if 0 < len(res_json[0]):
        textobj = res_json[0]["textAnnotations"][0]
        message = "".join(textobj["description"].strip().split("\n"))
    return message

async def createFooterTelop(src):
    telop = np.zeros((telop_height, cap_width, 3), np.uint8)
    telop[:] = tuple((128,128,128))
    images = [src, telop]
    dst = np.concatenate(images, axis=0)
    return dst

async def main_image_process(src):
    #文字認識しやすいように加工
    gray_frame = pre_process(src.content)
    #テロップが出そうなところだけトリミング
    roi = gray_frame[435:600, :]
    #テキストを抽出
    text = await extractTelopText(roi)
    await asyncio.sleep(2)

    dst = await createFooterTelop(src.content)
    dst = await addJapaneseTelop(dst, text, 20, cap_height + telop_height - 30)
    dst = await addASCIITelop(dst, str(src.timestamp) + "[sec]", cap_width - 250, cap_height + telop_height - 10, color=(0,255,0))
    return MovieFrame(src.id, dst, src.timestamp)


async def addASCIITelop(src, sentence, px, py, color=(8,8,8), fsize=28):
    cv2.putText(src, sentence, 
                        (px, py), 
                        cv2.FONT_HERSHEY_SIMPLEX, 
                        1, 
                        color, 
                        2, 
                        cv2.LINE_AA)    
    return src


async def addJapaneseTelop(src, sentence, px, py, color=(8,8,8), fsize=28):
    rgbImg = cv2.cvtColor(src, cv2.COLOR_BGR2RGB)
    canvas = Image.fromarray(rgbImg).copy()
    draw = ImageDraw.Draw(canvas)
    font = ImageFont.truetype("./IPAfont00303/ipag.ttf", fsize)
    draw.text((px, py), sentence, fill=color, font=font)

    dst = cv2.cvtColor(np.array(canvas, dtype=np.uint8), cv2.COLOR_RGB2BGR)
    return dst

if __name__ == '__main__':

    cap = cv2.VideoCapture('one_minutes.mp4')
    cap_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    cap_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    telop_height = 50

    fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
    writer = cv2.VideoWriter('extract_telop_async.mp4',fourcc, fps, (cap_width, cap_height + telop_height))

    frames = []

    start = time.time()
    idx = 0
    #read frame
    try :
        while True:
            if not cap.isOpened():
                break
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            ret, frame = cap.read()
            if frame is None:
                break

            frames.append(MovieFrame(idx,frame, round(idx/fps, 4)) )
            idx += 1
    except cv2.error as e:
        print(e)

    cap.release()
    print("read movie file")

    #process
    r = []
    loop = asyncio.get_event_loop()
    try:
        r = loop.run_until_complete(
            asyncio.gather(*[main_image_process(f) for f in frames])
        )
    finally:
        loop.close()

    #sort
    sorted_out = sorted(r, key=lambda x: x.id)

    #write frame
    try :
        for item in sorted_out:
            writer.write(item.content)
    except cv2.error as e:
        print(e)
    writer.release()

    print("write movie file")
    print("Done!!! {}[sec]".format(round(time.time() - start,4)))

Result

tesseract-ocr


処理時間: 450sec ≒ 約7.5分

GoogleCloudVisionAPI

処理時間:1315sec ≒ 21分

外部APIを使いつつ非同期処理をかけているので処理時間が結構かかりましたが、OCRの精度としてはGoogleCloudVisionAPIの方が良いのがわかると思います。

Future

次回は画像修復(inpaint)を使ってオブジェクト消去したった(OpenCV:C++)を動画編集しようかなと思っているのですが、今回使ったコードをもとにC++にしようかなと思っています。
となると、
- curlの使い方
- thread(そもそもあるのかasyncioと同等のライブラリがあるのか?)
を処理するにはどうすればいいか考えないといかんとです。
boostを使うしかないか。:tired_face:

Reference

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3