はじめに
前回の続編です。
前回の最終的なコードですと、
ダウンロードファイルの拡張子のリネームが完了しない可能性があります
(Chrome108?で修正されたのかもしれません)。
そこで、
ダウンロードに関する情報の全てのfilenameと、
ダウンロードファイルを比較してリネームが完了しているかを確認します。
併せて、
Chrome拡張機能側で判定処理するのではなく、
Python側に持ってきて判定するようにします。
PythonとSeleniumとWebDriverでDownload完了判定とWebDriverWait().until()による待機
前回の最終的なコードを変更して、
Python側で判定処理を行なうようにします。
該当サイトの<body>のカスタムデータ属性に、
content-script.jsから書き込む際、
情報をJSONにして書き込むことで、
情報のJavaScript⇒Pythonの移動に対処しています。
console.log('Hello, I am background scripts');

/**
 * Collapse a list of download items into a single overall state string.
 *
 * Returns 'none' when `items` is missing or empty. Otherwise falsy entries
 * are dropped and the remaining `state` values are ranked:
 *   - 'complete'    when every remaining state is 'complete'
 *   - 'interrupted' when any state is 'interrupted'
 *   - 'in_progress' when any state is 'in_progress'
 *   - 'unknown'     when none of the above applies
 *
 * @param {Array<object>|undefined|null} items download items to summarise
 * @returns {string} one of 'none', 'complete', 'interrupted', 'in_progress', 'unknown'
 */
const getWindowDownloadsState = (items) => {
  // Guard clause: nothing to summarise.
  if (!items?.length) {
    return 'none';
  }

  const states = items.filter(Boolean).map(({ state }) => state);

  // Checked from highest to lowest priority, so the first match wins.
  if (states.every((value) => value === 'complete')) {
    return 'complete';
  }
  if (states.includes('interrupted')) {
    return 'interrupted';
  }
  if (states.includes('in_progress')) {
    return 'in_progress';
  }
  return 'unknown';
};
/**
 * Send `{ [key]: message }` to every open http/https tab.
 *
 * Delivery is best-effort: each tab is messaged independently and a failure
 * for one tab does not affect the others. The returned promise resolves once
 * every delivery attempt has settled; it does not reject because of a
 * per-tab delivery failure.
 *
 * @param {string} key property name under which the payload is sent
 * @param {*} message payload to deliver
 * @returns {Promise<void>}
 */
const sendMessage = async (key, message) => {
  const tabs = await chrome.tabs.query({ url: ['http://*/*', 'https://*/*'] });
  await Promise.all(
    tabs.map(async (tab) => {
      try {
        await chrome.tabs.sendMessage(tab.id, { [key]: message });
      } catch (error) {
        // A tab with no listening content script (for example one opened
        // before the extension was loaded) rejects the send. Handle it here
        // so it does not surface as an unhandled rejection.
        console.debug(`sendMessage: tab ${tab.id} skipped (${error?.message})`);
      }
    })
  );
};
/**
 * Query all downloads, derive the overall state, and publish both the state
 * and the raw item list: first as tab messages, then into
 * chrome.storage.session.
 *
 * @param {string} source label used as the console log prefix
 * @returns {Promise<void>}
 */
const publishDownloads = async (source) => {
  const items = await chrome.downloads.search({});
  const state = getWindowDownloadsState(items);
  console.log(`${source}: ${state}`);
  // Messages are sent without awaiting, then the same data is stored.
  sendMessage('dataWindowDownloadsState', state);
  sendMessage('dataWindowDownloadsItems', items);
  await chrome.storage.session.set({ dataWindowDownloadsState: state });
  await chrome.storage.session.set({ dataWindowDownloadsItems: items });
};

// Republish whenever a download is created or changes.
chrome.downloads.onCreated.addListener(() => publishDownloads('onCreated'));
chrome.downloads.onChanged.addListener(() => publishDownloads('onChanged'));

// Republish when a tab reports status 'complete', so that tab receives the
// current data as well.
chrome.tabs.onUpdated.addListener((tabId, changeInfo) => {
  if (changeInfo.status === 'complete') {
    publishDownloads('onUpdated');
  }
});
console.log('Hello, I am content scripts');

// Log new values of the two download keys whenever chrome.storage reports a
// change. This handler only logs; writing the values to <body> attributes is
// intentionally left disabled here.
// NOTE(review): the background script writes to chrome.storage.session, which
// may not be visible to content scripts by default - confirm that this
// listener actually fires.
chrome.storage.onChanged.addListener((changes, _areaName) => {
  const items = changes?.dataWindowDownloadsItems?.newValue;
  const state = changes?.dataWindowDownloadsState?.newValue;

  if (items) {
    console.log('storage.onChanged:');
    console.log(items);
  }
  if (state) {
    console.log(`storage.onChanged: ${state}`);
  }
});
// Mirror incoming download data onto <body> data attributes so that code
// running against the page can read it:
//   data-window-downloads-items  <- JSON text of the item list
//   data-window-downloads-state  <- the state string as received
chrome.runtime.onMessage.addListener((request) => {
  const {
    dataWindowDownloadsItems: items,
    dataWindowDownloadsState: state,
  } = request ?? {};

  if (items) {
    console.log('runtime.onMessage:');
    console.log(items);
    // Attributes only hold strings, so the list is serialised to JSON.
    document.body.setAttribute('data-window-downloads-items', JSON.stringify(items));
  }
  if (state) {
    console.log(`runtime.onMessage: ${state}`);
    document.body.setAttribute('data-window-downloads-state', state);
  }
});
{
"manifest_version": 3,
"name": "DownloadsState",
"version": "1.0",
"background": {
"service_worker": "background.js"
},
"content_scripts": [
{
"matches": ["<all_urls>"],
"js": ["content-script.js"]
}
],
"permissions": [
"downloads",
"storage",
"tabs"
]
}
import datetime
+ import json
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
def expected_conditions_download_ready_state_complete():
    """Return a wait condition that is true once the page reports 'complete'.

    The returned predicate reads the ``data-window-downloads-state``
    attribute of ``document.body`` through ``driver.execute_script``, prints
    a timestamped progress line for the recognised values (including a
    missing attribute), and returns ``True`` only when the value is
    ``"complete"``.
    """
    messages = {
        "in_progress": "download in progress...",
        "interrupted": "download interrupted...",
        "complete": "download complete!!",
        "unknown": "download unknown...",
        "none": "download none...",
        None: "error...",  # attribute not present on <body>
    }

    def _predicate(driver):
        # Drop the last four characters of the timestamp text (normally
        # leaving two fractional digits).
        timestamp = str(datetime.datetime.now())[:-4]
        state = driver.execute_script("return document.body.getAttribute('data-window-downloads-state')")
        if state in messages:
            print(f"{timestamp}: {messages[state]}")
        return state == "complete"

    return _predicate
def expected_conditions_download_ready_items_complete():
    """Return a wait condition that derives the download state in Python.

    The returned predicate reads the JSON text stored in the
    ``data-window-downloads-items`` attribute of ``document.body``, collects
    the ``state`` of every non-empty item and ranks them:

    * ``"none"``        - attribute missing, or the decoded list is empty
    * ``"complete"``    - every state contains ``"complete"``
    * ``"interrupted"`` - any state contains ``"interrupted"``
    * ``"in_progress"`` - any state contains ``"in_progress"``
    * ``"unknown"``     - anything else

    A timestamped line is printed for the result, and the predicate returns
    ``True`` only for ``"complete"``.
    """
    messages = {
        "in_progress": "download in progress...",
        "interrupted": "download interrupted...",
        "complete": "download complete!!",
        "unknown": "download unknown...",
        "none": "download none...",
    }

    def _predicate(driver):
        timestamp = str(datetime.datetime.now())[:-4]
        raw = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
        items = json.loads(raw) if raw is not None else None

        if not items:
            state = "none"
        else:
            states = [item["state"] for item in items if item]
            # Highest priority first, so the first matching rule wins.
            if all("complete" in value for value in states):
                state = "complete"
            elif any("interrupted" in value for value in states):
                state = "interrupted"
            elif any("in_progress" in value for value in states):
                state = "in_progress"
            else:
                state = "unknown"

        print(f"{timestamp}: {messages[state]}")
        return state == "complete"

    return _predicate
def main():
    """Run the demo: start Chrome with the extension, download, then wait.

    Waits for ENTER, launches Chrome with the unpacked extension loaded,
    opens the ChromeDriver downloads page, follows the storage link in the
    same tab, clicks the three platform zip links, and waits until both
    download-complete conditions hold. Any exception is printed, the browser
    is always closed, and the script waits for ENTER again before returning.
    """
    print("PRESS ENTER KEY TO BOOT")
    input()
    driver = None
    try:
        # Alternative location: extension_location = os.getcwd()
        extension_location = f"{os.getenv('USERPROFILE')}\\Desktop\\window.downloads.state"
        options = webdriver.ChromeOptions()
        options.add_argument(f"load-extension={extension_location}")
        driver = webdriver.Chrome(options=options)
        driver.get("https://chromedriver.chromium.org/downloads")

        storage_link = "//a[contains(@href,'chromedriver.storage.googleapis.com')]"
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, storage_link)))
        # Force the link to open in the current tab before clicking it.
        driver.execute_script("""document.querySelector("a[href*='chromedriver.storage.googleapis.com']").setAttribute("target","_self")""")
        driver.find_element(by=By.XPATH, value=storage_link).click()

        zip_links = [
            "//a[contains(@href,'chromedriver_linux64.zip')]",
            "//a[contains(@href,'chromedriver_mac64.zip')]",
            "//a[contains(@href,'chromedriver_win32.zip')]",
        ]
        # Wait for all three links first, then start all three downloads.
        for xpath in zip_links:
            WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, xpath)))
        for xpath in zip_links:
            driver.find_element(by=By.XPATH, value=xpath).click()

        WebDriverWait(driver, 10).until(expected_conditions_download_ready_state_complete())
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_items_complete())
        time.sleep(1)
    except Exception as e:
        print(e)
    finally:
        if driver is not None:
            driver.quit()
    print()
    print("PRESS ENTER KEY TO EXIT")
    input()


if __name__ == "__main__":
    main()
PythonとSeleniumとWebDriverでリネーム完了判定とWebDriverWait().until()による待機
ダウンロードに関する情報の全てのfilenameが、
ダウンロードのフォルダに全てあれば完了、
無ければ未完了とする処理を追加します。
import datetime
+ import glob
import json
import os
- import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
def expected_conditions_download_ready_state_complete():
    """Return a wait condition that is true once the page reports 'complete'.

    The returned predicate reads the ``data-window-downloads-state``
    attribute of ``document.body`` through ``driver.execute_script``, prints
    a timestamped progress line for the recognised values (including a
    missing attribute), and returns ``True`` only when the value is
    ``"complete"``.
    """
    messages = {
        "in_progress": "download in progress...",
        "interrupted": "download interrupted...",
        "complete": "download complete!!",
        "unknown": "download unknown...",
        "none": "download none...",
        None: "error...",  # attribute not present on <body>
    }

    def _predicate(driver):
        # Drop the last four characters of the timestamp text (normally
        # leaving two fractional digits).
        timestamp = str(datetime.datetime.now())[:-4]
        state = driver.execute_script("return document.body.getAttribute('data-window-downloads-state')")
        if state in messages:
            print(f"{timestamp}: {messages[state]}")
        return state == "complete"

    return _predicate
def expected_conditions_download_ready_items_complete():
    """Return a wait condition that derives the download state in Python.

    The returned predicate reads the JSON text stored in the
    ``data-window-downloads-items`` attribute of ``document.body``, collects
    the ``state`` of every non-empty item and ranks them:

    * ``"none"``        - attribute missing, or the decoded list is empty
    * ``"complete"``    - every state contains ``"complete"``
    * ``"interrupted"`` - any state contains ``"interrupted"``
    * ``"in_progress"`` - any state contains ``"in_progress"``
    * ``"unknown"``     - anything else

    A timestamped line is printed for the result, and the predicate returns
    ``True`` only for ``"complete"``.
    """
    messages = {
        "in_progress": "download in progress...",
        "interrupted": "download interrupted...",
        "complete": "download complete!!",
        "unknown": "download unknown...",
        "none": "download none...",
    }

    def _predicate(driver):
        timestamp = str(datetime.datetime.now())[:-4]
        raw = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
        items = json.loads(raw) if raw is not None else None

        if not items:
            state = "none"
        else:
            states = [item["state"] for item in items if item]
            # Highest priority first, so the first matching rule wins.
            if all("complete" in value for value in states):
                state = "complete"
            elif any("interrupted" in value for value in states):
                state = "interrupted"
            elif any("in_progress" in value for value in states):
                state = "in_progress"
            else:
                state = "unknown"

        print(f"{timestamp}: {messages[state]}")
        return state == "complete"

    return _predicate
def expected_conditions_download_file_rename_complete():
    """Return a wait condition that is true once every download file exists.

    The returned predicate reads the JSON text stored in the
    ``data-window-downloads-items`` attribute of ``document.body`` and takes
    the ``filename`` of every non-empty item. Each ``filename`` is expected
    to be the absolute local path reported by the ``chrome.downloads`` API.

    The condition holds when at least one filename is reported and every one
    of them exists on disk as a regular file. Each path is checked directly
    rather than against a listing of a fixed Downloads folder, so the check
    does not depend on the platform, the configured download directory, or
    an exact string match of the path.

    Returns:
        A callable taking the driver and returning ``bool``. It prints a
        timestamped "in progress" or "complete" line on every call.
    """

    def _predicate(driver):
        timestamp = str(datetime.datetime.now())[:-4]
        raw = driver.execute_script("return document.body.getAttribute('data-window-downloads-items')")
        items = json.loads(raw) if raw is not None else None
        filenames = [item["filename"] for item in items if item] if items else []

        # An empty filename is never an existing file, so an item whose path
        # is still blank keeps the wait going.
        done = bool(filenames) and all(os.path.isfile(name) for name in filenames)

        if done:
            print(f"{timestamp}: file rename complete!!")
        else:
            print(f"{timestamp}: file rename in progress...")
        return done

    return _predicate
def main():
    """Run the demo: start Chrome with the extension, download, then wait.

    Waits for ENTER, launches Chrome with the unpacked extension loaded,
    opens the ChromeDriver downloads page, follows the storage link in the
    same tab, clicks the three platform zip links, and waits until the
    download-state, download-items and file-rename conditions all hold. Any
    exception is printed, the browser is always closed, and the script waits
    for ENTER again before returning.
    """
    print("PRESS ENTER KEY TO BOOT")
    input()
    driver = None
    try:
        # Alternative location: extension_location = os.getcwd()
        extension_location = f"{os.getenv('USERPROFILE')}\\Desktop\\window.downloads.state"
        options = webdriver.ChromeOptions()
        options.add_argument(f"load-extension={extension_location}")
        driver = webdriver.Chrome(options=options)
        driver.get("https://chromedriver.chromium.org/downloads")

        storage_link = "//a[contains(@href,'chromedriver.storage.googleapis.com')]"
        WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, storage_link)))
        # Force the link to open in the current tab before clicking it.
        driver.execute_script("""document.querySelector("a[href*='chromedriver.storage.googleapis.com']").setAttribute("target","_self")""")
        driver.find_element(by=By.XPATH, value=storage_link).click()

        zip_links = [
            "//a[contains(@href,'chromedriver_linux64.zip')]",
            "//a[contains(@href,'chromedriver_mac64.zip')]",
            "//a[contains(@href,'chromedriver_win32.zip')]",
        ]
        # Wait for all three links first, then start all three downloads.
        for xpath in zip_links:
            WebDriverWait(driver, 10).until(expected_conditions.element_to_be_clickable((By.XPATH, xpath)))
        for xpath in zip_links:
            driver.find_element(by=By.XPATH, value=xpath).click()

        # The three conditions are waited on one after another, in this order.
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_state_complete())
        WebDriverWait(driver, 10).until(expected_conditions_download_ready_items_complete())
        WebDriverWait(driver, 10).until(expected_conditions_download_file_rename_complete())
    except Exception as e:
        print(e)
    finally:
        if driver is not None:
            driver.quit()
    print()
    print("PRESS ENTER KEY TO EXIT")
    input()


if __name__ == "__main__":
    main()
おわりに
最終的なコードはこちらを参照くださいませ。
あとがき
ノンプログラマーの素人が記述をしたコードです。
狭い利用範囲と少ない利用頻度での確認ですので、
記載内容に間違いや勘違いがあるかもしれません。
上記内容を参照の際は自己責任でお願い致します。