# Sample records: id, date string (with inconsistent zero padding), name.
tmp = [[1, '2012/01/05', '岡野 徹'],
       [2, '2012/1/0', '庄野 弘一'],
       [3, '2012/01/3', '若山 みどり'],
       [4, '2012/1/19', '岡野 徹']]
# `spark` is assumed to be an active SparkSession (e.g. in a notebook or
# the pyspark shell).
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("name", StringType(), True),
])
df = spark.createDataFrame(tmp, schema=schema)
from typing import List, Type
import pyspark.sql.types
import re

def get_particular_type_columns(_type: Type[pyspark.sql.types.DataType]) -> List[str]:
    # Return the names of the columns of the global `df` whose data type
    # is an instance of `_type` (e.g. StringType). The parameter receives
    # the type class itself, hence the Type[...] annotation.
    stringcol_list = []
    for field in df.schema.fields:
        if isinstance(field.dataType, _type):
            stringcol_list.append(field.name)
    return stringcol_list
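
# The function above depends on the global `df`, so it cannot be reused
# across tables. A minimal sketch of a variant that takes the DataFrame
# explicitly (the name `get_columns_of_type` is my own, not from the
# original code):
from pyspark.sql import DataFrame

def get_columns_of_type(target: DataFrame,
                        _type: Type[pyspark.sql.types.DataType]) -> List[str]:
    return [field.name for field in target.schema.fields
            if isinstance(field.dataType, _type)]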
# Match YYYY/M/D with one- or two-digit month and day; this collapses the
# original four alternatives (one per width combination) into one pattern.
pattern = re.compile(r'[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}')
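
# Note: the pattern checks only the *shape* of the string, not calendar
# validity -- '2012/1/0' from the sample data matches even though day 0
# is not a real date. A quick sanity check:
assert pattern.match('2012/01/05')
assert pattern.match('2012/1/0')         # matches: day 0 is not rejected
assert pattern.match('岡野 徹') is None   # non-date values do not match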
tablename_list = ['df']
col_dict = {}
for tablename in tablename_list:
    # NOTE: the helper always inspects the global `df`, so every table name
    # currently maps to the same column list.
    col_list = get_particular_type_columns(_type=pyspark.sql.types.StringType)
    col_dict[tablename] = col_list
print(col_dict)
out_dict = {}
for tablename in tablename_list:
    # Columns that the Spark SQL version below would select.
    selected_col = ','.join(col_dict[tablename])
    # df = spark.sql(f'''
    # select {selected_col}
    # from {tablename}
    # limit 10
    # ''')
    collection = df.collect()
    # Run the regex over the collected records (up to 10 with the SQL above)
    # and keep each record in which at least one value matches.
    out_record = []
    for record in collection:
        for value in record:
            if pattern.match(str(value)):
                out_record.append(record)
                break  # stop so a record is appended at most once
    out_dict[tablename] = out_record
    print('out_record\n', out_record)
print('out_dict\n', out_dict)
{'df': ['date', 'name']}
out_record
[Row(id=1, date='2012/01/05', name='岡野 徹'), Row(id=2, date='2012/1/0', name='庄野 弘一'), Row(id=3, date='2012/01/3', name='若山 みどり'), Row(id=4, date='2012/1/19', name='岡野 徹')]
out_dict
{'df': [Row(id=1, date='2012/01/05', name='岡野 徹'), Row(id=2, date='2012/1/0', name='庄野 弘一'), Row(id=3, date='2012/01/3', name='若山 みどり'), Row(id=4, date='2012/1/19', name='岡野 徹')]}
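
Collecting every row to the driver only to regex-match it in Python will not scale past small tables. A minimal sketch of pushing the same filter into Spark itself with Column.rlike (rlike matches anywhere in the string, so the pattern is anchored with ^ to mirror re.match; the variable names here are my own):

from functools import reduce
from pyspark.sql import functions as F

date_like = r'^[0-9]{4}/[0-9]{1,2}/[0-9]{1,2}'

# OR together one rlike condition per string column, then filter.
condition = reduce(lambda a, b: a | b,
                   [F.col(c).rlike(date_like) for c in col_dict['df']])
df.filter(condition).show()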