LoginSignup
0
0

【AWS Lambda関数作成】Lambda関数の開発とデバッグ

Last updated at Posted at 2023-12-15

はじめに

【AWS Lambda関数作成】Lambda関数の開発に関しての備忘録です。
初心者です😅
間違えてる部分が多々あると思います。
もし見つけた場合、ツッコミいただけると助かります🙇

🦁結論🦁

作成手順

  1. 関数の作成
  2. エディタにて関数作成(AWS Lambdaに直接でも可能)
  3. Lambdaにアップロード
  4. デバック(テスト)の実施

押さえておくべき点

  • lambdaはサーバーレスコンピューティング
  • サーバーレスコンピューティングはユーザーインターフェイスを持ってないためヘッドレスブラウザが必要になる。
  • loggerを使ってログを残すことを行う
  • headlessのWindowサイズは1200x1000
  • サーバーのデータのやり取りは基本は「json形式」
  • 関数の最後にレスポンスの定義をする。

注意点

  • ほとんどのコードを関数の中に書き込む必要がある(変数定義、空の辞書定義、loggerの初期化、URLなど)
  • headlessのWindowサイズは1200x1000ぐらいにしておく必要がある。
  • ローカルファイル(エディタなど)で作成したファイルをAWS Lambdaへアップロードする際にはzipファイルになる。
  • AWS Lambdaへアップロードする際にはzipファイルは10 MB より大きい場合は、Amazon S3 を使用したアップロードが必要。

前提🙊

  1. AWS Lambdaの作成手順を理解してる。
  2. AWSのアカウント作成できてる。
  3. AWSの環境構築ができてる。
  4. Seleniumに対しての理解がある。
  5. Seleniumのオプション(headlessなど)の理解がある。

Seleniumなどの知識はやりながらでも大丈夫だと思います。
事前に調べておけるとコードの理解に深みが出ると思います。
特に「headless」に関しての知識は先に勉強しておくことをお勧めします。

※スクレイピングするサイトは注意して選定する必要がある。
※スクレイピングするための要素の選定は各自で修正してください。ここでは特定のものを先手してありますがサイトごとに違う
ここでは
検索バー:”keyword”
検索ボタン:”.__button.c-button"
商品名:'h2’タグ
価格:'dd’タグ
これらでスクレイピング。


手順詳細

※前回の「selenium + AWS lambda開発環境の構築」を実施されていれば「1. 関数の作成」は必要ありません。

1. 関数の作成

  1. ローカルにて作成したファイルをzip化する。
  2. AWSの画面左上の検索バーに「 Lambda」と入力して検索
  3. 関数の管理をクリック。
  4. 左上のハンバーガーメニューをクリック。
  5. レイヤーをクリック。
  6. 対象のレイヤーがあることを確認(作成方法がわからない場合は「selenium + AWS lambda開発環境の構築」にて確認)
  7. 左側のメニューから「関数」をクリック。
  8. 作成した関数があるかを確認(作成方法がわからない場合は「selenium + AWS lambda開発環境の構築」にて確認)

2. エディタにて関数作成(AWS Lambdaに直接でも可能)

  1. 下記のコードをご確認ください。

3. Lambdaにアップロード

  1. Lambdaのメイン画面を出す。
  2. 作成した関数をクリック(今回はscraper)
  3. コードをクリック(デフォルトの場合は必要なし)
  4. アップロード元をクリックして「zipファイル」を選択。
  5. アップロードするファイルを選択してアップロードする。
  6. アップロードしたコードが反映してるのか確認。
  7. 「deploy」を押す

4. デバック(テスト)の実施

  1. 「テスト」をクリック。
  2. 下記のコードを「イベントjson」に入力する
    ※jan_codeは任意で入力
{
  "local_jan_code": "4900xxxxx"
}
  1. cloudshellにてログを確認する(ターミナルみるイメージ)
    ※テストの詳細にも表示される。
    レスポンスで200が返ってきてるのかを確認する。

コード全体

AWS_Lambda.py
# scraper.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import logging
import json


def lambda_handler(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    logger.info(f"処理スタート")

    url = "https://www.petpochitto.com/"

    # webdriver(Chrome)のオプションを使うことを宣言
    options = webdriver.ChromeOptions()

    # webdriverのどんなオプション使うのかを選定
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    options.add_argument("--hide-scrollbars")
    options.add_argument("--single-process")
    options.add_argument("--ignore-certificate-errors")
    options.add_argument("--window-size=1200x1000")  # サイズを大きくすることでsend_keysでの
    options.add_argument("--no-sandbox")
    options.add_argument("--homedir=/tmp")

    # headlessのChromeバージョンを選定
    options.binary_location = "/opt/headless/headless-chromium"

    logger.info(f"ブラウザのパスを指定")

    browser = webdriver.Chrome(
        # Chromeを操作できるようにするためのdriverをAWSにあるパスを選定
        executable_path="/opt/headless/chromedriver",
        options=options
    )

    logger.info(f"ブラウザを開く")

    # URLを開く
    browser.get(url)


    # リクエストのBODYを取得
    request_body = event.get('local_jan_code')

    logger.info("リクエストボディが存在します: %s", request_body)
    logger.info("ページロードを待機しています...")

    # ページが完全にロードされるのを待つ
    WebDriverWait(browser, 3).until(
        lambda browser: browser.execute_script('return document.readyState') == 'complete'
    )

    logger.info("ページが完全にロードされました。")

    if request_body:
        # jsonファイルの読み込みを実施
        # 読み込みができなかったらログを残す。
        try:
            request_data = json.loads(request_body)  # jsonを解析
            request_data_str = str(request_data)
            logger.info(f"解析完了: {request_data_str}")
            logger.info("Opened URL: " + url)

        except json.JSONDecodeError as e:
            logger.error(f"解析エラー:{e}")

        # "keyword"→最初の要素→name属性の値
        # 検索バー(IDがkeyword)が見つからなかったらログを残す
        try:
            elements = browser.find_elements_by_id("keyword")
            if elements:
                element = elements[0] 
                name_attribute = element.get_attribute('name')
                logger.info(f"ID:keywordをサーチ済み")
                logger.info(f"Element tag name: {element.tag_name}")
                logger.info(f"Element name attribute: {name_attribute}")
                logger.info(f"{request_data_str}")

            else:
                logger.info(f"ID:keywordが見つからない")

        except TimeoutException:
            logger.error(f"設定した時間で見つけられない。")

        # テキストボックスに検索ワードを入力
        # キーワードが入力できなかったらログを残す。
        try:
            if element.is_displayed():
                element.send_keys(request_data_str)
                logger.info("キーワード入力に成功")
            else:
                logger.info("要素が表示されていません")

        except Exception as e:
            logger.error(f"キーワード入力失敗: {e}")

        # 検索ボタンを探す
        # クリックができるようになるまで最大6秒、動的待機
        search_buttan = browser.find_element_by_css_selector(".__button.c-button")

        logger.info("検索ボタンを探すことを開始")

        # 検索ボタンをクリック
        search_buttan.click()

        logger.info("検索ボタンをクリック")

        # ddタグを見つける
        # 見つけるまで最大6秒、動的待機
        all_ddtags = WebDriverWait(browser, 3).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, 'dd'))
        )
        logger.info("ddタグリスト作成")

        # 現在のページにある価格を全てリスト化する。
        all_ddtag_list = [dd_tag.text for dd_tag in all_ddtags]
        lambda_price = all_ddtag_list[0]
        logger.info(f"{lambda_price}")

        # h2タグを見つける
        # 見つけるまで最大6秒、動的待機
        all_titles = WebDriverWait(browser, 6).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, 'h2'))
        )
        logger.info("h2タグリスト作成")
        

        # 商品カタログ名
        all_title_list = [title.text for title in all_titles]
        lambda_product_name = all_title_list[0]
        logger.info(f"{lambda_product_name}")

        # jsonオブジェクトの作成
        response_data = {
            "lambda_product_name": lambda_product_name,
            "lambda_price": lambda_price
        }


    else:
        logger.error("リクエストボディがありません。")

    # WebDriverを閉じる
    browser.quit()

    # jsonデータをレスポンスとして返す
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': response_data
    }


コードの押さえどころ

chromedriverのパスの指定

    browser = webdriver.Chrome(
        # Chromeを操作できるようにするためのdriverをAWSにあるパスを選定
        executable_path="/opt/headless/chromedriver",
        options=options
    )

AWS上でのパスの指定は少し特殊。
executable_path="/opt/headless/chromedriver"
** Lambdaレイヤー**に存在してるものを選定してる。


リクエストボディを取得

request_body = event.get('local_jan_code')

jsonにて送られてきたデータの'local_jan_code’の値を取得してる


ページが完全にロードされるのを確認する

    WebDriverWait(browser, 3).until(
        lambda browser: browser.execute_script('return document.readyState') == 'complete'
    )

    logger.info("ページが完全にロードされました。")

サイトの画面が表示されてるのか(DOMなどの解析状況)を3秒待つというコード。
"loading”読み込み中
"interactive”読み込み途中
"complete”読み込み完了
この3つのよって状態を確認することができる。


json型式でレスポンスをする

    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': response_data
    }

これはレスポンスをする相手に
statusCode': 200
bodyにJSON型式のデータを送る
これらをレスポンスする



AWS Lambda関数作成 セクション リンク

  1. AWSアカウントの作成
  2. AWSの開発環境の構築
  3. Lambda関数の開発とデバッグ
  4. IAMロールの作成と設定(途中まで作成したら、Step Functionsを設定する必要あり)
  5. AWS Step Functionsの設定とデバッグ(完了後、IAMの権限にStep Functionsを追加)
  6. API Gateway(発火API)の設定とデバッグ
  7. API Gateway(状態維持API)の設定とデバッグ
  8. Python(local環境)の開発とデバック
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0