よく使う
エラーハンドリング
traceback
import traceback
# Returns the traceback of the exception currently being handled as a string;
# meant to be called from inside an `except` block (for example to log it).
traceback.format_exc()
時間処理
モジュール
import datetime
メイン
# Current local time plus pre-formatted offsets from it.
dt = datetime.datetime.now()
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")
dt_minutes = (dt + datetime.timedelta(minutes=-5)).strftime("%Y-%m-%d %H:%M:%S")
dt_hours = (dt + datetime.timedelta(hours=-1)).strftime("%Y-%m-%d %H:%M:%S")
dt_days = (dt + datetime.timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
# timedelta has no `years` argument (it raised TypeError), so step the
# calendar year back instead.
try:
    dt_years = dt.replace(year=dt.year - 1).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
    # Feb 29 does not exist in the previous year; fall back to Feb 28.
    dt_years = dt.replace(year=dt.year - 1, day=28).strftime("%Y-%m-%d %H:%M:%S")
正規表現
モジュール
import re
メイン
# Search `tage` with each candidate pattern, from most to least specific.
# Every pattern captures its digits in the named group "number".
petrn, petrn2, petrn3, petrn4, petrn5 = (
    re.compile(expression).search(tage)
    for expression in (
        r"(?P<number>[0-9]{1,})部屋",
        r"(?P<number>[0-9]{1,})数",
        r"(?P<number>[0-9]{1,})~",
        r"~(?P<number>[0-9]{1,})",
        r"(?P<number>[0-9]{1,})",
    )
)
# Digits of the first pattern that matched, or None when none matched.
re_target = next(
    (found.group("number") for found in (petrn, petrn2, petrn3, petrn4, petrn5) if found),
    None,
)
ファイル名操作
# Example file name: 20221008【S】【対象外】【ホットペッパー】
# Splitting on the brackets leaves empty strings between "】【", which is why
# the third bracketed value sits at index 5.
basement = os.path.basename(excel_file)
basement_no_extension, _extension = os.path.splitext(basement)
re_target = re.split("[【】]", basement_no_extension)
lst_time = re_target[0]
rank = re_target[1]
site = re_target[5]
検索
filter関数
fruits = ["apple", "lemon", "melon", "orange"]
# Keep only the names whose last letter is "n".
lst = [name for name in fruits if name.endswith("n")]
assert lst == ["lemon", "melon"], '配列の値が違う'
find関数
# "find": first element ending with "n", or None when there is none.
elm = next(filter(lambda name: name.endswith("n"), fruits), None)
assert elm == "lemon"
vlookup関数
モジュール
import bisect
メイン
def vlookup(df, key_list, id, name):
    """Return, for every key in ``key_list``, the matching value of column ``name``.

    Behaves like a spreadsheet VLOOKUP with exact matching.

    Args:
        df: Table to search; ``df[id]`` and ``df[name]`` must offer ``to_list()``.
        key_list: Keys to look up.
        id: Name of the column holding the keys to search in.
        name: Name of the column holding the values to return.

    Returns:
        A list aligned with ``key_list``. Keys that are not found yield ``""``.
        When a key occurs several times in ``df[id]``, the last row wins.
    """
    # A dict gives the same exact-match result as sorting the keys and
    # bisecting, without requiring the keys to be mutually comparable.
    lookup = dict(zip(df[id].to_list(), df[name].to_list()))
    return [lookup.get(key, "") for key in key_list]
基本操作
text
読み込み
def read_text(path):
    """Read a UTF-8 text file and return its content with all newlines removed.

    Args:
        path: Path of the file to read.

    Returns:
        The file content as one string without "\\n" characters.
    """
    # The context manager closes the file even if reading fails.
    with open(path, 'r', encoding='UTF-8') as f:
        data = f.read()
    return data.replace("\n", "")
json
読み込み
# Load the JSON file; the context manager closes the handle, which the bare
# open() call never did.
with open("******.json", "r", encoding="utf-8") as json_file:
    sub_sub_cates_json = json.load(json_file)
出力
jsonfile = '******.json'
# The context manager closes the file even if serialisation fails.
# NOTE(review): the first argument of json.dump is the object to serialise; as
# written it is the (masked) literal string — replace it with the data to save.
with open(jsonfile, "w", encoding="utf-8") as jsonfile_open:
    json.dump('******.json', jsonfile_open, ensure_ascii=False, indent=4)
EXCEL操作
モジュール
import os
import openpyxl
from openpyxl.utils import get_column_letter, column_index_from_string
出力
def adapt_filter(filename):
    """Set an auto-filter over the used range of 'Sheet1' and save in place.

    Args:
        filename: Path of the workbook to load and overwrite.
    """
    workbook = openpyxl.load_workbook(filename)
    sheet = workbook['Sheet1']
    # The range runs from the first cell to the last used column and row.
    first_cell = f"{get_column_letter(1)}{1}"
    last_cell = f"{get_column_letter(sheet.max_column)}{sheet.max_row}"
    sheet.auto_filter.ref = f"{first_cell}:{last_cell}"
    workbook.save(filename)
OS操作
ファイル探索
全フォルダ
path = "*****"
# Collect every directory found anywhere below `path`.
file_path = []
for current_dir, sub_dirs, _files_list in os.walk(path):
    file_path.extend(os.path.join(current_dir, sub_dir) for sub_dir in sub_dirs)
全ファイル
path = "******"
# Collect every file found anywhere below `path`.
file_path = []
for current_dir, _sub_dirs, files_list in os.walk(path):
    file_path.extend(os.path.join(current_dir, file_name) for file_name in files_list)
相対ファイル
import glob
path = "*******"
# Non-recursive listing of the files directly inside `path`.
# The snippet used an undefined `save_path` here although it had just set `path`.
excel_files = glob.glob(path + "/*.xlsx")
csv_files = glob.glob(path + "/*.csv")
時間条件追加
def get_file_path_time(file_path, int_time):
    """Return the files whose leading number is smaller than ``int_time``.

    The base name of each file is split on the brackets "【" and "】". As in
    the original snippet, a name must produce at least 11 parts to be
    accepted; the first part supplies the number to compare.

    Args:
        file_path: Iterable of file paths.
        int_time: Integer threshold, e.g. 20220630.

    Returns:
        The accepted paths in input order. Paths that cannot be parsed are
        printed and skipped.
    """
    digits = re.compile(r"\d{1,}")  # compiled once instead of once per file
    tage_file = []
    for file in file_path:
        try:
            text = re.split("[【】]", os.path.basename(file))
            if len(text) < 11:
                raise IndexError("unexpected file name layout")
            stamp = digits.search(text[0])
            if stamp and int(stamp.group()) < int_time:
                tage_file.append(file)
        except (TypeError, ValueError, IndexError):
            # Only parsing problems are reported; a bare `except` would also
            # swallow KeyboardInterrupt and SystemExit.
            print(file)
    return tage_file
# Keep only the files whose leading number is below 20220630.
file_path_time = get_file_path_time(file_path, 20220630)
テンプレ
path = "**********"
# Walk the tree once and keep the paths that contain ".xlsx", whose base name
# does not end with "】.xlsx" and whose base name starts with "2022".
file_path = [
    full_path
    for current_dir, _sub_dirs, files_list in os.walk(path)
    for full_path in (os.path.join(current_dir, file_name) for file_name in files_list)
    if ".xlsx" in full_path
    and not os.path.basename(full_path).endswith("】.xlsx")
    and os.path.basename(full_path).startswith("2022")
]
file_path
ファイルコピー
モジュール
import shutil
メイン
def copy_file(file_path, copy_to_files):
    """Copy each source file to the destination at the same index.

    Args:
        file_path: Sequence of source paths.
        copy_to_files: Sequence of destination paths, paired by position.

    A source that cannot be copied (missing file, missing destination entry,
    OS error) is printed and skipped so the remaining files are still copied.
    """
    for i, moto_file in enumerate(file_path):
        try:
            shutil.copyfile(moto_file, copy_to_files[i])
        except (OSError, IndexError):
            # shutil's own errors derive from OSError. A bare `except` would
            # also swallow KeyboardInterrupt.
            print(moto_file)
# Build one destination per source by rewriting part of the path.
# The list is derived from the same `file_path` that is copied so the two stay
# index-aligned, and it is named `copy_to_files` to match the call below.
copy_to_files = [file.replace("*****", "*****") for file in file_path]
copy_file(file_path, copy_to_files)
ファイル削除
モジュール
import os
メイン
def del_file(file_path):
    """Delete every path in ``file_path``, reporting the ones that are missing.

    Args:
        file_path: Iterable of file paths to remove.
    """
    for target in file_path:
        try:
            os.remove(target)
        except FileNotFoundError:
            print("ファイルなし")
# Remove every file collected in `file_path`.
del_file(file_path)
フォルダ作成
モジュール
import os, datetime
メイン
def make_dirs(save_path):
    """Create (if needed) a sub-directory of ``save_path`` named after today.

    Args:
        save_path: Parent directory.

    Returns:
        The path ``<save_path>/<YYYY-MM-DD>``.
    """
    dt_str = datetime.datetime.now().strftime("%Y-%m-%d")
    # The braces were missing, so the directory was literally named "dt_str".
    save_dir = f"{save_path}/{dt_str}"
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(save_dir, exist_ok=True)
    return save_dir
# make_dirs takes only the parent directory; the stray ")" and the
# unsupported `exist_ok` keyword made this line fail.
save_dir = make_dirs(save_path)
スクレイピング
セレニウム
基本の形(プロキシなし)
モジュール
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager
from retry import retry
メイン
@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    """Open ``first_url`` in the given driver, then pause briefly.

    Retried up to 3 times, 5 seconds apart, on WebDriverException.

    Args:
        first_url: URL to load.
        chrome: Driver object providing ``get``.
    """
    # Neither module is imported by this section of the file.
    import random
    import time

    chrome.get(first_url)
    # int() truncates, so the pause is a whole number of seconds (2 or 3).
    time.sleep(int(random.uniform(2, 4)))
def append_chromeoption(webdriver):
    """Create a ChromeOptions object carrying the template's command-line flags.

    Args:
        webdriver: Object exposing ``ChromeOptions`` (the selenium webdriver module).

    Returns:
        The configured options object.
    """
    chrome_options = webdriver.ChromeOptions()
    for argument in (
        "--disable-gpu",
        "--start-maximized",
        "--lang=ja",
        "--no-sandbox",
        '--ignore-certificate-errors',
        '--ignore-ssl-errors',
        'disable-dev-shm-usage',
        '--blink-settings=imagesEnabled=false',
    ):
        chrome_options.add_argument(argument)
    return chrome_options
# Target page (left blank in this template).
url = ""
chrome_options = append_chromeoption(webdriver)
# Start from Chrome's default capabilities and accept insecure certificates.
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
# NOTE(review): a positional driver path together with `chrome_options=` /
# `desired_capabilities=` looks like the Selenium 3 calling style — confirm
# against the installed Selenium version.
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
# Wait up to 10 seconds when locating elements.
chrome.implicitly_wait(10)
url_open(url, chrome)
基本の形(プロキシあり)
モジュール
import random
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager
メイン
@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    """Load ``first_url`` with the driver and wait a short moment afterwards.

    The decorator retries the call up to 3 times with a 5 second delay when
    a WebDriverException is raised.

    Args:
        first_url: URL to load.
        chrome: Driver object providing ``get``.
    """
    # `time` is not imported anywhere in this section.
    import time

    chrome.get(first_url)
    # Truncated to whole seconds: the pause is 2 or 3 seconds.
    time.sleep(int(random.uniform(2, 4)))
def append_chromeoption(webdriver, pluginfile):
    """Create ChromeOptions with the template flags and the proxy extension.

    Args:
        webdriver: Object exposing ``ChromeOptions`` (the selenium webdriver module).
        pluginfile: Path of the packed proxy extension (.zip) to load.

    Returns:
        The configured options object.
    """
    chrome_options = webdriver.ChromeOptions()
    for argument in (
        "--disable-gpu",
        "--start-maximized",
        "--lang=ja",
        "--no-sandbox",
        '--ignore-certificate-errors',
        '--ignore-ssl-errors',
        'disable-dev-shm-usage',
        '--blink-settings=imagesEnabled=false',
    ):
        chrome_options.add_argument(argument)
    # `pluginfile` was accepted but never used, so the "with proxy" variant
    # started Chrome without the proxy extension.
    chrome_options.add_extension(pluginfile)
    return chrome_options
# Target page and packed proxy extension (both masked in this template).
url = "******"
pluginfile = "*******.zip"
chrome_options = append_chromeoption(webdriver, pluginfile)
# Start from Chrome's default capabilities and accept insecure certificates.
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
# NOTE(review): a positional driver path together with `chrome_options=` /
# `desired_capabilities=` looks like the Selenium 3 calling style — confirm
# against the installed Selenium version.
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
# Wait up to 10 seconds when locating elements.
chrome.implicitly_wait(10)
url_open(url, chrome)
モジュール
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager
メイン
@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    """Navigate the driver to ``first_url`` and sleep for a few seconds.

    On WebDriverException the decorator retries up to 3 times, waiting
    5 seconds between attempts.

    Args:
        first_url: URL to load.
        chrome: Driver object providing ``get``.
    """
    # Imported locally because this section imports neither module.
    import random
    import time

    chrome.get(first_url)
    # int() drops the fraction, giving a pause of 2 or 3 seconds.
    time.sleep(int(random.uniform(2, 4)))
def append_chromeoption(webdriver):
    """Return a ChromeOptions object with the template's flags applied in order.

    Args:
        webdriver: Object exposing ``ChromeOptions`` (the selenium webdriver module).

    Returns:
        The configured options object.
    """
    flags = [
        "--disable-gpu",
        "--start-maximized",
        "--lang=ja",
        "--no-sandbox",
        '--ignore-certificate-errors',
        '--ignore-ssl-errors',
        'disable-dev-shm-usage',
        '--blink-settings=imagesEnabled=false',
    ]
    chrome_options = webdriver.ChromeOptions()
    for flag in flags:
        chrome_options.add_argument(flag)
    return chrome_options
# Target page (masked in this template).
url = "*****"
chrome_options = append_chromeoption(webdriver)
# Start from Chrome's default capabilities and accept insecure certificates.
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
# NOTE(review): a positional driver path together with `chrome_options=` /
# `desired_capabilities=` looks like the Selenium 3 calling style — confirm
# against the installed Selenium version.
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
# Wait up to 10 seconds when locating elements.
chrome.implicitly_wait(10)
url_open(url, chrome)
リクエスト
基本の形
モジュール
import urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from requests.packages.urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from fake_useragent import UserAgent
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
メイン
# Load the proxy pool; the context manager closes the file handle.
with open("*****.json", "r", encoding="utf-8") as proxy_file:
    proxy_json = json.load(proxy_file)
URL = "https://www.google.com/search?"
# Pick one proxy at random and use it for both schemes.
proxy = random.choice(proxy_json['proxy'])
proxies = {"http" :proxy,"https":proxy}
header = {'user-agent':UserAgent().chrome}
# `headers` used to be built from an undefined USER_AGENT (NameError) and was
# never sent; it is kept as an alias of the header that is actually used.
headers = header
s = requests.Session()
# Retry up to 3 times with back-off when one of the listed statuses is returned.
retries = Retry(total = 3, backoff_factor = 1, status_forcelist = [ 500, 502, 503, 504, 443 ])
s.mount('https://', HTTPAdapter(max_retries = retries))
s.mount('http://', HTTPAdapter(max_retries = retries))
try:
    # verify=False disables TLS certificate verification.
    responce = s.get(URL, headers = header, proxies = proxies, verify = False)
    responce.encoding = cchardet.detect(responce.content)["encoding"]
except requests.exceptions.RequestException as error:
    # Report the failure instead of silently leaving `responce` undefined.
    print(error)
    responce = None
基本の形(検索系)
モジュール
import urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from requests.packages.urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from fake_useragent import UserAgent
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
メイン
# Load the proxy pool; the context manager closes the file handle.
with open("******.json", "r", encoding="utf-8") as proxy_file:
    proxy_json = json.load(proxy_file)
ENDPOINT = "https://www.google.com/search?"
# Pick one proxy at random and use it for both schemes.
proxy = random.choice(proxy_json['proxy'])
proxies = {"http" :proxy,"https":proxy}
header = {'user-agent':UserAgent().chrome}
# `headers` used to be built from an undefined USER_AGENT (NameError) and was
# never sent; it is kept as an alias of the header that is actually used.
headers = header
s = requests.Session()
# Retry up to 3 times with back-off when one of the listed statuses is returned.
retries = Retry(total = 3, backoff_factor = 1, status_forcelist = [ 500, 502, 503, 504, 443 ])
s.mount('https://', HTTPAdapter(max_retries = retries))
s.mount('http://', HTTPAdapter(max_retries = retries))
# urlencode escapes "&" and "=" inside the search text as well, which the
# hand-built "q=...&hl=ja" string left untouched.
nav_request = urllib.parse.urlencode({"q": search_title, "hl": "ja"})
request = ENDPOINT + nav_request
try:
    # verify=False disables TLS certificate verification.
    responce = s.get(request, headers = header, proxies = proxies, verify = False)
    responce.encoding = cchardet.detect(responce.content)["encoding"]
except requests.exceptions.RequestException as error:
    # Report the failure instead of silently leaving `responce` undefined.
    print(error)
    responce = None
相対パスのURL変換
モジュール
from urllib.parse import urlparse
import re
メイン
def convert_to_url(base, target_path):
    """Resolve ``target_path`` against ``base`` and return an absolute URL.

    Handles absolute URLs, scheme-relative ("//host/a.jpg"), root-relative
    ("/dir/a.jpg"), "./a.jpg", bare names ("a.jpg") and parent references
    ("../a.jpg").

    Args:
        base: Absolute URL of the page the link was found on.
        target_path: Link as written in the page.

    Returns:
        The absolute URL as a string.
    """
    # The hand-written matcher never terminated for "../" paths and for empty
    # targets (no branch matched inside `while True`), and it rebuilt URLs from
    # `hostname`, which drops the port. urljoin implements the standard
    # resolution rules for all of these cases.
    from urllib.parse import urljoin

    return urljoin(base, target_path)
スプレッドシート
読み込む
モジュール
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
メイン
def load_spread(SERVICE_ACCOUNT_FILE,SPREADSHEET_KEY,SHEET_NANE):
    """Open a worksheet with a service account and load it as a DataFrame.

    Args:
        SERVICE_ACCOUNT_FILE: Path of the service-account JSON key file.
        SPREADSHEET_KEY: Key of the spreadsheet to open.
        SHEET_NANE: Name of the worksheet to read.

    Returns:
        ``[df, sheet]``: the sheet values with the first row used as column
        names (and removed from the data), and the worksheet object.
    """
    SCOPES = [
        'https://spreadsheets.google.com/feeds',
        'https://www.googleapis.com/auth/drive',
    ]
    credentials = ServiceAccountCredentials.from_json_keyfile_name(SERVICE_ACCOUNT_FILE, SCOPES)
    client = gspread.authorize(credentials)
    sheet = client.open_by_key(SPREADSHEET_KEY).worksheet(SHEET_NANE)
    table = pd.DataFrame(sheet.get_all_values())
    # Promote the first row to the header, then drop it from the data rows.
    table.columns = table.iloc[0].tolist()
    table = table.drop(0, axis=0)
    return [table, sheet]


SERVICE_ACCOUNT_FILE = "*****.json"
SPREADSHEET_KEY = '*******'
SHEET_NANE = 'シート1'
df_sheet, sheet = load_spread(SERVICE_ACCOUNT_FILE,SPREADSHEET_KEY,SHEET_NANE)
出力
モジュール
from gspread_dataframe import set_with_dataframe
メイン
# First row to write to: number of loaded data rows + 2.
# presumably +1 for the header row and +1 for the next free row — confirm
max_row = len(df_sheet) + 2
# Write `df` starting at that row, without index and without a header row.
set_with_dataframe(sheet, df, row=max_row, col=1, include_index=False, include_column_header=False)
データフレーム
読み込み
EXCEL
# Load an Excel file into a DataFrame.
df = pd.read_excel(excel_file)
CSV
# Load a CSV file encoded as cp932 into a DataFrame.
df = pd.read_csv(csv_file, encoding = "cp932")
列の数が違うとエラーがあったため
# Fallback for CSV files whose rows differ in column count: read the rows with
# the csv module and let DataFrame pad the short ones.
import csv  # not imported anywhere else in the file

# "strict" replaces errors="", which is not a registered error-handler name.
with open(csv_files[2], "r", encoding="shift_jis", errors="strict", newline="") as f:
    lst = csv.reader(f, delimiter=",")
    # The reader is lazy, so the frame must be built while the file is open.
    df = pd.DataFrame(lst)
# Promote the first row to the header, then drop it from the data rows.
df.columns = df.iloc[0].tolist()
df = df.drop(0, axis=0)
出力
EXCEL
# Write the DataFrame to an Excel file without the index column.
df.to_excel(save_path, index=False)
CSV
def to_csv(df, save_path):
    """Write ``df`` as a cp932-encoded CSV file without the index column.

    Characters that cannot be encoded as cp932 are dropped (errors="ignore").

    Args:
        df: DataFrame to write.
        save_path: Destination file path.
    """
    with open(save_path, mode="w", encoding="cp932", errors="ignore", newline="") as out_file:
        df.to_csv(out_file, index=False)
to_csv(df, save_path)  # write the file
データ成形
列削除
# Remove the column named '列名' (axis=1 selects columns).
df = df.drop('列名', axis=1)
nanの列削除
# Drop every column whose values are all NaN.
df = df.dropna(axis=1, how='all')
nan削除
# Drop rows with NaN in any of the listed columns, then renumber the index from 0.
df = df.dropna(subset = ['屋号', 'URL', '住所'])
df = df.reset_index(drop = True)
辞書型変換
# One dict per row. The abbreviation "record" is no longer accepted by current
# pandas; the supported spelling is "records".
dic = df.to_dict(orient="records")
重複削除
# Keep the first occurrence of each duplicate group.
df = df[~df.duplicated(keep='first',subset = ['屋号', 'URL', '住所'])]
# Keep the last occurrence of each duplicate group.
df = df[~df.duplicated(keep='last',subset = ['屋号', 'URL', '住所'])]
# Keep none of the duplicated rows: keep=False marks every member of a group.
# Without it the default keep='first' applied, same as the first variant.
df = df[~df.duplicated(keep=False, subset = ['屋号', 'URL', '住所'])]
空白/0埋め
# Convert the column to int and then to str.
# NOTE(review): the section title mentions blank/zero padding, but no padding
# (e.g. str.zfill) is applied here — confirm the intent.
df['列名'] = df['列名'].astype(int)
df['列名'] = df['列名'].astype(str)
カラム変換
# Mapping from current column names to new ones.
# NOTE(review): `clients` and `phone` are bare names, so they must be variables
# defined elsewhere; if literal column names were intended they need quotes.
rename_dict = {
"屋号" : clients,
"電話番号" : phone
}
df = df.rename(columns=rename_dict)
結合
# Start from an empty frame and append `df_next` below it (axis=0 stacks rows).
df_concat = pd.DataFrame()
df_concat = pd.concat([df_concat, df_next], axis=0)
データ型変換
時間型
# Parse the column as datetimes written like "2022-10-08 13:45".
df['作成日'] = pd.to_datetime(df['作成日'], format='%Y-%m-%d %H:%M')
文字列型
# Replace NaN with an empty string, then convert the column to str.
df['列名'] = df['列名'].fillna("").astype(str)
数字型
# Replace NaN with 0, then convert the column to int.
df['列名'] = df['列名'].fillna(0).astype(int)
自然言語処理
文字コード・空白削除
モジュール
import re
メイン
def re_text(df, col_name):
    """Normalise one text column: half-width characters, no spaces or symbols.

    Args:
        df: DataFrame to modify.
        col_name: Name of the column to clean.

    Returns:
        The same DataFrame with ``df[col_name]`` replaced by the cleaned strings.
    """
    import mojimoji  # this section of the file only imports `re`

    # astype must run on the Series: the previous `.to_list()` result has no
    # `astype`, so the function raised AttributeError.
    tage_lst = df[col_name].astype(str)
    # Convert full-width kana, digits and ASCII to half-width.
    tage_lst = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in tage_lst]
    # Remove whitespace and the listed punctuation/symbols.
    tage_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]',
                       '', tage) for tage in tage_lst]
    df[col_name] = tage_lst
    return df
屋号成形
モジュール
import pandas as pd
import re, mojimoji
メイン
def re_text(target):
    """Return the values of ``target`` as cleaned strings.

    Each value is converted to str, turned into half-width characters and
    stripped of whitespace and the listed punctuation/symbols.

    Args:
        target: Series-like object providing ``astype``.

    Returns:
        A list of cleaned strings in input order.
    """
    tage_lst = target.astype(str)
    tage_lst = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in tage_lst]
    tage_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)
                for tage in tage_lst]
    return tage_lst


def main(df,file_type_itszai_cms,clomn_name, zyouken_path):
    """Drop the rows whose name contains one of the banned words.

    Args:
        df: DataFrame to filter.
        file_type_itszai_cms: Sheet name of the banned-word list.
        clomn_name: Column of ``df`` holding the names to check.
        zyouken_path: Path of the Excel file holding the banned-word list
            (first column, no header row).

    Returns:
        A new DataFrame without the matching rows, index renumbered from 0.
    """
    df_com = re_text(df[clomn_name])
    # The path argument was missing from the call (a syntax error).
    df_kinsicom = pd.read_excel(zyouken_path, sheet_name=file_type_itszai_cms, header=None)[0].to_list()
    # Clean the banned words the same way as the names so they compare equal.
    df_kinsicom = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in df_kinsicom]
    df_kinsicom = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)
                   for tage in df_kinsicom]
    # Positions of the rows whose cleaned name contains any banned word.
    target_com = [i for i, com in enumerate(df_com)
                  if any(kinsi_com in com for kinsi_com in df_kinsicom)]
    df = df.drop(df.index[target_com])
    df = df.reset_index(drop=True)
    return df


zyouken_path = "*****"
df = main(df,"イツザイ","屋号", zyouken_path)
屋号成形
モジュール
import pandas as pd
import re, mojimoji
メイン
def re_text(target):
    """Clean every value of ``target`` and return the results as a list.

    Values are converted to str, changed to half-width characters, and
    whitespace plus the listed punctuation/symbols are removed.

    Args:
        target: Series-like object providing ``astype``.

    Returns:
        A list of cleaned strings in input order.
    """
    cleaned = []
    for tage in target.astype(str):
        tage = mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True)
        tage = re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)
        cleaned.append(tage)
    return cleaned


def main(df,file_type_itszai_cms,clomn_name, zyouken_path):
    """Remove the rows whose name contains a banned word.

    Args:
        df: DataFrame to filter.
        file_type_itszai_cms: Sheet name of the banned-word list.
        clomn_name: Column of ``df`` holding the names to check.
        zyouken_path: Path of the Excel file holding the banned-word list
            (first column, no header row).

    Returns:
        A new DataFrame without the matching rows, index renumbered from 0.
    """
    df_com = re_text(df[clomn_name])
    # `zyouken_path` was missing from this call, which made it a syntax error.
    df_kinsicom = pd.read_excel(zyouken_path, sheet_name=file_type_itszai_cms, header=None)[0].to_list()
    # Clean the banned words exactly like the names.
    df_kinsicom = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in df_kinsicom]
    df_kinsicom = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)
                   for tage in df_kinsicom]
    target_com = []
    for i, com in enumerate(df_com):
        # any() stops at the first banned word found in the name.
        if any(kinsi_com in com for kinsi_com in df_kinsicom):
            target_com.append(i)
    df = df.drop(df.index[target_com])
    df = df.reset_index(drop=True)
    return df


zyouken_path = "source/zyoukenn.xlsx"
df = main(df,"イツザイ","屋号", zyouken_path)
電話番号成形
モジュール
import re, mojimoji, phonenumbers
import pandas as pd
メイン
def main(df, df_kinsiphone, column):
    """Normalise the phone numbers in ``df[column]``.

    Each value is cleaned, compared with the banned numbers and, if allowed,
    formatted in Japanese national notation.

    Args:
        df: DataFrame to modify.
        df_kinsiphone: Table whose 'number' column lists the banned numbers.
        column: Name of the phone-number column in ``df``.

    Returns:
        The same DataFrame with ``df[column]`` replaced by: "禁止番号" for a
        banned number, the formatted number, or None when the value cannot be
        parsed or starts with one of the excluded prefixes.
    """
    def pad(number):
        # Restore leading zeros lost in numeric columns: 9 characters are
        # padded to 10, everything else to 11.
        # NOTE(review): a 10-character value that already starts with "0" also
        # gets an extra leading zero here — confirm that this is intended.
        return number.zfill(10) if len(number) == 9 else number.zfill(11)

    # Candidate formats, tried in order (compiled once, not once per row).
    patterns = (
        re.compile(r"([0]\d{1,4})-(\d{1,4})-(\d{3,4})"),  # "090-9628-5069"
        re.compile(r"(\d{1,4})-(\d{1,4})-(\d{3,4})"),     # "29-843-1581"
        re.compile(r"([0]\d{9,})"),                       # "0298431581"
        re.compile(r"(\d{1,11})"),                        # "298431581"
    )
    excluded_prefixes = ("0120-", "0800-", "0066-", "0037-", "0-", "050-")

    banned_numbers = {pad(str(phone)) for phone in df_kinsiphone['number'].to_list()}

    phone_lst = [str(phone) for phone in df[column].to_list()]
    # Convert full-width characters to half-width.
    phone_lst = [mojimoji.zen_to_han(phone, kana = True, digit = True, ascii = True) for phone in phone_lst]
    # Remove whitespace and the listed punctuation/symbols.
    phone_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', phone) for phone in phone_lst]
    phone_lst = [pad(number) for number in phone_lst]
    # Remove every hyphen-like character.
    phone_lst = [re.sub('-|-|‐|−|‒|—|–|―|ー|─|━|ㅡ|ـ|⁻|₋','', phone) for phone in phone_lst]

    phone_lst_tage = []
    for phone in phone_lst:
        try:
            matched = next((m for m in (p.search(phone) for p in patterns) if m), None)
            phone = matched.group().replace('-', '') if matched else None
            if phone in banned_numbers:
                result = "禁止番号"
            elif phone:
                try:
                    formatted = phonenumbers.format_number(
                        phonenumbers.parse(phone, "JP"),
                        phonenumbers.PhoneNumberFormat.NATIONAL,
                    )
                except phonenumbers.NumberParseException:
                    result = None
                else:
                    result = None if formatted.startswith(excluded_prefixes) else formatted
            else:
                result = phone
        except Exception:
            # Any other failure yields None for this row only.
            result = None
        # Exactly one append per row keeps the list aligned with the DataFrame;
        # the earlier `finally` + outer `except` could append twice.
        phone_lst_tage.append(result)
    df[column] = phone_lst_tage
    return df


df_kinsiphone = pd.read_csv("*****.csv")
df = main(df, df_kinsiphone, "電話番号")
住所成形
モジュール
import re, json
メイン
def main(df,df_city,df_prenum):
    """Split the "住所" column into prefecture and municipality and look up codes.

    Args:
        df: DataFrame with a "住所" (address) column.
        df_city: DataFrame with the columns "都道府県" and "市区町村".
        df_prenum: List of dicts with the keys "都道府県", "県コード" and "エリアコード".

    Returns:
        A tuple ``(prefectures_id, new_df_lst, search_address, areas_id)``:
        prefectures_id -- "県コード" of each resolved prefecture, e.g. [24, ...]
        new_df_lst     -- address cut down to prefecture + municipality, with
                          the prefecture name and all digits removed
        search_address -- prefecture per address ("指定なし" if unresolved),
                          e.g. [長野県, ...]
        areas_id       -- "エリアコード" for each entry of prefectures_id, e.g. [1, ...]

        NOTE(review): prefectures_id and areas_id only get an entry when the
        prefecture exists in df_prenum, so they can be shorter than the other
        two lists — confirm that df_prenum covers "指定なし".
    """
    # Prefecture followed by the municipality. The pattern is assembled from
    # adjacent raw strings: a backslash + newline inside one raw string is kept
    # literally, which made the 野々市 and 周南 alternatives unmatchable.
    address_pattern = re.compile(
        r'(...??[都道府県])((?:旭川|伊達|石狩|盛岡|奥州|田村|南相馬|那須塩原|東村山|武蔵村山|羽村|十日町|上越|富山|'
        r'野々市|大町|蒲郡|四日市|姫路|大和郡山|廿日市|下松|岩国|田川|大村|宮古|富良野|別府|佐伯|黒部|小諸|塩尻|玉野|'
        r'周南)市|(?:余市|高市|[^市]{2,3}?)郡(?:玉村|大町|.{1,5}?)[町村]|(?:.{1,4}市)?[^町]{1,4}?区|.{1,7}?[市町村])'
    )
    pre_json_data = [dic["都道府県"] for dic in df_prenum]

    # Clean each address: drop whitespace/symbols, keep only the matched
    # "prefecture + municipality" part when there is one, then drop digits.
    df_lst = []
    for address in df["住所"].to_list():
        address = re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', str(address))
        found = address_pattern.search(address)
        if found:
            address = found.group()
        df_lst.append(re.sub(r"\d", "", address))

    df_city_dict = df_city.to_dict(orient = 'records')
    df_ken = list(set(df_city['都道府県']))

    search_address = []
    for address in df_lst:
        # 1) prefecture name written in the address
        prefecture = next((name for name in pre_json_data if name in address), "指定なし")
        # 2) otherwise infer it from a municipality contained in the address
        if prefecture == "指定なし":
            prefecture = next(
                (row['都道府県'] for row in df_city_dict if row['市区町村'] in address),
                "指定なし",
            )
        # 3) otherwise accept a prefecture from df_city only if exactly one fits
        if prefecture == "指定なし":
            candidates = [name for name in df_ken if name in address]
            if len(candidates) == 1:
                prefecture = candidates[0]
        search_address.append(prefecture)

    prefectures_id = [prenum["県コード"] for prefecture in search_address
                      for prenum in df_prenum if prefecture == prenum["都道府県"]]
    areas_id = [prenum["エリアコード"] for prefecture in prefectures_id
                for prenum in df_prenum if prefecture == prenum["県コード"]]

    # Strip the first prefecture name found from each cleaned address.
    new_df_lst = []
    for city in df_lst:
        sub_text = next((name for name in pre_json_data if name in city), None)
        if sub_text is not None:
            city = city.replace(sub_text, "")
        new_df_lst.append(city)
    return prefectures_id,new_df_lst,search_address,areas_id
# Municipality master (cp932 CSV) and prefecture-code master (JSON).
df_city = pd.read_csv("******.csv", encoding = "cp932")
# The context manager closes the JSON file, which the bare open() never did.
with open("*****.json", "r", encoding="utf-8") as prenum_file:
    df_prenum = json.load(prenum_file)
prefectures_id,new_df_lst,search_address,areas_id = main(df,df_city,df_prenum)
メール
送信
モジュール
import subprocess, smtplib, datetime
from email.mime.text import MIMEText
from email.utils import formatdate
メイン
def create_message(FROM_ADDER, to_addr, subject, body):
    """Build a plain-text e-mail message.

    Args:
        FROM_ADDER: Sender address.
        to_addr: Recipient address.
        subject: Subject line.
        body: Message text.

    Returns:
        A MIMEText object with Subject, From, To and Date headers set.
    """
    msg = MIMEText(body)
    for header_name, header_value in (
        ('Subject', subject),
        ('From', FROM_ADDER),
        ('To', to_addr),
        ('Date', formatdate()),
    ):
        msg[header_name] = header_value
    return msg
def create_mail(FROM_ADDER, to_addr, msg, MY_PASSWORD):
    """Send ``msg`` through smtp.gmail.com (port 587, STARTTLS).

    Args:
        FROM_ADDER: Sender address, also used as the login name.
        to_addr: Recipient address.
        msg: Message object providing ``as_string()``.
        MY_PASSWORD: Password used for the login.
    """
    # The context manager ends the session and closes the socket even when the
    # login or the send fails.
    with smtplib.SMTP('smtp.gmail.com', 587) as smtpobj:
        smtpobj.ehlo()
        smtpobj.starttls()
        smtpobj.ehlo()  # greet again over the now-encrypted connection
        smtpobj.login(FROM_ADDER, MY_PASSWORD)
        smtpobj.sendmail(FROM_ADDER, to_addr, msg.as_string())
def send_mail(FROM_ADDER, TO_ADDER, MY_PASSWORD, title, body):
    """Build a plain-text message and send it.

    Args:
        FROM_ADDER: Sender address, also used as the login name.
        TO_ADDER: Recipient address.
        MY_PASSWORD: Password used for the login.
        title: Subject line.
        body: Message text.
    """
    # The unused `log_time` local was removed.
    msg = create_message(FROM_ADDER, TO_ADDER, title, body)
    create_mail(FROM_ADDER, TO_ADDER, msg, MY_PASSWORD)


FROM_ADDER = "***********"
TO_ADDER = '**************'
MY_PASSWORD = "**************"
# Was `title ~ "【】"`, which is not valid Python.
title = "【】"
body = "実行完了\n"
# The stray trailing ":" after the call was removed.
send_mail(FROM_ADDER, TO_ADDER, MY_PASSWORD, title, body)
受信
インストール
pip install pyzmail36
pip install backports.ssl
モジュール
import pyzmail, imapclient, datetime
from backports import ssl
from OpenSSL import SSL
メイン
def get_mail(MY_MAIL, MY_PASSWORD, dt):
    """Fetch the INBOX mails of one day and extract the "【メンション】" ones.

    Args:
        MY_MAIL: Account name used for the IMAP login.
        MY_PASSWORD: Password used for the IMAP login.
        dt: Date in IMAP search format, e.g. "08-Oct-2022".

    Returns:
        ``(urls, names, texts)`` with one entry per mail whose subject
        contains "【メンション】". When the body does not split into exactly
        five parts on "【" / "】", the entries are ``None``, ``[]`` and ``None``.

    Side effects:
        Every mail matched by the date search is deleted from the INBOX.
        NOTE(review): that includes mails without "【メンション】" in the
        subject; the original indentation is lost, so confirm the scope.
    """
    import re  # not imported by this section of the file

    # TLS 1.2 context for the IMAP connection.
    context = ssl.SSLContext(SSL.TLSv1_2_METHOD)
    imap = imapclient.IMAPClient("imap.gmail.com", ssl=True, ssl_context=context)
    urls, names, texts = [], [], []
    try:
        imap.login(MY_MAIL, MY_PASSWORD)
        imap.select_folder("INBOX", readonly=False)
        KWD = imap.search(["ON", dt])
        raw_message = imap.fetch(KWD, ["BODY[]"])
        for uid in KWD:
            message = pyzmail.PyzMessage.factory(raw_message[uid][b"BODY[]"])
            Subject = message.get_subject()
            if "【メンション】" not in Subject:
                continue
            # Mails without a plain-text part are treated as an empty body
            # instead of raising AttributeError.
            Body = ""
            if message.text_part is not None:
                Body = message.text_part.get_payload().decode(message.text_part.charset or "utf-8")
            Body = re.sub("\n|\r", "", Body)
            split_text = re.split("[【】]", Body)
            well_formed = len(split_text) == 5
            urls.append(split_text[0] if well_formed else None)
            names.append(split_text[2].split('/') if well_formed else [])
            texts.append(split_text[4] if well_formed else None)
        if KWD:
            imap.delete_messages(KWD)
    finally:
        # Close the session; previously the connection was never released.
        imap.logout()
    return urls, names, texts


MY_MAIL = "******************"
MY_PASSWORD = "******************"
# Today's date in the format expected by the IMAP "ON" search key.
dt = (datetime.datetime.now()).strftime('%d-%b-%Y')
urls,names,texts = get_mail(MY_MAIL, MY_PASSWORD, dt)
DB
cloud_sql・MySQL
インスタンス接続
インストール
pip install "cloud-sql-python-connector[pymysql]"
モジュール
import pandas as pd
import re, pymysql.cursors, os
from urllib.parse import urlparse
from google.cloud.sql.connector import Connector
メイン
def conect_db(db_url):
    """Open a connection through the Cloud SQL connector and return a cursor.

    Args:
        db_url: Parsed URL (``urlparse`` result). Its path (first character
            dropped) and fragment are passed positionally; username, password
            and query (first character dropped) provide user, password and
            database name.
            # presumably path = instance connection name, fragment = driver — confirm

    Returns:
        ``(cursor, connection)``; rows come back as dicts (DictCursor).
    """
    connector = Connector()
    connect_options = dict(
        user=db_url.username,
        password=db_url.password,
        db=db_url.query[1:],
        local_infile=True,
        cursorclass=pymysql.cursors.DictCursor,
    )
    connection: pymysql.connections.Connection = connector.connect(
        db_url.path[1:], db_url.fragment, **connect_options
    )
    return connection.cursor(), connection
def sql_execute(sql, cursor, connection):
    """Run one SQL statement and commit it.

    Args:
        sql: Statement to execute.
        cursor: Cursor used to execute the statement.
        connection: Connection on which the commit is issued.
    """
    cursor.execute(sql)
    connection.commit()
def sql_df(sql, connection):
    """Run a query and return the result as a DataFrame.

    Args:
        sql: Query to execute.
        connection: Open database connection.

    Returns:
        A DataFrame holding the query result.
    """
    # The query argument was ignored in favour of an undefined name `test`.
    df = pd.read_sql(sql = sql, con = connection)
    return df
# Point the Google client libraries at the service-account key file.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f"*****.json"
# NOTE(review): the second assignment overwrites the first, so only the
# production URL takes effect — comment one of them out as needed.
INSTANCE_URL = "******************"; # development environment
INSTANCE_URL = '******************'; # production environment
db_url = urlparse(INSTANCE_URL)
cursor, connection = conect_db(db_url)
# NOTE(review): `sql` is not defined in this snippet; set the query first.
df = sql_df(sql, connection)
# NOTE(review): `display` is presumably the notebook helper — confirm.
display(df)
cursor.close()
connection.close()
ローカル接続
モジュール
import pymysql.cursors
import pandas as pd
メイン
def conect_db(HOST,PORT,USER, PASSWORD, DB_NAME):
    """Connect to a MySQL server with pymysql and return a cursor.

    Args:
        HOST: Server host name.
        PORT: Server port.
        USER: Login name.
        PASSWORD: Login password.
        DB_NAME: Database to use.

    Returns:
        ``(cursor, connection)``; rows come back as dicts (DictCursor).
    """
    connect_options = dict(
        host=HOST,
        port=PORT,
        user=USER,
        password=PASSWORD,
        db=DB_NAME,
        local_infile=True,
        cursorclass=pymysql.cursors.DictCursor,
    )
    connection = pymysql.connect(**connect_options)
    return connection.cursor(), connection
def sql_execute(sql, cursor, connection):
    """Execute a single SQL statement, then commit.

    Args:
        sql: Statement to execute.
        cursor: Cursor used to execute the statement.
        connection: Connection on which the commit is issued.
    """
    cursor.execute(sql)
    connection.commit()
def sql_df(sql, cursor, connection):
    """Run a query and return the result as a DataFrame.

    Args:
        sql: Query to execute.
        cursor: Unused; kept so existing callers keep working.
        connection: Open database connection.

    Returns:
        A DataFrame holding the query result.
    """
    # The query argument was ignored in favour of an undefined name `test`.
    df = pd.read_sql(sql = sql, con = connection)
    return df
# Connection settings (masked).
HOST = '******************'
# NOTE(review): the port is a masked string here; pymysql presumably needs an
# integer such as 3306 — confirm when filling in the real value.
PORT = '******************'
USER = '******************'
PASSWORD = '******************'
DB_NAME = '******************'
cursor,connection = conect_db(HOST,PORT,USER, PASSWORD, DB_NAME)
# "SLECT" was a typo. The statement is still a template: add the table name.
sql_execute("""SELECT * FROM """, cursor, connection)
# Optional: load a query result into a DataFrame (kept disabled; the original
# used /* ... */, which is not Python comment syntax).
# df_test = sql_df(sql, cursor, connection)
# display(df_test)
cursor.close()
connection.close()
bigquery
SQLITE3
接続
モジュール
import sqlite3
メイン
def database_append(db_name, df):
    """Append ``df`` to the table "table_class" of a SQLite database.

    The table is created on first use; the DataFrame index is written as a
    column (pandas' default).

    Args:
        db_name: Path of the SQLite database file.
        df: DataFrame to append.
    """
    conn = sqlite3.connect(db_name)
    try:
        df.to_sql("table_class", conn, if_exists = "append")
        conn.commit()
    finally:
        # Close the connection even when the write fails. The unused cursor
        # was removed.
        conn.close()
出力
モジュール
import sqlite3
メイン
def confi_database(db_name):
    """Read the whole table "table_class" of a SQLite database.

    Args:
        db_name: Path of the SQLite database file.

    Returns:
        A DataFrame with every row of "table_class".
    """
    conn = sqlite3.connect(db_name)
    try:
        query = u"SELECT * FROM table_class"
        df = pd.read_sql_query(sql = query, con=conn )
    finally:
        # Close the connection even when the query fails. The unused cursor
        # and the commit (nothing is written) were removed.
        conn.close()
    return df
# Path of the SQLite database file (masked).
db_name = "*****"
# Load the whole "table_class" table into a DataFrame.
df = confi_database(db_name)
プロキシ設定
セレニウム
設定
モジュール
import zipfile
メイン
def make_proxy(save_path):
    """Write ten Chrome proxy-authentication extensions as zip files.

    One archive named ``proxy_auth_plugin_<n>.zip`` is created in
    ``save_path`` for every n from 10925 to 10934. Each contains a
    manifest.json and a background.js that sets a fixed HTTP proxy and
    answers the proxy's authentication request.

    NOTE(review): all ten archives have identical content; only the file name
    uses the loop counter — confirm whether the port was meant to vary.

    Args:
        save_path: Existing directory in which the archives are created.
    """
    PROXY_HOST = '******************'
    PROXY_PORT = '******************'
    PROXY_USER = '******************'
    PROXY_PASS = '******************'
    # Extension manifest: requests the proxy and webRequest permissions.
    manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Chrome Proxy",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {
"scripts": ["background.js"]
},
"minimum_chrome_version":"22.0.0"
}
"""
    # Background script template; the four %s slots are host, port, user, password.
    background_template = """
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "http",
host: "%s",
port: parseInt(%s)
},
bypassList: ["localhost"]
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
return {
authCredentials: {
username: "%s",
password: "%s"
}
};
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
);
"""
    background_js = background_template % (PROXY_HOST, PROXY_PORT, PROXY_USER, PROXY_PASS)
    for number in range(10925, 10935):
        pluginfile = f'{save_path}/proxy_auth_plugin_{number}.zip'
        with zipfile.ZipFile(pluginfile, 'w') as archive:
            archive.writestr("manifest.json", manifest_json)
            archive.writestr("background.js", background_js)
確認
モジュール
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
メイン
save_path = ""
pluginfile = "*****.zip"
chrome_options = webdriver.ChromeOptions()
# Load the packed proxy extension.
chrome_options.add_extension(pluginfile)
# NOTE(review): the positional driver path is the Selenium 3 calling style —
# confirm against the installed Selenium version.
browser = webdriver.Chrome(ChromeDriverManager().install(), options=chrome_options)
try:
    browser.implicitly_wait(10)
    browser.get('https://expressvpn.com/what-is-my-ip')
    # find_element("class name", ...) works on Selenium 3 and 4, whereas
    # find_element_by_class_name was removed in Selenium 4.
    ip_address = browser.find_element("class name", "ip-address").text
finally:
    # Always close the browser, even when the page or element is missing.
    browser.quit()
print(ip_address)
リクエスト
確認
モジュール
import urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from bs4 import BeautifulSoup
メイン
# Proxy to verify (masked) and the page that reports the caller's IP address.
proxy = '******************'
ENDPOINT = "https://www.expressvpn.com/what-is-my-ip"
proxies = {"http" :proxy,"https":proxy}
# verify = False turns off TLS certificate verification for this request.
responce = requests.get(ENDPOINT, proxies = proxies, verify = False)
# Re-encode the decoded text with the encoding guessed from the content.
encode = responce.apparent_encoding
soup = BeautifulSoup(responce.text.encode(encode), "html.parser")
# First <h4 class="ip-address"> element; IndexError if the page has none.
text = soup.find_all('h4',class_= "ip-address")[0].text
print(text)