概要
少し前に{sparklyr}というRからSparkを使うパッケージがRStudio社から公開されました。この{sparklyr}にはS3上のファイルも読み込めるspark_read_csv
という関数が提供されており、Amazon Athenaが東京リージョンに来るまで代わりに使えないかと試してみました。
今回はAWS Public Datasetsにあるデータセットを読み込んでみましたが、入力対象のS3バケットに権限があれば同じように扱えると思います。
事前準備
{sparklyr}の活用にあたって対象パッケージのインストールと、Spark環境の設定が必要になります。後者については{sparklyr}に関数が用意されているので、今回はそれを使用してローカルに環境構築します。
今回は試しませんが、ローカルではなくAWS上などに別構築したSpark環境も利用できます。この場合、タスクノードをスポットインスタンスで構成するなどで比較的安価に大規模データ処理環境の用意が可能になります。
また、AWS CLI(コマンドラインインターフェイス)を用いたパッケージでS3にアクセスしたりしますので、一緒にインストールします。
- AWS Documentation » Amazon Elastic MapReduce Documentation » リリースガイド » Apache Spark
- Cluster Deployment
Spark環境設定
前述の通り、{sparklyr}の関数でローカルにSpark/Hadoopの環境を構築します。ただし、{sparklyr}のバージョン次第で対応していないSpark/Hadoopのバージョンがありますのでご注意ください。
# {sparklyr}のインストール
# 再現できるようにrepos引数にMicrosoft R Openのスナップショットリポジトリを指定
# 特に支障がなくインストールされるはずなので、実行履歴は省略
install.packages(pkgs = "sparklyr", repos = "https://mran.microsoft.com/snapshot/2017-01-11/")
# インストールできたらパッケージ読み込み
library(sparklyr)
library(tidyverse)
# ローカル上にSparkとHadoop環境があるか確認
# まだ構築していないので、空のデータフレームが返ってくる
> sparklyr::spark_installed_versions()
[1] spark hadoop dir
<0 行> (または長さ 0 の row.names)
# {sparklyr}で用意されている関数でインストールできるバージョンの候補を表示
# ただし、インストールできるバージョンでも{sparklyr}では利用できないバージョンも表示される
> sparklyr::spark_available_versions()
spark hadoop install
5 1.6.2 2.6 spark_install(version = "1.6.2", hadoop_version = "2.6")
6 1.6.2 2.4 spark_install(version = "1.6.2", hadoop_version = "2.4")
7 1.6.2 2.3 spark_install(version = "1.6.2", hadoop_version = "2.3")
8 1.6.2 cdh4 spark_install(version = "1.6.2", hadoop_version = "cdh4")
9 1.6.1 2.6 spark_install(version = "1.6.1", hadoop_version = "2.6")
10 1.6.1 2.4 spark_install(version = "1.6.1", hadoop_version = "2.4")
11 1.6.1 2.3 spark_install(version = "1.6.1", hadoop_version = "2.3")
12 1.6.1 cdh4 spark_install(version = "1.6.1", hadoop_version = "cdh4")
13 1.6.0 2.6 spark_install(version = "1.6.0", hadoop_version = "2.6")
14 1.6.0 2.4 spark_install(version = "1.6.0", hadoop_version = "2.4")
15 1.6.0 2.3 spark_install(version = "1.6.0", hadoop_version = "2.3")
16 1.6.0 cdh4 spark_install(version = "1.6.0", hadoop_version = "cdh4")
17 2.0.0 2.7 spark_install(version = "2.0.0", hadoop_version = "2.7")
18 2.0.0 2.6 spark_install(version = "2.0.0", hadoop_version = "2.6")
19 2.0.0 2.4 spark_install(version = "2.0.0", hadoop_version = "2.4")
20 2.0.0 2.3 spark_install(version = "2.0.0", hadoop_version = "2.3")
21 2.0.1 2.7 spark_install(version = "2.0.1", hadoop_version = "2.7")
22 2.0.1 2.6 spark_install(version = "2.0.1", hadoop_version = "2.6")
23 2.0.1 2.4 spark_install(version = "2.0.1", hadoop_version = "2.4")
24 2.0.1 2.3 spark_install(version = "2.0.1", hadoop_version = "2.3")
25 2.0.2 2.7 spark_install(version = "2.0.2", hadoop_version = "2.7")
26 2.0.2 2.6 spark_install(version = "2.0.2", hadoop_version = "2.6")
27 2.0.2 2.4 spark_install(version = "2.0.2", hadoop_version = "2.4")
28 2.0.2 2.3 spark_install(version = "2.0.2", hadoop_version = "2.3")
29 2.1.0 2.7 spark_install(version = "2.1.0", hadoop_version = "2.7")
30 2.1.0 2.6 spark_install(version = "2.1.0", hadoop_version = "2.6")
31 2.1.0 2.4 spark_install(version = "2.1.0", hadoop_version = "2.4")
32 2.1.0 2.3 spark_install(version = "2.1.0", hadoop_version = "2.3")
# 上記の候補のうち、「version = "2.1.0", hadoop_version = "2.7"」を指定してインストール
> sparklyr::spark_install(version = "2.1.0", hadoop_version = "2.7")
Installing Spark 2.1.0 for Hadoop 2.7 or later.
Downloading from:
- 'https://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz'
Installing to:
- '~/Library/Caches/spark/spark-2.1.0-bin-hadoop2.7'
URL 'https://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz' を試しています
Content type 'application/x-tar' length 195636829 bytes (186.6 MB)
==================================================
downloaded 186.6 MB
Installation complete.
# ローカル上にSparkとHadoop環境があるか、再度確認
# インストールされたバージョンからなるデータフレームが返ってくる
> sparklyr::spark_installed_versions()
spark hadoop dir
1 2.1.0 2.7 spark-2.1.0-bin-hadoop2.7
# 試しに接続すると現バージョン(0.5.1)ではSpark 2.1.0には対応していない
> sc <- sparklyr::spark_connect(master = "local")
sparklyr does not currently support Spark version: 2.1.0
# インストールしたバージョンを削除
> sparklyr::spark_uninstall(version = "2.1.0", hadoop_version = "2.7")
spark-2.1.0-bin-hadoop2.7 successfully uninstalled.
# 改めて「version = "2.1.0", hadoop_version = "2.7"」を指定してインストール
> sparklyr::spark_install(version = "2.0.2", hadoop_version = "2.7")
Installing Spark 2.0.2 for Hadoop 2.7 or later.
Downloading from:
- 'https://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz'
Installing to:
- '~/Library/Caches/spark/spark-2.0.2-bin-hadoop2.7'
URL 'https://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz' を試しています
Content type 'application/x-tar' length 187426587 bytes (178.7 MB)
==================================================
downloaded 178.7 MB
Installation complete.
# 確認
> sparklyr::spark_installed_versions()
spark hadoop dir
1 2.0.2 2.7 spark-2.0.2-bin-hadoop2.7
# 接続できても何も返さない(1系以降のRStudioでは右上のSparkパネルが切り替わる)
sc <- sparklyr::spark_connect(master = "local")
# 接続されているか確認
> sparklyr::spark_connection_is_open(sc = sc)
[1] TRUE
# 接続を切って再確認
sparklyr::spark_disconnect(sc = sc)
> sparklyr::spark_connection_is_open(sc = sc)
[1] FALSE
AWS CLIのインストール
EC2であればAWS CLIが事前にインストールされていますが、自分のMac環境では用意されていないので別途インストールします。調べてみるとHomebrewでインストールできるようなので、今回はそちらで準備しました。
# 一部のみ表示
$ brew info awscli
awscli: stable 1.11.36 (bottled), HEAD
Official Amazon AWS command-line interface
https://aws.amazon.com/cli/
# インストール(履歴は省略)
$ brew install awscli
# インストール後にaws configureでアクセスキー、シークレットアクセスキー、デフォルトリージョンを自前のものを設定
# aws_pdatasetという名前のprofileを作って参照できるようにしておく
$ aws configure --profile aws_pdataset
AWS Public Datasetsのバケット構造を確認
AWS Public Datasetsにあるデータセットのバケット構造を調べるため、下記のパッケージを利用してS3にアクセスして確認しました。前述のとおり、こちらのパッケージではAWS CLIを利用しています。
library(aws.signature)
library(aws.s3)
# aws configureでアクセスキーなどを設定していない場合は下記のように環境変数をセットする必要がある
# セキュリティ観点から実キーは伏せています
Sys.setenv(AWS_ACCESS_KEY_ID = "***************")
Sys.setenv(AWS_SECRET_ACCESS_KEY = "***************")
Sys.setenv(AWS_DEFAULT_REGION = "us-east-1")
# profile名を指定して、aws configureで設定したアクセスキーやシークレットアクセスキーなどを読み込み
aws.signature::use_credentials(profile = "aws_pdataset")
# "1000genomes"のデータセットのバケット構造を閲覧
# ここでは"alignment_indices/2012"が付くものを15個表示
> genomes <- aws.s3::get_bucket(bucket = "1000genomes", prefix = "alignment_indices/2012", max = 15, parse_response = TRUE)
Bucket: 1000genomes
$Contents
Key: alignment_indices/20120522.alignment.chr20_cram.index
LastModified: 2013-05-29T17:35:36.000Z
ETag: "ea7863d5db6acc3b6ca997bfa6d2c6c9"
Size (B): 270984
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.alignment.chr20_lossy_cram.index
LastModified: 2013-05-27T16:33:30.000Z
ETag: "aeae5be28807b89f67ebdfe85e2fd676"
Size (B): 284256
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.alignment.index
LastModified: 2013-06-03T18:15:51.000Z
ETag: "82d9523c2a48d18185f6ba905c306d7e"
Size (B): 1561723
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.alignment.index.bas.gz
LastModified: 2013-06-03T18:14:53.000Z
ETag: "275801bf228b1ff33664dc0d538e320f"
Size (B): 1233947
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.alignment.mapped.binned_csra.index
LastModified: 2014-09-02T22:15:39.000Z
ETag: "ab60350c0873f9527ce1347541027a8e"
Size (B): 131623
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.chr20_cram.index
LastModified: 2013-05-29T08:43:56.000Z
ETag: "7cf05ef22b7d2c9b5e64bf5cb9f03916"
Size (B): 215312
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.chr20_lossy_cram.index
LastModified: 2014-09-03T04:15:33.000Z
ETag: "fce87b37792304ab79e9584e224bf1b8"
Size (B): 225944
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.index
LastModified: 2013-06-03T18:14:54.000Z
ETag: "83ff92283546ad85101b80cf9e04e10b"
Size (B): 1240451
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.index.HsMetrics.gz
LastModified: 2014-09-03T03:54:02.000Z
ETag: "6742b3655a3cd388cc1ec3406b61f835"
Size (B): 118484
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.index.HsMetrics.stats
LastModified: 2013-05-28T11:39:20.000Z
ETag: "6757ae9b582c46174b764f4efe9097a6"
Size (B): 376
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.index.bas.gz
LastModified: 2013-06-03T18:15:57.000Z
ETag: "a7e71fab455a9f9634e0b4ec2c208658"
Size (B): 1617379
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522.exome.alignment.mapped.binned_csra.index
LastModified: 2014-09-02T12:22:40.000Z
ETag: "3b77f700a59c1f27b0f04a05bda4fc63"
Size (B): 104557
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522_20111114.alignment_stats.exome.csv
LastModified: 2013-06-03T18:08:02.000Z
ETag: "e4149d8e97a324892571fad80e0c6a6e"
Size (B): 1344
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20120522_20111114.alignment_stats.low_coverage.csv
LastModified: 2013-06-03T18:08:02.000Z
ETag: "b99c5dc76cd28a83cee962ecb33e6235"
Size (B): 1500
Owner: 1000genomes
Storage class: STANDARD
$Contents
Key: alignment_indices/20121211.alignment.index
LastModified: 2014-09-02T08:48:22.000Z
ETag: "9668dcf90d6d149ae612581cf404d7a1"
Size (B): 2332675
Owner: 1000genomes
Storage class: STANDARD
ここでは次のふたつのファイルを読み込んでみます。
- alignment_indices/20120522.alignment.index
- alignment_indices/20120522.alignment.index.bas.gz
データフォーマットは下記を参考までに。
AWS Public Datasetsにあるファイルを読み込む
{sparklyr}ではS3にあるファイルを読み込める関数が定義されていますが、デフォルトの設定のままではローカルまたはEC2からは読み込めません(EMRからだと可能?)。
この件については、下記の通りヘルプドキュメントには記述がありません。
『spark_read_csv {sparklyr} - R Documentation』より
You can read data from HDFS (hdfs://), S3 (s3n://), as well as the local file system (file://).
sc <- sparklyr::spark_connect(master = "local")
> genome_alignment_indices <- sparklyr::spark_read_csv(
+ sc = sc, name = "alignment_indices",
+ path = "s3a://1000genomes/20120522.alignment.index",
+ infer_schema = TRUE, header = TRUE, delimiter = "\t"
+ )
エラー: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:381)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:379)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:379)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:413)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:349)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at sparklyr.Invoke$.invoke(invoke.scala:94)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
... 45 more
# 接続を切る
sparklyr::spark_disconnect(sc = sc)
これに対応するには、接続時の設定にhadoop-awsをデフォルトパッケージとして追加しておく必要があります。これをR上でするにはspark_config
を使います。
# sparklyr::spark_configでデフォルト設定ファイルを読み込み
spark_config <- sparklyr::spark_config()
# デフォルトパッケージにhadoop-awsを追加
spark_config$sparklyr.defaultPackages <- append(
x = spark_config$sparklyr.defaultPackages, "org.apache.hadoop:hadoop-aws:2.7.3"
)
# インストール状況によってバージョンは異なりそう
> list.files(path = "~/.ivy2/cache/org.apache.hadoop/hadoop-aws/jars/")
[1] "hadoop-aws-2.7.2.jar" "hadoop-aws-2.7.3.jar"
# config引数にセットした設定を渡して再度接続
sc <- sparklyr::spark_connect(master = "local", config = spark_config)
> spark_connection_is_open(sc = sc)
[1] TRUE
# エラーがなく結果が得られる
# infer_schema引数をTRUEにしておくと各カラムの型を推定してくれる
genome_alignment_indices <- sparklyr::spark_read_csv(
sc = sc, name = "alignment_indices", path = "s3a://1000genomes/alignment_indices/20100517.alignment.index",
infer_schema = TRUE, header = TRUE, delimiter = ","
)
> genome_alignment_indices
Source: query [4,424 x 6]
Database: spark connection master=local[4] app=sparklyr local=TRUE
BAM_FILE BAM_MD5
<chr> <chr>
1 data/HG00096/alignment/HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam e2425c6f57b2aa4ddb08f472d98221d0
2 data/HG00096/alignment/HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam c5cb5c7b356e95df899f7e780fa22e2b
3 data/HG00096/alignment/HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 336ea55913bc261b72875bd259753046
4 data/HG00096/alignment/HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 21f4323f02d6d2a888a02bdfb24dc143
5 data/HG00103/alignment/HG00103.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 09f5199ca81b87fba667a1d60d166474
6 data/HG00103/alignment/HG00103.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 34f0cfc7d3584a64e104a1ebbc9209ab
7 data/HG00103/alignment/HG00103.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam a09f7acc6a736daae40f689b8c9e40c2
8 data/HG00103/alignment/HG00103.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 2d355139e1c8fe220efbf47f0e050bcf
9 data/HG00106/alignment/HG00106.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam cef7bda3308f06dcb9fe494c7addbd76
10 data/HG00106/alignment/HG00106.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 81b3973ae3fa78083a9737f3bf800182
# ... with 4,414 more rows, and 4 more variables: BAI_FILE <chr>, BAI_MD5 <chr>, BAS_FILE <chr>, BAS_MD5 <chr>
# .gzファイルも対応
# infer_schema引数をFALSEにすると各カラムの型が文字列として読み込まれる
genome_alignment_indices_bas <- sparklyr::spark_read_csv(
sc = sc, name = "alignment_indices_bas", path = "s3a://1000genomes/alignment_indices/20120522.alignment.index.bas.gz",
infer_schema = FALSE, header = TRUE, delimiter = "\t"
)
> genome_alignment_indices_bas
Source: query [2.78e+04 x 21]
Database: spark connection master=local[4] app=sparklyr local=TRUE
bam_filename md5 study sample platform
<chr> <chr> <chr> <chr> <chr>
1 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
2 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
3 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
4 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
5 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
6 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
7 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
8 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
9 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522 21f4323f02d6d2a888a02bdfb24dc143 SRP001294 HG00096 ILLUMINA
# ... with 2.779e+04 more rows, and 16 more variables: library <chr>, readgroup <chr>, `_total_bases` <chr>,
# `_mapped_bases` <chr>, `_total_reads` <chr>, `_mapped_reads` <chr>, `_mapped_reads_paired_in_sequencing` <chr>,
# `_mapped_reads_properly_paired` <chr>, `_of_mismatched_bases` <chr>, average_quality_of_mapped_bases <chr>,
# mean_insert_size <chr>, insert_size_sd <chr>, median_insert_size <chr>, insert_size_median_absolute_deviation <chr>,
# `_duplicate_reads` <chr>, `_duplicate_bases` <chr>
# columns引数に名前付きベクトルを与えることで各カラムの型と名前を指定できる
# RとSpark間の型の違いは下記のドキュメントを
# http://spark.apache.org/docs/latest/sparkr.html#data-type-mapping-between-r-and-spark
cols <- c(
rep(x = "string", length = 7), rep(x = "double", length = 7), rep(x = "float", length = 5),
"integer", "string"
)
names(x = cols) <- colnames(x = genome_alignment_indices_bas)
# 警告メッセージについては未調査
genome_alignment_indices_bas_cols <- sparklyr::spark_read_csv(
sc = sc, name = "alignment_indices_bas_cols", path = "s3a://1000genomes/alignment_indices/20120522.alignment.index.bas.gz",
infer_schema = FALSE, header = TRUE, delimiter = "\t",
columns = cols
)
警告メッセージ:
spark_csv_read(sc, spark_normalize_path(path), options, columns) で:
Dataset has 1 columns but 'columns' has length 21
> genome_alignment_indices_bas_cols
Source: query [2.78e+04 x 21]
Database: spark connection master=local[4] app=sparklyr local=TRUE
bam_filename md5 study sample platform library readgroup
<chr> <chr> <chr> <chr> <chr> <chr> <chr>
1 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
2 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
3 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
4 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
5 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
6 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
7 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
8 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
9 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522 21f4323f02d6d2a888a02bdfb24dc143 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
# ... with 2.779e+04 more rows, and 14 more variables: `_total_bases` <dbl>, `_mapped_bases` <dbl>, `_total_reads` <dbl>, `_mapped_reads` <dbl>,
# `_mapped_reads_paired_in_sequencing` <dbl>, `_mapped_reads_properly_paired` <dbl>, `_of_mismatched_bases` <dbl>,
# average_quality_of_mapped_bases <dbl>, mean_insert_size <dbl>, insert_size_sd <dbl>, median_insert_size <dbl>,
# insert_size_median_absolute_deviation <dbl>, `_duplicate_reads` <int>, `_duplicate_bases` <chr>
# `$`による参照はできないので注意
> genome_alignment_indices_bas_cols$bam_filename
NULL
# selectで参照
> genome_alignment_indices_bas_cols %>%
+ dplyr::select(bam_filename) %>%
+ head(n = 10)
Source: query [10 x 1]
Database: spark connection master=local[4] app=sparklyr local=TRUE
bam_filename
<chr>
1 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
2 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
3 HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
4 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
5 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
6 HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
7 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
8 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
9 HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522
# とりあえず集計してみる
> genome_summarize <- genome_alignment_indices_bas_cols %>%
dplyr::group_by(readgroup) %>%
dplyr::summarize(
total_bases = sum(`_total_bases`),
mapped_bases = sum(`_mapped_bases`),
total_reads = sum(`_total_reads`),
mapped_reads = sum(`_mapped_reads`)
) %>%
print
Source: query [6,949 x 5]
Database: spark connection master=local[4] app=sparklyr local=TRUE
readgroup total_bases mapped_bases total_reads mapped_reads
<chr> <dbl> <dbl> <dbl> <dbl>
1 ERR015528 6983935452 5952262482 64666069 62451949
2 ERR020229 19111863316 16893939598 210020476 199362355
3 ERR013151 4090595256 3654598436 37875882 36786031
4 ERR015759 1341103068 1249066562 12417621 11811715
5 ERR019905 2264878476 2181465658 20971097 20421069
6 ERR013138 6858917784 6220244192 63508498 60258668
7 SRR037779 2251299740 1995203950 29622365 27061710
8 SRR037781 2203411836 1942146896 28992261 26343528
9 SRR043409 4294927960 3987005465 59268712 57122507
10 SRR044230 5247820064 4803974017 69050264 66265436
# ... with 6,939 more rows
# とりあえずPCA
> genome_summarize %>%
+ sparklyr::ml_pca(features = c("total_bases", "mapped_bases", "total_reads", "mapped_reads"))
* No rows dropped by 'na.omit' call
Explained variance:
PC1 PC2 PC3 PC4
9.977596e-01 2.239423e-03 9.743283e-07 1.377467e-08
Rotation:
PC1 PC2 PC3 PC4
total_bases -0.732068499 0.681081744 0.0140674519 0.002340459
mapped_bases -0.681151089 -0.732137051 0.0001951216 -0.002914359
total_reads -0.007463459 0.009455473 -0.7330409740 -0.680077805
mapped_reads -0.007294029 0.003686524 -0.6800389689 0.733130416
# とりあえず、K-means
> genome_cls <- genome_summarize %>%
+ sparklyr::ml_kmeans(centers = 4, features = c("total_bases", "mapped_bases", "total_reads", "mapped_reads"))
* No rows dropped by 'na.omit' call
> table(fitted(object = genome_cls))
0 1 2 3
4801 72 1815 261
ゲノムの知識もデータソースの素性もよくわからないので、分析は先送り。
まとめ
{sparklyr}のspark_read_csv
を用いることで、S3バケット上にあるデータを読み込めました。これにより、S3バケットにあるファイルをお手軽に分析できるようになりますし、データ検証のためにRedshiftにコピーとかダウンロードしてファイルを開くとか不要になります。
Amazon Athenaがどれくらい便利か判断しかねますが、RStudio上で処理できますのでRおじさんはますます捗りますね。
ちなみに今回と同じことは{sparkr}のread.df
でも可能です。どちらのパッケージがいいか悩むかもしれませんが、お好みでとしか言えません。
また、Java関係のライブラリをRで扱う場合によくある話ですが、使えるメモリサイズはあらかじめ設定しておきましょう。
# SPARK_MEMは設定しなくてもいい?
# Sys.setenv("SPARK_MEM" = "64g")
spark_conf <- sparklyr::spark_config()
spark_conf$spark.executor.cores <- 2
spark_conf$spark.executor.memory <- "4G"
spark_conf$spark.driver.memory <- "12G"
参考
- Data Science in Spark with sparklyr Cheat Sheet
- Tutorial: Scalable R on Spark with SparkR, sparklyr and RevoScaleR
- Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR
- AWS EMR bootstrap to install RStudio Server along with sparklyr
実行環境
> devtools::session_info()
Session info -----------------------------------------------------------------------------------------------------------------
setting value
version R version 3.3.2 (2016-10-31)
system x86_64, darwin15.6.0
ui RStudio (1.0.136)
language (EN)
collate ja_JP.UTF-8
tz Asia/Tokyo
date 2017-01-19
Packages ---------------------------------------------------------------------------------------------------------------------
package * version date source
assertthat 0.1 2013-12-06 CRAN (R 3.3.2)
aws.s3 * 0.1.34 2017-01-11 local
aws.signature * 0.2.6 2017-01-11 local
backports 1.0.4 2016-10-24 CRAN (R 3.3.2)
base64enc 0.1-3 2015-07-28 CRAN (R 3.3.2)
colorspace 1.3-2 2016-12-14 CRAN (R 3.3.2)
config 0.2 2016-08-02 cran (@0.2)
curl 2.3 2016-11-24 cran (@2.3)
DBI 0.5-1 2016-09-10 CRAN (R 3.3.2)
devtools 1.12.0 2016-12-05 CRAN (R 3.3.2)
digest 0.6.10 2016-08-02 CRAN (R 3.3.2)
dplyr * 0.5.0 2016-06-24 CRAN (R 3.3.2)
ggfortify 0.3.0.9000 2016-12-31 Github (sinhrks/ggfortify@907a044)
ggplot2 * 2.2.0 2016-11-11 CRAN (R 3.3.2)
gridExtra 2.2.1 2016-02-29 CRAN (R 3.3.2)
gtable 0.2.0 2016-02-26 CRAN (R 3.3.2)
httr 1.2.1 2016-07-03 CRAN (R 3.3.2)
jsonlite 1.1 2016-09-14 CRAN (R 3.3.2)
lazyeval 0.2.0 2016-06-12 CRAN (R 3.3.2)
magrittr 1.5 2014-11-22 CRAN (R 3.3.2)
memoise 1.0.0 2016-01-29 CRAN (R 3.3.2)
munsell 0.4.3 2016-02-13 CRAN (R 3.3.2)
plyr 1.8.4 2016-06-08 CRAN (R 3.3.2)
purrr * 0.2.2 2016-06-18 CRAN (R 3.3.2)
R6 2.2.0 2016-10-05 CRAN (R 3.3.2)
rappdirs 0.3.1 2016-03-28 CRAN (R 3.3.2)
Rcpp 0.12.8 2016-11-17 CRAN (R 3.3.2)
readr * 1.0.0 2016-08-03 CRAN (R 3.3.2)
RevoUtils 10.0.2 2016-11-22 local
rprojroot 1.1 2016-10-29 CRAN (R 3.3.2)
scales 0.4.1 2016-11-09 CRAN (R 3.3.2)
sparklyr 0.5.1 2016-12-19 CRAN (R 3.3.2)
tibble * 1.2 2016-08-26 CRAN (R 3.3.2)
tidyr * 0.6.0 2016-08-12 CRAN (R 3.3.2)
tidyverse * 1.0.0 2016-09-09 CRAN (R 3.3.2)
withr 1.0.2 2016-06-20 CRAN (R 3.3.2)
xml2 1.1.0 2017-01-07 CRAN (R 3.3.2)
yaml 2.1.14 2016-11-12 CRAN (R 3.3.2)