1
1

More than 1 year has passed since last update.

チートシート【Python】

Last updated at Posted at 2022-12-11

よく使う

エラーハンドリング

traceback

import traceback
traceback.format_exc()

時間処理

モジュール

import datetime

メイン

dt = datetime.datetime.now()
dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")
dt_minutes = (dt + datetime.timedelta(minutes=-5)).strftime("%Y-%m-%d %H:%M:%S")
dt_hours = (dt + datetime.timedelta(hours=-1)).strftime("%Y-%m-%d %H:%M:%S")
dt_days = (dt + datetime.timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
dt_years = (dt + datetime.timedelta(years=-1)).strftime("%Y-%m-%d %H:%M:%S")

正規表現

モジュール

import re

メイン

petrn = re.compile(r"(?P<number>[0-9]{1,})部屋").search(tage)
petrn2 = re.compile(r"(?P<number>[0-9]{1,})数").search(tage)
petrn3 = re.compile(r"(?P<number>[0-9]{1,})~").search(tage)
petrn4 = re.compile(r"~(?P<number>[0-9]{1,})").search(tage)
petrn5 = re.compile(r"(?P<number>[0-9]{1,})").search(tage)
re_target = petrn.group("number")  if petrn else\
            petrn2.group("number") if petrn2 else\
            petrn3.group("number") if petrn3 else\
            petrn4.group("number") if petrn4 else\
            petrn5.group("number") if petrn5 else None

ファイル名操作

# 例:20221008【S】【対象外】【ホットペッパー】
basement  = os.path.basename(excel_file)
basement_no_extension = os.path.splitext(basement)[0]
re_target = re.split("[【】]", basement_no_extension)
lst_time, rank, site = re_target[0], re_target[1], re_target[5]

検索

fiter関数

fruits = ["apple", "lemon", "melon", "orange"]
lst = list(filter(lambda x: x.endswith("n"), fruits)) # filter関数

assert lst == ["lemon", "melon"], '配列の値が違う'

find関数

# ジェネレータ式によるfind
elm = next((f for f in fruits if f.endswith("n")), None)
assert elm == "lemon"

vlookup関数

モジュール

import bisect

メイン

/*
id = "探索するキーワード"
name = "見つけたいワード"
*/
def vlookup(df, key_list, id, name):
    id_list, name_list = [], []
    df_id = df[id].to_list()
    df_name = df[name].to_list()
    dic = dict(zip(df_id, df_name))
    sorted_str_dict = dict(sorted(dic.items()))
    dic_key = list(sorted_str_dict.keys())
    dic_value = list(sorted_str_dict.values())
    for key in key_list:
        data = ""
        top_point = bisect.bisect_left(dic_key, key)
        if int(top_point) < len(dic_key):
            if key == dic_key[top_point]:
                data = dic_value[top_point]
        name_list.append(data)
    return name_list

基本操作

text

読み込み

def read_text(path):
    f = open(path, 'r', encoding='UTF-8')
    data = f.read()
    f.close()
    data = data.replace("\n","")
    return data

json

読み込み

sub_sub_cates_json  = json.load(open("******.json", "r",encoding="utf-8"))

出力

jsonfile = '******.json'
jsonfile_open = open(jsonfile,"w",encoding="utf-8")
json.dump('******.json' , jsonfile_open , ensure_ascii=False , indent=4)
jsonfile_open.close()

EXCEL操作

モジュール

import os
import openpyxl
from openpyxl.utils import get_column_letter, column_index_from_string

出力


def adapt_filter(filename):
    wb = openpyxl.load_workbook(filename)
    ws = wb['Sheet1']
    ws.auto_filter.ref = get_column_letter(1) + str(1) + ':' + get_column_letter(ws.max_column) + str(ws.max_row)
    wb.save(filename)

OS操作

ファイル探索

全フォルダ

path = "*****"
file_path = [os.path.join(current_dir,sub_dir) for current_dir, sub_dirs, files_list in os.walk(path) for sub_dir in sub_dirs]

全ファイル

path = "******"
file_path = [os.path.join(current_dir,file_name) for current_dir, sub_dirs, files_list in os.walk(path) for file_name in files_list]

相対ファイル

import glob
path = "*******"
excel_files = glob.glob(save_path + "/*.xlsx")
csv_files = glob.glob(save_path + "/*.csv")    

時間条件追加

def get_file_path_time(file_path, int_time):
    tage_file = []
    for file in file_path:
        try:
            file_po = os.path.basename(file)
            text = re.split("[【】]", file_po)
            time, rank, pre, gyousyu, syokusyu, site,exe = text[0], text[1], text[3], text[5], text[7], text[9], text[10]
            petrn = re.compile(r"\d{1,}").search(time)
            time = petrn.group() if petrn else None
            if time:
                if int(time) < int_time:
                    tage_file.append(file)
            else:
                continue
        except:
            print(file)

    return tage_file

file_path_time =  get_file_path_time(file_path, 20220630)

テンプレ

path = "**********"
file_path = [os.path.join(current_dir,file_name) for current_dir, sub_dirs, files_list in os.walk(path) for file_name in files_list]
file_path = [file for file in file_path if ".xlsx" in file]
file_path = [file for file in file_path if not os.path.basename(file).endswith("】.xlsx")]
file_path = [file for file in file_path if os.path.basename(file).startswith("2022")]
file_path

ファイルコピー

モジュール

import shutil

メイン

def copy_file(file_path, copy_to_files):
    for i,moto_file in enumerate(file_path):
        try:
            shutil.copyfile(moto_file, copy_to_files[i])
        except:
            print(moto_file)

copy_to_file = [file.replace("*****","*****") for file in tage_file]
copy_file(file_path, copy_to_files)

ファイル削除

モジュール

import os

メイン

def del_file(file_path):
    for file in file_path:
        try:
            os.remove(file)
        except FileNotFoundError:
            print("ファイルなし")
del_file(file_path)

フォルダ作成

モジュール

import os, datetime

メイン

def make_dirs(save_path):
    dt = datetime.datetime.now()
    dt_str = dt.strftime("%Y-%m-%d")
    save_dir = f"{save_path}/dt_str"
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    return save_dir
save_dir = make_dirs(save_path, exist_ok=True))

スクレイピング

セレニウム

基本の形(プロキシなし)

モジュール

from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager
from retry import retry

メイン

@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    chrome.get(first_url)
    time.sleep(int(random.uniform(2,4)))

def append_chromeoption(webdriver):
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--start-maximized")
    chrome_options.add_argument("--lang=ja")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument('--ignore-certificate-errors')
    chrome_options.add_argument('--ignore-ssl-errors')
    chrome_options.add_argument('disable-dev-shm-usage')
    chrome_options.add_argument('--blink-settings=imagesEnabled=false')    
    return chrome_options

url = ""
chrome_options = append_chromeoption(webdriver)
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
chrome.implicitly_wait(10)
url_open(url, chrome)

基本の形(プロキシあり)

モジュール

import random 
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager

メイン

@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    chrome.get(first_url)
    time.sleep(int(random.uniform(2,4)))

def append_chromeoption(webdriver, pluginfile):
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--start-maximized")
    chrome_options.add_argument("--lang=ja")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument('--ignore-certificate-errors')
    chrome_options.add_argument('--ignore-ssl-errors')
    chrome_options.add_argument('disable-dev-shm-usage')
    chrome_options.add_argument('--blink-settings=imagesEnabled=false')    
    return chrome_options

url = "******"
pluginfile = "*******.zip"
chrome_options = append_chromeoption(webdriver, pluginfile)
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
chrome.implicitly_wait(10)
url_open(url, chrome)

モジュール

from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException,StaleElementReferenceException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from webdriver_manager.chrome import ChromeDriverManager

メイン

@retry(exceptions=(WebDriverException), tries = 3, delay = 5)
def url_open(first_url, chrome):
    chrome.get(first_url)
    time.sleep(int(random.uniform(2,4)))

def append_chromeoption(webdriver):
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--start-maximized")
    chrome_options.add_argument("--lang=ja")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument('--ignore-certificate-errors')
    chrome_options.add_argument('--ignore-ssl-errors')
    chrome_options.add_argument('disable-dev-shm-usage')
    chrome_options.add_argument('--blink-settings=imagesEnabled=false')    
    return chrome_options

url = "*****"
chrome_options = append_chromeoption(webdriver)
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['acceptInsecureCerts'] = True
chrome = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=chrome_options,desired_capabilities=capabilities)
chrome.implicitly_wait(10)
url_open(url, chrome)

リクエスト

基本の形

モジュール

import  urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from requests.packages.urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from fake_useragent import UserAgent
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)

メイン

proxy_json  = json.load(open("*****.json", "r",encoding="utf-8"))
URL = "https://www.google.com/search?"
headers = {"User-Agent": USER_AGENT} 
proxy = proxy_json['proxy'][random.randint(0, len(proxy_json['proxy']) - 1)]
proxies = {"http" :proxy,"https":proxy}
header  = {'user-agent':UserAgent().chrome}
s = requests.Session()
retries = Retry(total = 3, backoff_factor = 1, status_forcelist = [ 500, 502, 503, 504, 443 ])
s.mount('https://', HTTPAdapter(max_retries = retries))
s.mount('http://', HTTPAdapter(max_retries = retries))
try:
    responce = s.get(URL, headers = header, proxies = proxies, verify = False) 
    responce.encoding = cchardet.detect(responce.content)["encoding"] 
except:
    pass

基本の形(検索系)

モジュール

import  urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from requests.packages.urllib3.util.retry import Retry
from urllib3.exceptions import InsecureRequestWarning
from requests.adapters import HTTPAdapter
from fake_useragent import UserAgent
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)

メイン

proxy_json  = json.load(open("******.json", "r",encoding="utf-8"))
ENDPOINT = "https://www.google.com/search?"
headers = {"User-Agent": USER_AGENT} 
proxy = proxy_json['proxy'][random.randint(0, len(proxy_json['proxy']) - 1)]
proxies = {"http" :proxy,"https":proxy}
header  = {'user-agent':UserAgent().chrome}
s = requests.Session()
retries = Retry(total = 3, backoff_factor = 1, status_forcelist = [ 500, 502, 503, 504, 443 ])
s.mount('https://', HTTPAdapter(max_retries = retries))
s.mount('http://', HTTPAdapter(max_retries = retries))
nav_request = 'q={}&hl=ja'.format(search_title)
nav_request = urllib.parse.quote_plus(nav_request, safe='=&')
request = ENDPOINT + nav_request 
try:
    responce = s.get(request, headers = header, proxies = proxies, verify = False) 
    responce.encoding = cchardet.detect(responce.content)["encoding"] 
except:
    pass

相対パスのURL変換

モジュール

from urllib.parse import urlparse
import re

メイン

def convert_to_url(base, target_path):
    component = urlparse(base)
    directory = re.sub('/[^/]*$', '/',component.path)
    url_return = ""
    while(True):# 絶対パスのケース(簡易版)
        petrn1 = re.compile("^http").search(target_path)
        petrn2 = re.compile("^\/\/.+").search(target_path)
        petrn3 = re.compile("^\/[^\/].+").search(target_path)
        petrn4 = re.compile("^\.\/(.+)").search(target_path)
        petrn5 = re.compile("^([^\.\/]+)(.*)").search(target_path)        
        petrn6 = re.compile("^([^\.\/]+)(.*)").search(target_path)    
        if petrn1: url_return = target_path; break
        if petrn2: url_return = component.scheme + ":" + target_path; break # [1]「//exmaple.jp/aa.jpg」のようなケース
        if petrn3: url_return = component.scheme + "://" + component.hostname + target_path; break # [2]「/aaa/aa.jpg」のようなケース
        if petrn4: url_return =  component.scheme + "://" + component.hostname + directory  + re.findall("^\.\/(.+)", target_path)[0]; break #「./aa.jpg」のようなケース
        if petrn5:
          match_find = re.findall("^([^\.\/]+)(.*)", target_path)
          url_return =  component.scheme + "://" + component.hostname + directory + match_find[0][0] + match_find[0][1]; break#「../aa.jpg」のようなケース
        if petrn6:
            match_find = re.findall("\.\./", target_path)
            nest = len(match_find)
            dir_name = re.sub('/[^/]*$', '/',component.path) + "\n"
            dir_array = dir_name.split('/')
            dir_array.pop(0)
            dir_array.pop()
            dir_count = len(dir_array)
            count = dir_count - nest
            pathto=""
            i = 0
            while i < count:
                pathto += "/" + dir_array[i];
                i += 1 
            file = target_path.replace("../","")    
            url_return =  component.scheme + "://" + component.hostname + pathto + "/" + file
            break

    return url_return

スプレッドシート

読み込む

モジュール

import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials

メイン

def load_spread(SERVICE_ACCOUNT_FILE,SPREADSHEET_KEY,SHEET_NANE):
    SCOPES = ['https://spreadsheets.google.com/feeds','https://www.googleapis.com/auth/drive']
    credentials = ServiceAccountCredentials.from_json_keyfile_name(SERVICE_ACCOUNT_FILE,SCOPES)
    gs = gspread.authorize(credentials)
    sheet = gs.open_by_key(SPREADSHEET_KEY).worksheet(SHEET_NANE)
    df = pd.DataFrame(sheet.get_all_values())
    df.columns = df.iloc[0].tolist()
    df = df.drop(0, axis=0)
    return [df, sheet]

SERVICE_ACCOUNT_FILE = "*****.json"
SPREADSHEET_KEY = '*******'
SHEET_NANE = 'シート1'
df_sheet, sheet = load_spread(SERVICE_ACCOUNT_FILE,SPREADSHEET_KEY,SHEET_NANE)

出力

モジュール

from gspread_dataframe import set_with_dataframe

メイン

max_row = len(df_sheet) + 2
set_with_dataframe(sheet, df, row=max_row, col=1, include_index=False, include_column_header=False)

データフレーム

読み込み

EXCEL

df = pd.read_excel(excel_file)

CSV

df = pd.read_csv(csv_file, encoding = "cp932")

列の数が違うとエラーがあったため

with open(csv_files[2], "r", encoding="shift_jis", errors="", newline="" ) as f:
    lst = csv.reader(f, delimiter=",")
    df = pd.DataFrame(lst)
    df.columns = df.iloc[0].tolist()
    df = df.drop(0, axis=0)

出力

EXCEL

df.to_excel(save_path, index=False)

CSV

def to_csv(df, save_path):
    with open(save_path, mode="w", encoding="cp932", errors="ignore", newline="") as f:
        df.to_csv(f, index=False)
to_csv(df, save_path)#出力

データ成形

列削除

df = df.drop('列名', axis=1)

nanの列削除

df = df.dropna(axis=1, how='all')

nan削除

df = df.dropna(subset = ['屋号', 'URL', '住所'])
df = df.reset_index(drop = True)

辞書型変換

dic = df.to_dict(orient="record")

重複削除

#最初残す
df = df[~df.duplicated(keep='first',subset = ['屋号', 'URL', '住所'])]
#最後残す
df = df[~df.duplicated(keep='last',subset = ['屋号', 'URL', '住所'])]
#残さない
df = df[~df.duplicated(subset = ['屋号', 'URL', '住所'])]

空白/0埋め

df['列名'] = df['列名'].astype(int)
df['列名'] = df['列名'].astype(str)

カラム変換

rename_dict = {
                "屋号" : clients,
                "電話番号" : phone
            }
df = df.rename(columns=rename_dict)

結合

df_concat = pd.DataFrame()
df_concat = pd.concat([df_concat, df_next], axis=0)

データ型変換

時間型

df['作成日'] = pd.to_datetime(df['作成日'], format='%Y-%m-%d %H:%M')

文字列型

df['列名'] = df['列名'].fillna("").astype(str)

数字型

df['列名'] = df['列名'].fillna(0).astype(int)

自然言語処理

文字コード・空白削除

モジュール

import re

メイン

def re_text(df, col_name):
    target = df[col_name].to_list()
    tage_lst = target.astype(str)
    tage_lst = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in tage_lst] #半角にする
    tage_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]',
    '',tage) for tage in tage_lst] #空白削除, 文字コード削除
    df[col_name] = tage_lst
    return df

屋号成形

モジュール

import pandas as pd
import re, mojimoji

メイン

def re_text(target):
    tage_lst = target.astype(str)
    tage_lst = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in tage_lst] #半角にする
    tage_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)\
    for tage in tage_lst] #空白削除, 文字コード削除
    return tage_lst

def main(df,file_type_itszai_cms,clomn_name, zyouken_path):
    df_com = re_text(df[clomn_name])
    df_kinsicom = pd.read_excel(, sheet_name=file_type_itszai_cms, header=None)[0].to_list()
    df_kinsicom = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in df_kinsicom] #半角にする
    df_kinsicom = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage) \
    for tage in df_kinsicom] #空白削除, 文字コード削除
    target_com = []
    for i, com in enumerate(df_com):
        flag = True
        for kinsi_com in df_kinsicom:
            if kinsi_com in com: 
                flag = False
                com = None; 
                break
        if not flag:
            target_com.append(i)
    df = df.drop(df.index[target_com])
    df = df.reset_index(drop=True)
    return df

zyouken_path = "*****"
df = main(df,"イツザイ","屋号", zyouken_path)

屋号成形

モジュール

import pandas as pd
import re, mojimoji

メイン

def re_text(target):
    tage_lst = target.astype(str)
    tage_lst = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in tage_lst] #半角にする
    tage_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage)\
    for tage in tage_lst] #空白削除, 文字コード削除
    return tage_lst

def main(df,file_type_itszai_cms,clomn_name, zyouken_path):
    df_com = re_text(df[clomn_name])
    df_kinsicom = pd.read_excel(, sheet_name=file_type_itszai_cms, header=None)[0].to_list()
    df_kinsicom = [mojimoji.zen_to_han(tage, kana = True, digit = True, ascii = True) for tage in df_kinsicom] #半角にする
    df_kinsicom = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', tage) \
    for tage in df_kinsicom] #空白削除, 文字コード削除
    target_com = []
    for i, com in enumerate(df_com):
        flag = True
        for kinsi_com in df_kinsicom:
            if kinsi_com in com: 
                flag = False
                com = None; 
                break
        if not flag:
            target_com.append(i)
    df = df.drop(df.index[target_com])
    df = df.reset_index(drop=True)
    return df

zyouken_path = "source/zyoukenn.xlsx"
df = main(df,"イツザイ","屋号", zyouken_path)

電話番号成形

モジュール

import re, mojimoji, phonenumbers
import pandas as pd

メイン

def main(df, df_kinsiphone, column):
    df_kinsiphone = [str(phone) for phone in df_kinsiphone['number'].to_list()]
    df_kinsiphone = [number.zfill(10) if len(number) == 9 else number.zfill(11) for number in  df_kinsiphone]
    phone_lst = df[column].to_list()
    phone_lst = [str(phone) for phone in phone_lst]
    phone_lst = [mojimoji.zen_to_han(phone, kana = True, digit = True, ascii = True) for phone in phone_lst] #半角にする
    phone_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', phone) for phone in phone_lst] 
    phone_lst = [number.zfill(10) if len(number) == 9 else number.zfill(11) for number in  phone_lst]#空白削除, 文字コード削除
    phone_lst = [re.sub('-|-|‐|−|‒|—|–|―|ー|─|━|ㅡ|ـ|⁻|₋','', phone) for phone in phone_lst] #ハイフン置き換え
    phone_lst_tage = []
    for phone in phone_lst:
        try:
            ptrn1 = re.compile(r"([0]\d{1,4})-(\d{1,4})-(\d{3,4})").search(phone) # "090-9628-5069" 
            ptrn2 = re.compile(r"(\d{1,4})-(\d{1,4})-(\d{3,4})").search(phone) # "29-843-1581"
            ptrn3 = re.compile(r"([0]\d{9,})").search(phone) #"0298431581" 
            ptrn4 = re.compile(r"(\d{1,11})").search(phone) #"298431581"
            phone = re.sub('-','',ptrn1.group())\
                    if ptrn1 else re.sub('-','',ptrn2.group()).zfill(len(re.sub('-','',ptrn2.group())))\
                    if ptrn2 else ptrn3.group()\
                    if ptrn3 else ptrn4.group().zfill(len(ptrn4.group()))\
                    if ptrn4 else None
            if phone in df_kinsiphone:
                phone_lst_tage.append("禁止番号")
            elif phone:
                try: 
                    phone = phonenumbers.format_number(phonenumbers.parse(phone, "JP"), phonenumbers.PhoneNumberFormat.NATIONAL)
                    phone = None if phone.startswith("0120-") or phone.startswith("0800-") or phone.startswith("0066-")\
                    or phone.startswith("0037-") or phone.startswith("0-") or phone.startswith("050-") else phone  
                except phonenumbers.NumberParseException: 
                    phone = None    
                finally: 
                    phone_lst_tage.append(phone)
            else:
                phone_lst_tage.append(phone)
        except:
            phone_lst_tage.append(None)
    df[column] = phone_lst_tage
    return df

df_kinsiphone = pd.read_csv("*****.csv")
df = main(df, df_kinsiphone, "電話番号")

住所成形

モジュール

import re, json

メイン

/*
prefectures_id: [24,.... 
new_df_lst: [長野県長野市.....
search_address:[長野県,...
areas_id: [1,...
*/
def main(df,df_city,df_prenum):
    pre_json_data = [dic["都道府県"] for dic in df_prenum]
    petrn = r'(...??[都道府県])((?:旭川|伊達|石狩|盛岡|奥州|田村|南相馬|那須塩原|東村山|武蔵村山|羽村|十日町|上越|富山|\
                野々市|大町|蒲郡|四日市|姫路|大和郡山|廿日市|下松|岩国|田川|大村|宮古|富良野|別府|佐伯|黒部|小諸|塩尻|玉野|\
                周南)市|(?:余市|高市|[^市]{2,3}?)郡(?:玉村|大町|.{1,5}?)[町村]|(?:.{1,4}市)?[^町]{1,4}?区|.{1,7}?[市町村])'
    df_lst = df["住所"].to_list()
    df_lst = [str(address) for address in df_lst]
    df_lst = [re.sub('\s|[!"\n#$%&\'\\\\()*+,./:;<=>?@[\\]^_`{|}~「」〔〕“”〈〉『』【】&*・()$#@。、?!`+¥%]','', address) for address in df_lst] #空白削除, 文字コード削除
    df_lst = [re.compile(petrn).search(address).group() if re.compile(petrn).search(address) else address for address in  df_lst]
    df_lst = [re.sub("\d","",address) for address in df_lst]
    df_city_dict = df_city.to_dict(orient = 'records')
    df_ken = list(set(df_city['都道府県']))
    search_address, count_lst, pre_dict, tage_df_lst = ([], [], {}, [])#県で調べる

    for lst_data in df_lst:
        text = "指定なし"
        for dic_data in pre_json_data:
            if dic_data in lst_data:
                text = dic_data
                break
        tage_df_lst.append({"都道府県":text,"住所":lst_data})#市で調べる

    for address in tage_df_lst:
        text = address['都道府県'] 
        if address['都道府県'] == "指定なし":
            for address_dict in df_city_dict:
                if address_dict['市区町村'] in address["住所"]:
                    text =  address_dict['都道府県']; 
                    break
        search_address.append(text)

    for i, address in enumerate(search_address):
        count = 0
        for prefecture in df_ken: 
            if (prefecture in df_lst[i]) and address == '指定なし':
                count += 1
                target = {i:prefecture}
        if count == 1: 
            count_lst.append(target)  
    for count in count_lst:
        pre_dict.update(count)
    for num in pre_dict:
        search_address[num] = pre_dict[num]
    prefectures_id = [prenum["県コード"] for prefecture in search_address for prenum in df_prenum if prefecture == prenum["都道府県"]]
    areas_id = [prenum["エリアコード"] for prefecture in prefectures_id for prenum in df_prenum if prefecture == prenum["県コード"]]
    df_prefecture_sub = [pre["都道府県"] for pre in df_prenum]
    new_df_lst = []
    for city in df_lst:
        for sub_text in df_prefecture_sub:
            if sub_text in city:
                city = re.sub(sub_text,"",city)
                break
        new_df_lst.append(city)
    return prefectures_id,new_df_lst,search_address,areas_id

df_city = pd.read_csv("******.csv", encoding = "cp932")
df_prenum = json.load(open("*****.json", "r",encoding="utf-8"))
prefectures_id,new_df_lst,search_address,areas_id = main(df,df_city,df_prenum)

メール

送信

モジュール

import subprocess, smtplib, datetime
from email.mime.text import MIMEText
from email.utils import formatdate

メイン

def create_message(FROM_ADDER, to_addr, subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = FROM_ADDER
    msg['To'] = to_addr
    msg['Date'] = formatdate()
    return msg

def create_mail(FROM_ADDER, to_addr, msg, MY_PASSWORD):
    smtpobj = smtplib.SMTP('smtp.gmail.com', 587)
    smtpobj.ehlo()
    smtpobj.starttls()
    smtpobj.ehlo()
    smtpobj.login(FROM_ADDER, MY_PASSWORD)
    smtpobj.sendmail(FROM_ADDER, to_addr, msg.as_string())
    smtpobj.close()


def send_mail(FROM_ADDER, TO_ADDER, MY_PASSWORD, title, body):
    log_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    msg = create_message(FROM_ADDER, TO_ADDER, title, body)
    create_mail(FROM_ADDER, TO_ADDER, msg, MY_PASSWORD) 

FROM_ADDER = "***********"
TO_ADDER = '**************'
MY_PASSWORD = "**************"
title ~ "【】"
body = "実行完了\n"
send_mail(FROM_ADDER, TO_ADDER, MY_PASSWORD, title, body):

受信

インストール

pip install  pyzmail36
pip install backports.ssl

モジュール

import pyzmail, imapclient, datetime
from backports import ssl
from OpenSSL import SSL 

メイン

def get_mail(MY_MAIL, MY_PASSWORD, dt):# SSL暗号化
    context = ssl.SSLContext(SSL.TLSv1_2_METHOD)
    imap = imapclient.IMAPClient("imap.gmail.com", ssl=True, ssl_context=context);# IMAP接続用のオブジェクト作成
    imap.login(MY_MAIL,MY_PASSWORD);# IMAPサーバーログイン
    imap.select_folder("INBOX", readonly=False)
    KWD = imap.search(["ON",dt])
    raw_message = imap.fetch(KWD,["BODY[]"])#検索結果保存
    urls,names,texts = [],[],[]
    if KWD:
        for j in range(len(KWD)):
            message = pyzmail.PyzMessage.factory(raw_message[KWD[j]][b"BODY[]"]);#特定メール取得
            Subject = message.get_subject()
            Body = message.text_part.get_payload().decode(message.text_part.charset)
            Body = re.sub("\n|\r","",Body)
            if "【メンション】" in Subject:
                split_text = re.split("[【】]",Body)
                split_num = len(split_text)
                url = split_text[0] if split_num == 5 else None
                name = split_text[2].split('/') if split_num == 5 else []
                text = split_text[4] if split_num == 5 else None
                urls.append(url)
                names.append(name)
                texts.append(text)
        imap.delete_messages(KWD)
    return urls,names,texts

MY_MAIL = "******************"
MY_PASSWORD = "******************"
dt = (datetime.datetime.now()).strftime('%d-%b-%Y')
urls,names,texts = get_mail(MY_MAIL, MY_PASSWORD, dt)

DB

cloud_sql・MySQL

インスタンス接続

インストール

pip install "cloud-sql-python-connector[pymysql]"

モジュール

import pandas as pd
import re, pymysql.cursors, os
from urllib.parse import urlparse
from google.cloud.sql.connector import Connector

メイン

def conect_db(db_url):
    connector = Connector()
    connection: pymysql.connections.Connection = connector.connect(
        db_url.path[1:],
        db_url.fragment,
        user= db_url.username,
        password = db_url.password,
        db = db_url.query[1:],
        local_infile = True,
        cursorclass = pymysql.cursors.DictCursor
    )
    cursor = connection.cursor()
    return cursor, connection

def sql_execute(sql, cursor, connection):
    cursor.execute(sql)
    connection.commit()

def sql_df(sql, connection):
    df = pd.read_sql(sql = test, con = connection)
    return df

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f"*****.json"
INSTANCE_URL = "******************"; # 開発環境
INSTANCE_URL = '******************'; #本番環境
db_url = urlparse(INSTANCE_URL)
cursor, connection = conect_db(db_url)
df = sql_df(sql, connection)
display(df)
cursor.close()
connection.close()

ローカル接続

モジュール

import pymysql.cursors
import pandas as pd

メイン

def conect_db(HOST,PORT,USER, PASSWORD, DB_NAME):
    connection = pymysql.connect(host = HOST,
                                 port = PORT,
                                 user = USER,
                                 password = PASSWORD,
                                 db = DB_NAME,
                                 local_infile=True,
                                 cursorclass = pymysql.cursors.DictCursor)
    cursor = connection.cursor()
    return cursor,connection

def sql_execute(sql, cursor, connection):
    cursor.execute(sql)
    connection.commit()

def sql_df(sql, cursor, connection):
    df = pd.read_sql(sql = test, con = connection)
    return df

HOST = '******************'
PORT = '******************'
USER = '******************'
PASSWORD = '******************'
DB_NAME = '******************'
cursor,connection = conect_db(HOST,PORT,USER, PASSWORD, DB_NAME)
sql_execute("""SLECT * FROM """, cursor, connection)
/*
df_test = sql_df(sql, cursor, connection)
display(df_test)
*/
cursor.close()
connection.close()

bigquery

SQLITE3

接続

モジュール

import sqlite3

メイン

def database_append(db_name, df):
    conn = sqlite3.connect(db_name)
    df.to_sql("table_class", conn, if_exists = "append")
    c = conn.cursor()
    conn.commit()
    conn.close()

出力

モジュール

import sqlite3

メイン

def confi_database(db_name):
    conn = sqlite3.connect(db_name)
    c = conn.cursor()
    query = u"SELECT * FROM table_class"
    df = pd.read_sql_query(sql = query, con=conn )
    conn.commit()
    conn.close()
    return df

db_name = "*****"
df = confi_database(db_name)

プロキシ設定

セレニウム

設定

モジュール

import zipfile

メイン

def make_proxy(save_path):
    for i in range(10925, 10935):
        PROXY_HOST = '******************'
        PROXY_PORT =  '******************'
        PROXY_USER = '******************'
        PROXY_PASS = '******************'
        manifest_json = """
        {
            "version": "1.0.0",
            "manifest_version": 2,
            "name": "Chrome Proxy",
            "permissions": [
                "proxy",
                "tabs",
                "unlimitedStorage",
                "storage",
                "<all_urls>",
                "webRequest",
                "webRequestBlocking"
            ],
            "background": {
                "scripts": ["background.js"]
            },
            "minimum_chrome_version":"22.0.0"
        }
        """
        background_js = """
        var config = {
                mode: "fixed_servers",
                rules: {
                singleProxy: {
                    scheme: "http",
                    host: "%s",
                    port: parseInt(%s)
                },
                bypassList: ["localhost"]
                }
            };
        chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
        function callbackFn(details) {
            return {
                authCredentials: {
                    username: "%s",
                    password: "%s"
                }
            };
        }
        chrome.webRequest.onAuthRequired.addListener(
                    callbackFn,
                    {urls: ["<all_urls>"]},
                    ['blocking']
        );
        """ % (PROXY_HOST, PROXY_PORT, PROXY_USER, PROXY_PASS)
        pluginfile = f'{save_path}/proxy_auth_plugin_' +  str(i) +  '.zip' 
        with zipfile.ZipFile(pluginfile, 'w') as zp:
            zp.writestr("manifest.json", manifest_json)
            zp.writestr("background.js", background_js)

確認

モジュール

import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

メイン

save_path = ""
pluginfile =  f"*****.zip"
chrome_options = webdriver.ChromeOptions()
chrome_options.add_extension(pluginfile)
browser = webdriver.Chrome(ChromeDriverManager().install(),chrome_options=chrome_options)
browser.implicitly_wait(10)
browser.get('https://expressvpn.com/what-is-my-ip')
ip_address = browser.find_element_by_class_name("ip-address").text
browser.quit()
print(ip_address)

リクエスト

確認

モジュール

import  urllib.request, json, urllib.parse, requests, urllib3, json, cchardet, random
from bs4 import BeautifulSoup

メイン

proxy = '******************'
ENDPOINT = "https://www.expressvpn.com/what-is-my-ip"
proxies = {"http" :proxy,"https":proxy}
responce = requests.get(ENDPOINT, proxies = proxies, verify = False) 
encode = responce.apparent_encoding
soup = BeautifulSoup(responce.text.encode(encode), "html.parser")
text = soup.find_all('h4',class_= "ip-address")[0].text
print(text)
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1