1. Qiita
  2. Items
  3. R

{sparklyr}でS3バケット上のファイルをRで扱う

  • 3
    Like
  • 0
    Comment

概要

 少し前に{sparklyr}というRからSparkを使うパッケージがRStudio社から公開されました。この{sparklyr}にはS3上のファイルも読み込めるspark_read_csvという関数が提供されており、Amazon Athenaが東京リージョンに来るまで代わりに使えないかと試してみました。
 今回はAWS Public Datasetsにあるデータセットを読み込んでみましたが、入力対象のS3バケットに権限があれば同じように扱えると思います。

事前準備

 {sparklyr}の活用にあたって対象パッケージのインストールと、Spark環境の設定が必要になります。後者については{sparklyr}に関数が用意されているので、今回はそれを使用してローカルに環境構築します。
 今回は試しませんが、ローカルではなくAWS上などに別構築したSpark環境も利用できます。この場合、タスクノードをスポットインスタンスで構成するなどで比較的安価に大規模データ処理環境の用意が可能になります。
 また、AWS CLI(コマンドラインインターフェイス)を用いたパッケージでS3にアクセスしたりしますので、一緒にインストールします。

Spark環境設定

 前述の通り、{sparklyr}の関数でローカルにSpark/Hadoopの環境を構築します。ただし、{sparklyr}のバージョン次第で対応していないSpark/Hadoopのバージョンがありますのでご注意ください。

Spark環境設定
# {sparklyr}のインストール
# 再現できるようにrepos引数にMicrosoft R Openのスナップショットリポジトリを指定
# 特に支障がなくインストールされるはずなので、実行履歴は省略
install.packages(pkgs = "sparklyr", repos = "https://mran.microsoft.com/snapshot/2017-01-11/")


# インストールできたらパッケージ読み込み
library(sparklyr)
library(tidyverse)

# ローカル上にSparkとHadoop環境があるか確認
# まだ構築していないので、空のデータフレームが返ってくる
> sparklyr::spark_installed_versions()
[1] spark  hadoop dir   
<0> (または長さ 0row.names) 

# {sparklyr}で用意されている関数でインストールできるバージョンの候補を表示
# ただし、インストールできるバージョンでも{sparklyr}では利用できないバージョンも表示される
> sparklyr::spark_available_versions()
spark hadoop                                                   install
5  1.6.2    2.6  spark_install(version = "1.6.2", hadoop_version = "2.6")
6  1.6.2    2.4  spark_install(version = "1.6.2", hadoop_version = "2.4")
7  1.6.2    2.3  spark_install(version = "1.6.2", hadoop_version = "2.3")
8  1.6.2   cdh4 spark_install(version = "1.6.2", hadoop_version = "cdh4")
9  1.6.1    2.6  spark_install(version = "1.6.1", hadoop_version = "2.6")
10 1.6.1    2.4  spark_install(version = "1.6.1", hadoop_version = "2.4")
11 1.6.1    2.3  spark_install(version = "1.6.1", hadoop_version = "2.3")
12 1.6.1   cdh4 spark_install(version = "1.6.1", hadoop_version = "cdh4")
13 1.6.0    2.6  spark_install(version = "1.6.0", hadoop_version = "2.6")
14 1.6.0    2.4  spark_install(version = "1.6.0", hadoop_version = "2.4")
15 1.6.0    2.3  spark_install(version = "1.6.0", hadoop_version = "2.3")
16 1.6.0   cdh4 spark_install(version = "1.6.0", hadoop_version = "cdh4")
17 2.0.0    2.7  spark_install(version = "2.0.0", hadoop_version = "2.7")
18 2.0.0    2.6  spark_install(version = "2.0.0", hadoop_version = "2.6")
19 2.0.0    2.4  spark_install(version = "2.0.0", hadoop_version = "2.4")
20 2.0.0    2.3  spark_install(version = "2.0.0", hadoop_version = "2.3")
21 2.0.1    2.7  spark_install(version = "2.0.1", hadoop_version = "2.7")
22 2.0.1    2.6  spark_install(version = "2.0.1", hadoop_version = "2.6")
23 2.0.1    2.4  spark_install(version = "2.0.1", hadoop_version = "2.4")
24 2.0.1    2.3  spark_install(version = "2.0.1", hadoop_version = "2.3")
25 2.0.2    2.7  spark_install(version = "2.0.2", hadoop_version = "2.7")
26 2.0.2    2.6  spark_install(version = "2.0.2", hadoop_version = "2.6")
27 2.0.2    2.4  spark_install(version = "2.0.2", hadoop_version = "2.4")
28 2.0.2    2.3  spark_install(version = "2.0.2", hadoop_version = "2.3")
29 2.1.0    2.7  spark_install(version = "2.1.0", hadoop_version = "2.7")
30 2.1.0    2.6  spark_install(version = "2.1.0", hadoop_version = "2.6")
31 2.1.0    2.4  spark_install(version = "2.1.0", hadoop_version = "2.4")
32 2.1.0    2.3  spark_install(version = "2.1.0", hadoop_version = "2.3")

# 上記の候補のうち、「version = "2.1.0", hadoop_version = "2.7"」を指定してインストール
> sparklyr::spark_install(version = "2.1.0", hadoop_version = "2.7")
Installing Spark 2.1.0 for Hadoop 2.7 or later.
Downloading from:
  - 'https://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz'
Installing to:
  - '~/Library/Caches/spark/spark-2.1.0-bin-hadoop2.7'
URL 'https://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz' を試しています 
Content type 'application/x-tar' length 195636829 bytes (186.6 MB)
==================================================
  downloaded 186.6 MB

Installation complete.


# ローカル上にSparkとHadoop環境があるか、再度確認
# インストールされたバージョンからなるデータフレームが返ってくる
> sparklyr::spark_installed_versions()
spark hadoop                       dir
1 2.1.0    2.7 spark-2.1.0-bin-hadoop2.7

# 試しに接続すると現バージョン(0.5.1)ではSpark 2.1.0には対応していない
> sc <- sparklyr::spark_connect(master = "local")
sparklyr does not currently support Spark version: 2.1.0

# インストールしたバージョンを削除
> sparklyr::spark_uninstall(version = "2.1.0", hadoop_version = "2.7")
spark-2.1.0-bin-hadoop2.7 successfully uninstalled.

# 改めて「version = "2.1.0", hadoop_version = "2.7"」を指定してインストール
> sparklyr::spark_install(version = "2.0.2", hadoop_version = "2.7")
Installing Spark 2.0.2 for Hadoop 2.7 or later.
Downloading from:
  - 'https://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz'
Installing to:
  - '~/Library/Caches/spark/spark-2.0.2-bin-hadoop2.7'
URL 'https://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz' を試しています 
Content type 'application/x-tar' length 187426587 bytes (178.7 MB)
==================================================
  downloaded 178.7 MB

Installation complete.


# 確認
> sparklyr::spark_installed_versions()
spark hadoop                       dir
1 2.0.2    2.7 spark-2.0.2-bin-hadoop2.7

# 接続できても何も返さない(1系以降のRStudioでは右上のSparkパネルが切り替わる)
sc <- sparklyr::spark_connect(master = "local")
# 接続されているか確認
> sparklyr::spark_connection_is_open(sc = sc)
[1] TRUE

# 接続を切って再確認
sparklyr::spark_disconnect(sc = sc)
> sparklyr::spark_connection_is_open(sc = sc)
[1] FALSE

AWS CLIのインストール

 EC2であればAWS CLIが事前にインストールされていますが、自分のMac環境では用意されていないので別途インストールします。調べてみるとHomebrewでインストールできるようなので、今回はそちらで準備しました。

AWS_CLI
# 一部のみ表示
$ brew info awscli
awscli: stable 1.11.36 (bottled), HEAD
Official Amazon AWS command-line interface
https://aws.amazon.com/cli/

# インストール(履歴は省略)
$ brew install awscli

# インストール後にaws configureでアクセスキー、シークレットアクセスキー、デフォルトリージョンを自前のものを設定
# aws_pdatasetという名前のprofileを作って参照できるようにしておく
$ aws configure --profile aws_pdataset

AWS Public Datasetsのバケット構造を確認

 AWS Public Datasetsにあるデータセットのバケット構造を調べるため、下記のパッケージを利用してS3にアクセスして確認しました。前述のとおり、こちらのパッケージではAWS CLIを利用しています。
 
- Amazon Simple Storage Service (S3) API Client

S3バケット確認
library(aws.signature)
library(aws.s3)

# aws configureでアクセスキーなどを設定していない場合は下記のように環境変数をセットする必要がある
# セキュリティ観点から実キーは伏せています
Sys.setenv(AWS_ACCESS_KEY_ID = "***************")
Sys.setenv(AWS_SECRET_ACCESS_KEY = "***************")
Sys.setenv(AWS_DEFAULT_REGION = "us-east-1")

# profile名を指定して、aws configureで設定したアクセスキーやシークレットアクセスキーなどを読み込み
aws.signature::use_credentials(profile = "aws_pdataset")

# "1000genomes"のデータセットのバケット構造を閲覧
# ここでは"alignment_indices/2012"が付くものを15個表示
> genomes <- aws.s3::get_bucket(bucket = "1000genomes", prefix = "alignment_indices/2012", max = 15, parse_response = TRUE)
Bucket: 1000genomes 

$Contents
Key:            alignment_indices/20120522.alignment.chr20_cram.index 
LastModified:   2013-05-29T17:35:36.000Z 
ETag:           "ea7863d5db6acc3b6ca997bfa6d2c6c9" 
Size (B):       270984 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.alignment.chr20_lossy_cram.index 
LastModified:   2013-05-27T16:33:30.000Z 
ETag:           "aeae5be28807b89f67ebdfe85e2fd676" 
Size (B):       284256 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.alignment.index 
LastModified:   2013-06-03T18:15:51.000Z 
ETag:           "82d9523c2a48d18185f6ba905c306d7e" 
Size (B):       1561723 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.alignment.index.bas.gz 
LastModified:   2013-06-03T18:14:53.000Z 
ETag:           "275801bf228b1ff33664dc0d538e320f" 
Size (B):       1233947 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.alignment.mapped.binned_csra.index 
LastModified:   2014-09-02T22:15:39.000Z 
ETag:           "ab60350c0873f9527ce1347541027a8e" 
Size (B):       131623 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.chr20_cram.index 
LastModified:   2013-05-29T08:43:56.000Z 
ETag:           "7cf05ef22b7d2c9b5e64bf5cb9f03916" 
Size (B):       215312 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.chr20_lossy_cram.index 
LastModified:   2014-09-03T04:15:33.000Z 
ETag:           "fce87b37792304ab79e9584e224bf1b8" 
Size (B):       225944 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.index 
LastModified:   2013-06-03T18:14:54.000Z 
ETag:           "83ff92283546ad85101b80cf9e04e10b" 
Size (B):       1240451 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.index.HsMetrics.gz 
LastModified:   2014-09-03T03:54:02.000Z 
ETag:           "6742b3655a3cd388cc1ec3406b61f835" 
Size (B):       118484 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.index.HsMetrics.stats 
LastModified:   2013-05-28T11:39:20.000Z 
ETag:           "6757ae9b582c46174b764f4efe9097a6" 
Size (B):       376 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.index.bas.gz 
LastModified:   2013-06-03T18:15:57.000Z 
ETag:           "a7e71fab455a9f9634e0b4ec2c208658" 
Size (B):       1617379 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522.exome.alignment.mapped.binned_csra.index 
LastModified:   2014-09-02T12:22:40.000Z 
ETag:           "3b77f700a59c1f27b0f04a05bda4fc63" 
Size (B):       104557 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522_20111114.alignment_stats.exome.csv 
LastModified:   2013-06-03T18:08:02.000Z 
ETag:           "e4149d8e97a324892571fad80e0c6a6e" 
Size (B):       1344 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20120522_20111114.alignment_stats.low_coverage.csv 
LastModified:   2013-06-03T18:08:02.000Z 
ETag:           "b99c5dc76cd28a83cee962ecb33e6235" 
Size (B):       1500 
Owner:          1000genomes 
Storage class:  STANDARD 

$Contents
Key:            alignment_indices/20121211.alignment.index 
LastModified:   2014-09-02T08:48:22.000Z 
ETag:           "9668dcf90d6d149ae612581cf404d7a1" 
Size (B):       2332675 
Owner:          1000genomes 
Storage class:  STANDARD 

 ここでは次のふたつのファイルを読み込んでみます。

  • alignment_indices/20120522.alignment.index
  • alignment_indices/20120522.alignment.index.bas.gz

 データフォーマットは下記を参考までに。
- IGSR: The International Genome Sample Resource - Data file formats

AWS Public Datasetsにあるファイルを読み込む

 {sparklyr}ではS3にあるファイルを読み込める関数が定義されていますが、デフォルトの設定のままではローカルまたはEC2からは読み込めません(EMRからだと可能?)。
 この件については、下記の通りヘルプドキュメントには記述がありません。

 『spark_read_csv {sparklyr} - R Documentation』より

You can read data from HDFS (hdfs://), S3 (s3n://), as well as the local file system (file://).

デフォルト設定のまま
sc <- sparklyr::spark_connect(master = "local")
> genome_alignment_indices <- sparklyr::spark_read_csv(
+     sc = sc, name = "alignment_indices",
+     path = "s3a://1000genomes/20120522.alignment.index",
+     infer_schema = TRUE, header = TRUE, delimiter = "\t"
+ )
 エラー: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:381)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:379)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:379)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:413)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:349)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at sparklyr.Invoke$.invoke(invoke.scala:94)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
    at sparklyr.StreamHandler$.read(stream.scala:55)
    at sparklyr.BackendHandler.channelRead0(handler.scala:49)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
    ... 45 more


# 接続を切る
sparklyr::spark_disconnect(sc = sc)

 これに対応するには、接続時の設定にhadoop-awsをデフォルトパッケージとして追加しておく必要があります。これをR上でするにはspark_configを使います。

設定追加
# sparklyr::spark_configでデフォルト設定ファイルを読み込み
spark_config <- sparklyr::spark_config()

# デフォルトパッケージにhadoop-awsを追加
spark_config$sparklyr.defaultPackages <- append(
  x = spark_config$sparklyr.defaultPackages, "org.apache.hadoop:hadoop-aws:2.7.3"
)
# インストール状況によってバージョンは異なりそう
> list.files(path = "~/.ivy2/cache/org.apache.hadoop/hadoop-aws/jars/")
[1] "hadoop-aws-2.7.2.jar" "hadoop-aws-2.7.3.jar"


# config引数にセットした設定を渡して再度接続
sc <- sparklyr::spark_connect(master = "local", config = spark_config)
> spark_connection_is_open(sc = sc)
[1] TRUE

# エラーがなく結果が得られる
# infer_schema引数をTRUEにしておくと各カラムの型を推定してくれる
genome_alignment_indices <- sparklyr::spark_read_csv(
  sc = sc, name = "alignment_indices", path = "s3a://1000genomes/alignment_indices/20100517.alignment.index",
  infer_schema = TRUE, header = TRUE, delimiter = ","
)
> genome_alignment_indices
Source:   query [4,424 x 6]
Database: spark connection master=local[4] app=sparklyr local=TRUE

                                                                             BAM_FILE                          BAM_MD5
                                                                                <chr>                            <chr>
1   data/HG00096/alignment/HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam e2425c6f57b2aa4ddb08f472d98221d0
2   data/HG00096/alignment/HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam c5cb5c7b356e95df899f7e780fa22e2b
3    data/HG00096/alignment/HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 336ea55913bc261b72875bd259753046
4  data/HG00096/alignment/HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 21f4323f02d6d2a888a02bdfb24dc143
5   data/HG00103/alignment/HG00103.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 09f5199ca81b87fba667a1d60d166474
6   data/HG00103/alignment/HG00103.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 34f0cfc7d3584a64e104a1ebbc9209ab
7    data/HG00103/alignment/HG00103.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam a09f7acc6a736daae40f689b8c9e40c2
8  data/HG00103/alignment/HG00103.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 2d355139e1c8fe220efbf47f0e050bcf
9   data/HG00106/alignment/HG00106.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522.bam cef7bda3308f06dcb9fe494c7addbd76
10  data/HG00106/alignment/HG00106.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522.bam 81b3973ae3fa78083a9737f3bf800182
# ... with 4,414 more rows, and 4 more variables: BAI_FILE <chr>, BAI_MD5 <chr>, BAS_FILE <chr>, BAS_MD5 <chr>


# .gzファイルも対応
# infer_schema引数をFALSEにすると各カラムの型が文字列として読み込まれる
genome_alignment_indices_bas <- sparklyr::spark_read_csv(
  sc = sc, name = "alignment_indices_bas", path = "s3a://1000genomes/alignment_indices/20120522.alignment.index.bas.gz",
  infer_schema = FALSE, header = TRUE, delimiter = "\t"
)
> genome_alignment_indices_bas
Source:   query [2.78e+04 x 21]
Database: spark connection master=local[4] app=sparklyr local=TRUE

                                              bam_filename                              md5     study  sample platform
                                                     <chr>                            <chr>     <chr>   <chr>    <chr>
1   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
2   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
3   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA
4   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
5   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
6   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA
7    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
8    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
9    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522 21f4323f02d6d2a888a02bdfb24dc143 SRP001294 HG00096 ILLUMINA
# ... with 2.779e+04 more rows, and 16 more variables: library <chr>, readgroup <chr>, `_total_bases` <chr>,
#   `_mapped_bases` <chr>, `_total_reads` <chr>, `_mapped_reads` <chr>, `_mapped_reads_paired_in_sequencing` <chr>,
#   `_mapped_reads_properly_paired` <chr>, `_of_mismatched_bases` <chr>, average_quality_of_mapped_bases <chr>,
#   mean_insert_size <chr>, insert_size_sd <chr>, median_insert_size <chr>, insert_size_median_absolute_deviation <chr>,
#   `_duplicate_reads` <chr>, `_duplicate_bases` <chr>


# columns引数に名前付きベクトルを与えることで各カラムの型と名前を指定できる
# RとSpark間の型の違いは下記のドキュメントを
# http://spark.apache.org/docs/latest/sparkr.html#data-type-mapping-between-r-and-spark
cols <- c(
  rep(x = "string", length = 7), rep(x = "double", length = 7), rep(x = "float", length = 5), 
  "integer", "string"
)
names(x = cols) <- colnames(x = genome_alignment_indices_bas)

# 警告メッセージについては未調査
genome_alignment_indices_bas_cols <- sparklyr::spark_read_csv(
  sc = sc, name = "alignment_indices_bas_cols", path = "s3a://1000genomes/alignment_indices/20120522.alignment.index.bas.gz",
  infer_schema = FALSE, header = TRUE, delimiter = "\t",
  columns = cols
)
 警告メッセージ: 
 spark_csv_read(sc, spark_normalize_path(path), options, columns): 
  Dataset has 1 columns but 'columns' has length 21

> genome_alignment_indices_bas_cols
Source:   query [2.78e+04 x 21]
Database: spark connection master=local[4] app=sparklyr local=TRUE

                                              bam_filename                              md5     study  sample platform    library readgroup
                                                     <chr>                            <chr>     <chr>   <chr>    <chr>      <chr>     <chr>
1   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
2   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
3   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522 e2425c6f57b2aa4ddb08f472d98221d0 SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
4   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
5   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
6   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522 c5cb5c7b356e95df899f7e780fa22e2b SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
7    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
8    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062635
9    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522 336ea55913bc261b72875bd259753046 SRP001294 HG00096 ILLUMINA 2845856850 SRR062641
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522 21f4323f02d6d2a888a02bdfb24dc143 SRP001294 HG00096 ILLUMINA 2845856850 SRR062634
# ... with 2.779e+04 more rows, and 14 more variables: `_total_bases` <dbl>, `_mapped_bases` <dbl>, `_total_reads` <dbl>, `_mapped_reads` <dbl>,
#   `_mapped_reads_paired_in_sequencing` <dbl>, `_mapped_reads_properly_paired` <dbl>, `_of_mismatched_bases` <dbl>,
#   average_quality_of_mapped_bases <dbl>, mean_insert_size <dbl>, insert_size_sd <dbl>, median_insert_size <dbl>,
#   insert_size_median_absolute_deviation <dbl>, `_duplicate_reads` <int>, `_duplicate_bases` <chr>


# `$`による参照はできないので注意
> genome_alignment_indices_bas_cols$bam_filename
NULL

# selectで参照
> genome_alignment_indices_bas_cols %>% 
+   dplyr::select(bam_filename) %>% 
+   head(n = 10)
Source:   query [10 x 1]
Database: spark connection master=local[4] app=sparklyr local=TRUE

                                              bam_filename
                                                     <chr>
1   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
2   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
3   HG00096.chrom11.ILLUMINA.bwa.GBR.low_coverage.20120522
4   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
5   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
6   HG00096.chrom20.ILLUMINA.bwa.GBR.low_coverage.20120522
7    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
8    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
9    HG00096.mapped.ILLUMINA.bwa.GBR.low_coverage.20120522
10 HG00096.unmapped.ILLUMINA.bwa.GBR.low_coverage.20120522


# とりあえず集計してみる
> genome_summarize <- genome_alignment_indices_bas_cols %>% 
  dplyr::group_by(readgroup) %>% 
  dplyr::summarize(
    total_bases = sum(`_total_bases`),
    mapped_bases = sum(`_mapped_bases`),
    total_reads = sum(`_total_reads`),
    mapped_reads = sum(`_mapped_reads`)
  ) %>% 
  print
Source:   query [6,949 x 5]
Database: spark connection master=local[4] app=sparklyr local=TRUE

   readgroup total_bases mapped_bases total_reads mapped_reads
       <chr>       <dbl>        <dbl>       <dbl>        <dbl>
1  ERR015528  6983935452   5952262482    64666069     62451949
2  ERR020229 19111863316  16893939598   210020476    199362355
3  ERR013151  4090595256   3654598436    37875882     36786031
4  ERR015759  1341103068   1249066562    12417621     11811715
5  ERR019905  2264878476   2181465658    20971097     20421069
6  ERR013138  6858917784   6220244192    63508498     60258668
7  SRR037779  2251299740   1995203950    29622365     27061710
8  SRR037781  2203411836   1942146896    28992261     26343528
9  SRR043409  4294927960   3987005465    59268712     57122507
10 SRR044230  5247820064   4803974017    69050264     66265436
# ... with 6,939 more rows

# とりあえずPCA
> genome_summarize %>% 
+   sparklyr::ml_pca(features = c("total_bases", "mapped_bases", "total_reads", "mapped_reads"))
* No rows dropped by 'na.omit' call
Explained variance:

         PC1          PC2          PC3          PC4 
9.977596e-01 2.239423e-03 9.743283e-07 1.377467e-08 

Rotation:
                      PC1          PC2           PC3          PC4
total_bases  -0.732068499  0.681081744  0.0140674519  0.002340459
mapped_bases -0.681151089 -0.732137051  0.0001951216 -0.002914359
total_reads  -0.007463459  0.009455473 -0.7330409740 -0.680077805
mapped_reads -0.007294029  0.003686524 -0.6800389689  0.733130416

# とりあえず、K-means
> genome_cls <- genome_summarize %>% 
+   sparklyr::ml_kmeans(centers = 4, features = c("total_bases", "mapped_bases", "total_reads", "mapped_reads"))
* No rows dropped by 'na.omit' call
> table(fitted(object = genome_cls))

   0    1    2    3 
4801   72 1815  261 

 ゲノムの知識もデータソースの素性もよくわからないので、分析は先送り。

まとめ

 {sparklyr}のspark_read_csvを用いることで、S3バケット上にあるデータを読み込めました。これにより、S3バケットにあるファイルをお手軽に分析できるようになりますし、データ検証のためにRedshiftにコピーとかダウンロードしてファイルを開くとか不要になります。
 Amazon Athenaがどれくらい便利か判断しかねますが、RStudio上で処理できますのでRおじさんはますます捗りますね。
 ちなみに今回と同じことは{sparkr}のread.dfでも可能です。どちらのパッケージがいいか悩むかもしれませんが、お好みでとしか言えません。

 また、Java関係のライブラリをRで扱う場合によくある話ですが、使えるメモリサイズはあらかじめ設定しておきましょう。

Sparkメモリ周りの設定
# SPARK_MEMは設定しなくてもいい?
# Sys.setenv("SPARK_MEM" = "64g")
spark_conf <- sparklyr::spark_config()
spark_conf$spark.executor.cores <- 2
spark_conf$spark.executor.memory <- "4G"
spark_conf$spark.driver.memory <- "12G"

参考

実行環境

実行環境
> devtools::session_info()
Session info -----------------------------------------------------------------------------------------------------------------
 setting  value                       
 version  R version 3.3.2 (2016-10-31)
 system   x86_64, darwin15.6.0        
 ui       RStudio (1.0.136)           
 language (EN)                        
 collate  ja_JP.UTF-8                 
 tz       Asia/Tokyo                  
 date     2017-01-19                  

Packages ---------------------------------------------------------------------------------------------------------------------
 package       * version    date       source                            
 assertthat      0.1        2013-12-06 CRAN (R 3.3.2)                    
 aws.s3        * 0.1.34     2017-01-11 local                             
 aws.signature * 0.2.6      2017-01-11 local                             
 backports       1.0.4      2016-10-24 CRAN (R 3.3.2)                    
 base64enc       0.1-3      2015-07-28 CRAN (R 3.3.2)                    
 colorspace      1.3-2      2016-12-14 CRAN (R 3.3.2)                    
 config          0.2        2016-08-02 cran (@0.2)                       
 curl            2.3        2016-11-24 cran (@2.3)                       
 DBI             0.5-1      2016-09-10 CRAN (R 3.3.2)                    
 devtools        1.12.0     2016-12-05 CRAN (R 3.3.2)                    
 digest          0.6.10     2016-08-02 CRAN (R 3.3.2)                    
 dplyr         * 0.5.0      2016-06-24 CRAN (R 3.3.2)                    
 ggfortify       0.3.0.9000 2016-12-31 Github (sinhrks/ggfortify@907a044)
 ggplot2       * 2.2.0      2016-11-11 CRAN (R 3.3.2)                    
 gridExtra       2.2.1      2016-02-29 CRAN (R 3.3.2)                    
 gtable          0.2.0      2016-02-26 CRAN (R 3.3.2)                    
 httr            1.2.1      2016-07-03 CRAN (R 3.3.2)                    
 jsonlite        1.1        2016-09-14 CRAN (R 3.3.2)                    
 lazyeval        0.2.0      2016-06-12 CRAN (R 3.3.2)                    
 magrittr        1.5        2014-11-22 CRAN (R 3.3.2)                    
 memoise         1.0.0      2016-01-29 CRAN (R 3.3.2)                    
 munsell         0.4.3      2016-02-13 CRAN (R 3.3.2)                    
 plyr            1.8.4      2016-06-08 CRAN (R 3.3.2)                    
 purrr         * 0.2.2      2016-06-18 CRAN (R 3.3.2)                    
 R6              2.2.0      2016-10-05 CRAN (R 3.3.2)                    
 rappdirs        0.3.1      2016-03-28 CRAN (R 3.3.2)                    
 Rcpp            0.12.8     2016-11-17 CRAN (R 3.3.2)                    
 readr         * 1.0.0      2016-08-03 CRAN (R 3.3.2)                    
 RevoUtils       10.0.2     2016-11-22 local                             
 rprojroot       1.1        2016-10-29 CRAN (R 3.3.2)                    
 scales          0.4.1      2016-11-09 CRAN (R 3.3.2)                    
 sparklyr        0.5.1      2016-12-19 CRAN (R 3.3.2)                    
 tibble        * 1.2        2016-08-26 CRAN (R 3.3.2)                    
 tidyr         * 0.6.0      2016-08-12 CRAN (R 3.3.2)                    
 tidyverse     * 1.0.0      2016-09-09 CRAN (R 3.3.2)                    
 withr           1.0.2      2016-06-20 CRAN (R 3.3.2)                    
 xml2            1.1.0      2017-01-07 CRAN (R 3.3.2)                    
 yaml            2.1.14     2016-11-12 CRAN (R 3.3.2)