2
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PythonAdvent Calendar 2022

Day 11

PythonとSeleniumとWebDriverとChrome拡張機能でDownload完了判定と待機(追加)

Last updated at Posted at 2022-12-10

はじめに

前回の続編です。

前回の最終的なコードですと、
ダウンロードファイルの拡張子のリネームが完了しない可能性がありますので、

Chrome108?で修正されたのかもしれません。

ダウンロードに関する情報の全てのfilenameと、
ダウンロードファイルを比較してリネームが完了しているかを確認します。

併せて、
Chrome拡張機能側で判定処理するのではなく、
Python側に持ってきて判定するようにします。

PythonとSeleniumとWebDriverでDownload完了判定とWebDriverWait().until()による待機

前回の最終的なコードを変更して、
Python側で判定処理を行なうようにします。

該当サイトの<body>のカスタムデータ属性に
content-script.jsから書き込む際、
情報をJSON文字列に変換して書き込むことで、
JavaScriptからPythonへの情報の受け渡しに対応しています。

background.js
// Startup marker: printed to the console each time this background script is evaluated.
console.log('Hello, I am background scripts');

/**
 * Collapse a list of download items into one overall state string.
 *
 * Precedence (highest first):
 *   'none'        - `items` is missing or has no length
 *   'complete'    - every non-empty entry has state 'complete'
 *   'interrupted' - at least one entry has state 'interrupted'
 *   'in_progress' - at least one entry has state 'in_progress'
 *   'unknown'     - none of the above matched
 *
 * @param {Array<{state: string}>|undefined|null} items - Download items to inspect.
 * @returns {string} One of the state names listed above.
 */
const getWindowDownloadsState = (items) => {
  if (!items?.length) {
    return 'none';
  }

  // Drop empty entries first so reading `.state` below cannot fail on them.
  const states = items.filter(Boolean).map((entry) => entry.state);

  if (states.every((value) => value === 'complete')) {
    return 'complete';
  }
  if (states.includes('interrupted')) {
    return 'interrupted';
  }
  if (states.includes('in_progress')) {
    return 'in_progress';
  }
  return 'unknown';
};

/**
 * Send `{ [key]: message }` to every open http/https tab.
 *
 * chrome.tabs.sendMessage() returns a promise that rejects when the target
 * tab has no listener for the message. Each rejection is caught per tab so
 * that an unreachable tab does not become an unhandled promise rejection.
 *
 * @param {string} key - Property name under which the payload is sent.
 * @param {*} message - Payload to deliver.
 * @returns {Promise<void>} Settles after the tab query finishes and the
 *   messages have been dispatched; delivery itself is not awaited.
 */
const sendMessage = async (key, message) => {
  const tabs = await chrome.tabs.query({ url: ['http://*/*', 'https://*/*'] });
  tabs.forEach((tab) => {
    chrome.tabs.sendMessage(tab.id, { [key]: message }).catch((error) => {
      // Best-effort broadcast: note the failure and carry on with the other tabs.
      console.debug(`sendMessage: tab ${tab.id} did not receive '${key}': ${error?.message}`);
    });
  });
};

// Runs whenever a download is created: re-queries all download items, derives
// the overall state with getWindowDownloadsState(), then sends both the state
// and the raw item list to the tabs (sendMessage) and stores both in
// chrome.storage.session. The `item` argument itself is not used.
chrome.downloads.onCreated.addListener(async (item) => {
  const items = await chrome.downloads.search({});
  const state = getWindowDownloadsState(items);
  console.log(`onCreated: ${state}`);
  // console.log(items);
  sendMessage('dataWindowDownloadsState', state);
+ sendMessage('dataWindowDownloadsItems', items);
  await chrome.storage.session.set({ dataWindowDownloadsState: state });
+ await chrome.storage.session.set({ dataWindowDownloadsItems: items });
});
// Runs whenever a download changes: re-queries all download items instead of
// using `delta`, derives the overall state, then sends the state and the item
// list to the tabs and stores both in chrome.storage.session.
chrome.downloads.onChanged.addListener(async (delta) => {
  const items = await chrome.downloads.search({});
  const state = getWindowDownloadsState(items);
  console.log(`onChanged: ${state}`);
  // console.log(items);
  sendMessage('dataWindowDownloadsState', state);
+ sendMessage('dataWindowDownloadsItems', items);
  await chrome.storage.session.set({ dataWindowDownloadsState: state });
+ await chrome.storage.session.set({ dataWindowDownloadsItems: items });
});

// Runs on tab updates, but only acts when the reported status is 'complete'.
// It then recomputes the overall download state and sends the state and the
// item list to the tabs again, also storing both in chrome.storage.session.
chrome.tabs.onUpdated.addListener(async (tabId, changeInfo, tab) => {
  if (changeInfo.status === 'complete') {
    const items = await chrome.downloads.search({});
    const state = getWindowDownloadsState(items);
    console.log(`onUpdated: ${state}`);
    // console.log(items);
    sendMessage('dataWindowDownloadsState', state);
+   sendMessage('dataWindowDownloadsItems', items);
    await chrome.storage.session.set({ dataWindowDownloadsState: state });
+   await chrome.storage.session.set({ dataWindowDownloadsItems: items });
  }
});
content-script.js
// Startup marker: printed to the page console when this content script is evaluated.
console.log('Hello, I am content scripts');

// Logs new values of dataWindowDownloadsItems / dataWindowDownloadsState when
// they change in chrome.storage. The setAttribute calls are commented out, so
// this listener only logs; the <body> attributes are not written from here.
chrome.storage.onChanged.addListener((objects, strings) => {
+ if (objects?.dataWindowDownloadsItems?.newValue) {
+   console.log('storage.onChanged:');
+   console.log(objects.dataWindowDownloadsItems.newValue);
+   // document.body.setAttribute('data-window-downloads-items', JSON.stringify(objects.dataWindowDownloadsItems.newValue));
+ }
  if (objects?.dataWindowDownloadsState?.newValue) {
    console.log(`storage.onChanged: ${objects.dataWindowDownloadsState.newValue}`);
    // document.body.setAttribute('data-window-downloads-state', objects.dataWindowDownloadsState.newValue);
  }
});

// Handles incoming messages by copying their payload onto <body>:
//   - dataWindowDownloadsItems is serialised with JSON.stringify into
//     data-window-downloads-items
//   - dataWindowDownloadsState is written as-is into data-window-downloads-state
// sendResponse is never called, so no reply is sent back.
chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
+ if (request?.dataWindowDownloadsItems) {
+   console.log('runtime.onMessage:');
+   console.log(request.dataWindowDownloadsItems);
+   document.body.setAttribute('data-window-downloads-items', JSON.stringify(request.dataWindowDownloadsItems));
+ }
  if (request?.dataWindowDownloadsState) {
    console.log(`runtime.onMessage: ${request.dataWindowDownloadsState}`);
    document.body.setAttribute('data-window-downloads-state', request.dataWindowDownloadsState);
  }
});
manifest.json
{
  "manifest_version": 3,
  "name": "DownloadsState",
  "version": "1.0",
  "background": {
    "service_worker": "background.js"
  },
  "content_scripts": [
    {
      "matches": ["<all_urls>"],
      "js": ["content-script.js"]
    }
  ],
  "permissions": [
    "downloads",
    "storage",
    "tabs"
  ]
}
sample.py
import datetime
+ import json
import os
import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait


def expected_conditions_download_ready_state_complete():
    """Build a wait condition that passes once the page reports downloads as complete.

    The returned callable takes a driver, reads the
    ``data-window-downloads-state`` attribute of ``<body>``, prints a
    timestamped progress line for the value it found, and returns ``True``
    only when that value is ``"complete"``.
    """
    progress_lines = (
        ("in_progress", "download in progress..."),
        ("interrupted", "download interrupted..."),
        ("complete", "download complete!!"),
        ("unknown", "download unknown..."),
        ("none", "download none..."),
    )

    def _predicate(driver):
        # Timestamp text with its last four characters trimmed off.
        stamp = str(datetime.datetime.now())[:-4]
        current = driver.execute_script("return document.body.getAttribute('data-window-downloads-state')")
        for label, text in progress_lines:
            if current == label:
                print(f"{stamp}: {text}")
        if current is None:
            # The attribute is absent from <body>.
            print(f"{stamp}: error...")
        return current == "complete"

    return _predicate


# Factory for a wait condition used with WebDriverWait(...).until(...).
# The inner predicate reads the JSON text stored in the <body> attribute
# data-window-downloads-items, collects the "state" of every non-empty item and
# reduces them to one overall state ("complete" overrides "interrupted", which
# overrides "in_progress"; otherwise "unknown"; "none" when there is no data).
# It prints a timestamped progress line and returns True only for "complete".
# NOTE(review): the checks are substring tests ("complete" in x), not equality.
+def expected_conditions_download_ready_items_complete():
+    def _predicate(driver):
+        jikan = f'{f"{datetime.datetime.now()}"[:-4]}'
+        state = None
+        data = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
+        list = json.loads(data) if data is not None else None
+        if list:
+            l1 = list
+            l2 = [x for x in l1 if x]
+            l3 = [x["state"] for x in l2]
+            # state1_in_progress = [x for x in l3 if "in_progress" in x]
+            # state1_interrupted = [x for x in l3 if "interrupted" in x]
+            # state1_complete = [x for x in l3 if "complete" in x]
+            # state2_in_progress = ["in_progress" in x for x in l3]
+            # state2_interrupted = ["interrupted" in x for x in l3]
+            # state2_complete = ["complete" in x for x in l3]
+            state_in_progress = any("in_progress" in x for x in l3)
+            state_interrupted = any("interrupted" in x for x in l3)
+            state_complete = all("complete" in x for x in l3)
+            state = "in_progress" if state_in_progress else state
+            state = "interrupted" if state_interrupted else state
+            state = "complete" if state_complete else state
+            state = state or "unknown"
+            # print(l1)
+            # print(l2)
+            # print(l3)
+            # print(state1_in_progress)
+            # print(state1_interrupted)
+            # print(state1_complete)
+            # print(state2_in_progress)
+            # print(state2_interrupted)
+            # print(state2_complete)
+            # print(state_in_progress)
+            # print(state_interrupted)
+            # print(state_complete)
+            # print(state)
+        else:
+            state = "none"
+        _ = state == "in_progress" and print(f"{jikan}: download in progress...")
+        _ = state == "interrupted" and print(f"{jikan}: download interrupted...")
+        _ = state == "complete" and print(f"{jikan}: download complete!!")
+        _ = state == "unknown" and print(f"{jikan}: download unknown...")
+        _ = state == "none" and print(f"{jikan}: download none...")
+        _ = state is None and print(f"{jikan}: error...")
+        return bool(state == "complete")
+
+    return _predicate


def main():
    """Run the interactive download demo.

    Waits for Enter, starts Chrome with the unpacked extension loaded from the
    ``Desktop`` folder under ``USERPROFILE``, opens the ChromeDriver downloads
    page, follows the first storage link in the same tab and clicks the
    linux64 / mac64 / win32 zip links. It then waits (up to 10 seconds each)
    on the custom download conditions. Any exception is printed rather than
    raised, the driver is always quit, and the function waits for Enter again
    before returning.
    """

    print("PRESS ENTER KEY TO BOOT")
    input()

    driver = None

    try:
        # extension_location = os.getcwd()
        extension_location = f"{os.getenv('USERPROFILE')}\\Desktop\\window.downloads.state"
        options = webdriver.ChromeOptions()
        options.add_argument(f"load-extension={extension_location}")
        driver = None
        driver = webdriver.Chrome(options=options)
        driver.get("https://chromedriver.chromium.org/downloads")
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver.storage.googleapis.com')]")))
        # Force the link to open in the current tab before clicking it.
        driver.execute_script("""document.querySelector("a[href*='chromedriver.storage.googleapis.com']").setAttribute("target","_self")""")
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver.storage.googleapis.com')]").click()
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_linux64.zip')]")))
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_mac64.zip')]")))
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_win32.zip')]")))
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_linux64.zip')]").click()
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_mac64.zip')]").click()
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_win32.zip')]").click()
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_state_complete())
+       WebDriverWait(driver, 10).until(expected_conditions_download_ready_items_complete())
        time.sleep(1)
    except Exception as e:
        print(e)
    finally:
        # Quit the browser if it was started; `driver` is rebound to the result of quit().
        driver = driver.quit() if driver is not None else None

    print()
    print("PRESS ENTER KEY TO EXIT")
    input()


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()

PythonとSeleniumとWebDriverでリネーム完了判定とWebDriverWait().until()による待機

ダウンロードに関する情報の全てのfilenameが、
ダウンロードのフォルダに全てあれば完了、
無ければ未完了とする処理を追加します。

sample.py
import datetime
+ import glob
import json
import os
- import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait


def expected_conditions_download_ready_state_complete():
    """Build a wait condition that passes once the page reports downloads as complete.

    The returned callable takes a driver, reads the
    ``data-window-downloads-state`` attribute of ``<body>``, prints a
    timestamped progress line for the value it found, and returns ``True``
    only when that value is ``"complete"``.
    """
    progress_lines = (
        ("in_progress", "download in progress..."),
        ("interrupted", "download interrupted..."),
        ("complete", "download complete!!"),
        ("unknown", "download unknown..."),
        ("none", "download none..."),
    )

    def _predicate(driver):
        # Timestamp text with its last four characters trimmed off.
        stamp = str(datetime.datetime.now())[:-4]
        current = driver.execute_script("return document.body.getAttribute('data-window-downloads-state')")
        for label, text in progress_lines:
            if current == label:
                print(f"{stamp}: {text}")
        if current is None:
            # The attribute is absent from <body>.
            print(f"{stamp}: error...")
        return current == "complete"

    return _predicate


def expected_conditions_download_ready_items_complete():
    """Build a wait condition that passes once every download item is complete.

    The returned predicate takes a driver, reads the JSON text stored in the
    ``data-window-downloads-items`` attribute of ``<body>``, derives one
    overall state from the ``state`` field of each item, prints a timestamped
    progress line, and returns ``True`` only when the overall state is
    ``"complete"``.

    Overall state, highest precedence first:

    * ``"none"`` - the attribute is missing or the decoded list is empty
    * ``"complete"`` - every non-empty item has state ``"complete"``
    * ``"interrupted"`` - at least one item has state ``"interrupted"``
    * ``"in_progress"`` - at least one item has state ``"in_progress"``
    * ``"unknown"`` - anything else, including items without a ``state``

    Raises:
        json.JSONDecodeError: if the attribute holds text that is not JSON.
    """
    messages = {
        "in_progress": "download in progress...",
        "interrupted": "download interrupted...",
        "complete": "download complete!!",
        "unknown": "download unknown...",
        "none": "download none...",
    }

    def _predicate(driver):
        jikan = f"{datetime.datetime.now()}"[:-4]
        data = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
        items = json.loads(data) if data is not None else None
        if not items:
            state = "none"
        else:
            # Skip empty entries; .get() maps an item without "state" to None
            # (reported as "unknown") instead of raising KeyError.
            states = [item.get("state") for item in items if item]
            # Compare whole state strings. A substring test would treat
            # look-alike values such as "incomplete" as complete.
            if all(value == "complete" for value in states):
                state = "complete"
            elif "interrupted" in states:
                state = "interrupted"
            elif "in_progress" in states:
                state = "in_progress"
            else:
                state = "unknown"
        print(f"{jikan}: {messages[state]}")
        return state == "complete"

    return _predicate


# Factory for a wait condition used with WebDriverWait(...).until(...).
# The inner predicate reads the JSON text stored in the <body> attribute
# data-window-downloads-items, collects the "filename" of every non-empty item
# and checks that this set is a subset of the paths returned by globbing the
# Downloads folder under USERPROFILE. It prints a timestamped
# "file rename in progress..." or "file rename complete!!" line and returns the
# result as a bool (False when there is no item data).
# NOTE(review): this is an exact string comparison, so it presumably relies on
# the reported filenames being full paths in the same form glob returns - confirm.
+def expected_conditions_download_file_rename_complete():
+    def _predicate(driver):
+        basyo = f"{os.getenv('USERPROFILE')}\\Downloads"
+        jikan = f'{f"{datetime.datetime.now()}"[:-4]}'
+        state = None
+        data = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
+        list = json.loads(data) if data is not None else None
+        if list:
+            l1 = list
+            l2 = [x for x in l1 if x]
+            l3 = [x["filename"] for x in l2]
+            dl = glob.glob(f"{basyo}\\*")
+            state = set(l3) <= set(dl)
+            # print(l1)
+            # print(l2)
+            # print(l3)
+            # print(dl)
+            # print(state)
+        else:
+            state = False
+        _ = state or print(f"{jikan}: file rename in progress...")
+        _ = state and print(f"{jikan}: file rename complete!!")
+        return bool(state)
+
+    return _predicate


def main():
    """Run the interactive download demo.

    Waits for Enter, starts Chrome with the unpacked extension loaded from the
    ``Desktop`` folder under ``USERPROFILE``, opens the ChromeDriver downloads
    page, follows the first storage link in the same tab and clicks the
    linux64 / mac64 / win32 zip links. It then waits (up to 10 seconds each)
    on the custom download conditions. Any exception is printed rather than
    raised, the driver is always quit, and the function waits for Enter again
    before returning.
    """

    print("PRESS ENTER KEY TO BOOT")
    input()

    driver = None

    try:
        # extension_location = os.getcwd()
        extension_location = f"{os.getenv('USERPROFILE')}\\Desktop\\window.downloads.state"
        options = webdriver.ChromeOptions()
        options.add_argument(f"load-extension={extension_location}")
        driver = None
        driver = webdriver.Chrome(options=options)
        driver.get("https://chromedriver.chromium.org/downloads")
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver.storage.googleapis.com')]")))
        # Force the link to open in the current tab before clicking it.
        driver.execute_script("""document.querySelector("a[href*='chromedriver.storage.googleapis.com']").setAttribute("target","_self")""")
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver.storage.googleapis.com')]").click()
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_linux64.zip')]")))
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_mac64.zip')]")))
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, "//a[contains(@href,'chromedriver_win32.zip')]")))
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_linux64.zip')]").click()
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_mac64.zip')]").click()
        driver.find_element(by=By.XPATH, value="//a[contains(@href,'chromedriver_win32.zip')]").click()
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_state_complete())
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_items_complete())
+       WebDriverWait(driver, 10).until(expected_conditions_download_file_rename_complete())
-       time.sleep(1)
    except Exception as e:
        print(e)
    finally:
        # Quit the browser if it was started; `driver` is rebound to the result of quit().
        driver = driver.quit() if driver is not None else None

    print()
    print("PRESS ENTER KEY TO EXIT")
    input()


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()

おわりに

最終的なコードはこちらを参照くださいませ。

あとがき

ノンプログラマーの素人が記述をしたコードです。
狭い利用範囲と少ない利用頻度での確認ですので、
記載内容に間違いや勘違いがあるかもしれません。
上記内容を参照の際は自己責任でお願い致します。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?