Introduction
I'm working through Beginning Apache Spark using Azure Databricks.
Development Environment
Chapter 5: Getting Data into Databricks
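This chapter walks through ways of getting data into Databricks: exploring DBFS with the %fs magic and dbutils.fs, pulling files from the web with %sh and plain Python, and registering the results as tables. First, list the DBFS root: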
%fs ls /
path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,1649813340000
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/local_disk0/,local_disk0/,0,1650294537000
dbfs:/tmp/,tmp/,0,1650699514000
dbfs:/user/,user/,0,1650294511000
%fs ls /databricks-datasets/
path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,1650702268109
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,1650702268109
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,1650702268109
dbfs:/databricks-datasets/airlines/,airlines/,0,1650702268109
...
%fs ls /databricks-datasets/airlines/
path,name,size,modificationTime
dbfs:/databricks-datasets/airlines/README.md,README.md,1089,1454697889000
dbfs:/databricks-datasets/airlines/_SUCCESS,_SUCCESS,0,1436493184000
dbfs:/databricks-datasets/airlines/part-00000,part-00000,67108879,1436493184000
dbfs:/databricks-datasets/airlines/part-00001,part-00001,67108862,1436493185000
...
%fs head /databricks-datasets/airlines/README.md
================================================
Airline On-Time Statistics and Delay Causes
================================================
## Background
The U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS) tracks the on-time performance of domestic flights operated by large air carriers.
...
%fs cp /databricks-datasets/airlines/README.md /
res2: Boolean = true
%fs rm /README.md
res3: Boolean = true
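The %fs magic is shorthand for the dbutils.fs utilities, which can be called from Python and return objects rather than a rendered table: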
dbutils.fs.ls("/databricks-datasets")
Out[1]: [FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1650702243729),
FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1532468253000),
FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1650702243729),
FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1455043490000),
FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1650702243729),
FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1650702243729),
...
dbutils.fs.head("/databricks-datasets/airlines/README.md")
Out[3]: "================================================\nAirline On-Time Statistics and Delay Causes\n================================================\n\n## Background\nThe U.S. Department of Transportation's (DOT) Bureau of Transportation Statistics (BTS) tracks the on-time performance of domestic flights operated by large air carriers.
files = dbutils.fs.ls("/")
for f in files:
    print(f.name)
FileStore/
databricks-datasets/
databricks-results/
local_disk0/
tmp/
user/
x = [print(f.name) for f in dbutils.fs.ls("/")]
FileStore/
databricks-datasets/
databricks-results/
local_disk0/
tmp/
user/
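Files stored under dbfs:/FileStore are also served over HTTP at the /files/ path, so an image uploaded there can be embedded straight into notebook output: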
displayHTML("<img src = '/files/images/logo.png'>")
%sh
cd /tmp
wget http://ergast.com/downloads/f1db_csv.zip
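%sh runs on the driver node, so the zip is downloaded to the driver's local /tmp, not to DBFS: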
%sh ls /tmp
...
f1db_csv.zip
...
%sh
unzip -Z1 /tmp/f1db_csv.zip
circuits.csv
constructor_results.csv
constructors.csv
constructor_standings.csv
drivers.csv
driver_standings.csv
lap_times.csv
pit_stops.csv
qualifying.csv
races.csv
results.csv
seasons.csv
sprint_results.csv
status.csv
%sh
unzip -j /tmp/f1db_csv.zip constructors.csv -d /tmp
Archive: /tmp/f1db_csv.zip
inflating: /tmp/constructors.csv
%sh ls /tmp/*.csv
/tmp/constructors.csv
%sh
mv /tmp/constructors.csv /dbfs/tmp
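/dbfs is the FUSE mount of DBFS on the driver, so a plain mv into /dbfs/tmp lands the file at dbfs:/tmp. A quick check (path assumed from the step above):
dbutils.fs.ls("dbfs:/tmp/constructors.csv")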
from requests import get
# Fetch the zip over HTTP and write it to the driver's local disk
with open('/tmp/f1.zip', "wb") as file:
    response = get('http://ergast.com/downloads/f1db_csv.zip')
    file.write(response.content)
from zipfile import ZipFile
with ZipFile('/tmp/f1.zip', 'r') as zip:
    files = zip.namelist()
    for file in files:
        print(file)
circuits.csv
constructor_results.csv
constructors.csv
constructor_standings.csv
drivers.csv
driver_standings.csv
lap_times.csv
pit_stops.csv
qualifying.csv
races.csv
results.csv
seasons.csv
sprint_results.csv
status.csv
from zipfile import ZipFile
with ZipFile('/tmp/f1.zip', 'r') as zip:
    zip.extract('seasons.csv', '/tmp')
import os
os.listdir("/tmp")
Out[22]: ['f1db_csv.zip', 'seasons.csv', ...]
dbutils.fs.mv("file:/tmp/seasons.csv", "dbfs:/tmp/seasons.csv")
Out[24]: True
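Here the file:/ scheme addresses the driver's local disk and dbfs:/ addresses DBFS, so this is the Python equivalent of the shell mv into /dbfs/tmp above.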
df = spark \
.read \
.format("csv") \
.option("inferSchema","true") \
.option("header","false") \
.load("dbfs:/tmp/seasons.csv") \
.selectExpr("_c0 as year", "_c1 as url")
df.write.saveAsTable('seasons')
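A quick check that the table landed; spark.table reads it back as a DataFrame:
spark.table("seasons").show(3)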
%sql
create temporary table test (year INT, url STRING)
using csv options (path "dbfs:/tmp/seasons.csv", header "false", mode "FAILFAST");
select * from test;
FileReadException: Error while reading file dbfs:/tmp/seasons.csv.
Caused by: SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
Caused by: BadRecordException: java.lang.NumberFormatException: For input string: "year"
Caused by: NumberFormatException: For input string: "year"
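The failure is expected: the first line of seasons.csv is the header row ("year,url"), and under FAILFAST the string "year" cannot be parsed as an INT, so the whole query aborts. The temporary table definition is still registered, though, so it has to be dropped before retrying: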
%sql
show tables;
database,tableName,isTemporary
default,seasons,false
,test,true
%sql
drop table test;
OK
%sql
create temporary table test (year INT, url STRING)
using csv options (path "dbfs:/tmp/seasons.csv", header "false", mode "PERMISSIVE");
select * from test;
year,url
null,url
2009,http://en.wikipedia.org/wiki/2009_Formula_One_season
2008,http://en.wikipedia.org/wiki/2008_Formula_One_season
...
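The null row is the header line being read as data. Instead of falling back to PERMISSIVE, the cleaner fix is to read the file with the header option set to true so the first line becomes the column names; a small PySpark sketch:
# Re-read the file treating its first line ("year,url") as column names
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("dbfs:/tmp/seasons.csv"))
df.show(3)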
Mounting S3
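dbutils.fs.mount can attach an S3 bucket under /mnt so it looks like any other DBFS path. A minimal sketch, not from the book, assuming the cluster has an instance profile (or other AWS credentials) that can read the bucket; the names are placeholders:
aws_bucket_name = "<aws-bucket-name>"  # placeholder
mount_name = "f1data"                  # placeholder
dbutils.fs.mount(f"s3a://{aws_bucket_name}", f"/mnt/{mount_name}")
display(dbutils.fs.ls(f"/mnt/{mount_name}"))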
Mounting Blob Storage
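Azure Blob Storage mounts the same way over the wasbs scheme. A sketch assuming the storage account key is kept in a secret scope; every name below is a placeholder:
storage_account = "<storage-account>"  # placeholder
container = "<container>"              # placeholder
dbutils.fs.mount(
    source=f"wasbs://{container}@{storage_account}.blob.core.windows.net",
    mount_point="/mnt/blobdata",       # placeholder mount point
    extra_configs={
        f"fs.azure.account.key.{storage_account}.blob.core.windows.net":
            dbutils.secrets.get(scope="<scope-name>", key="<key-name>")
    },
)
When a mount is no longer needed, dbutils.fs.unmount("/mnt/blobdata") removes it.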