Help us understand the problem. What is going on with this article?

Secure Spark

More than 3 years have passed since last update.

この記事はSpark Advent Calendar 2015の記事です。

SparkのSecurity

HadoopもSparkと同じようにデータ処理のための分散フレームワークなのでsecureなデータを使って処理を行うことはままあります。Sparkはそれ単体ではストレージレイヤーを含まないものの(Tachiyonとか最近でてきましたが)そのデータを使うjobを好きなように投げることができます。通常HDFSやS3にあるデータは認証、認可を経てアクセスされるようになっていると思います。今回はそれよりも上の層のSparkのjob自体をsubmitするとき、jobの情報を参照するときの認証、認可がどのように行われているかを見てみたいと思います。

Sparkでの認証

Shared secret

最もシンプルな方法はshared secret tokenを使う方法です。shared secret tokenは予めSparkクラスタの方に登録されたtokenを使ってしかjobがsubmitできないようにする方法です。簡単なパスワードみたいなものです。

このtokenはクラスタ側ではspark.authenticate.secretというconfigurationで設定をします。

spark.authenticateは認証を有効にするかどうかを決める設定値です。この2つの設定をおこない、Standaloneクラスタを立ち上げてみます。
spark-defaults.confファイルに下記のように記載します。

spark.authenticate=true
spark.authenticate.secret=mysecret

Standaloneクラスタを立ち上げます。

$ $SPARK_HOME/sbin/start-master.sh
$ $SPARK_HOME/sbin/start-slave.sh spark://your-spark-master:7077

このStandaloneクラスタにjobをsubmitする場合には下記のようにsecretを同じように付与する必要があります。

$ ./bin/spark-shell \
    --conf spark.authenticate.secret=mysecret \
    --master spark://your-spark-master:7077 

ここでsecretを付与しなかったり、違うsecretを付与すると以下のようなエラーがでます。

$ /bin/spark-shell \
    --conf spark.authenticate.secret=wrongsecret \
    --master spark://your-spark-master:7077
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/22 18:07:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/22 18:07:35 ERROR TransportClientFactory: Exception while bootstrapping client after 77 ms
java.lang.RuntimeException: java.lang.RuntimeException: javax.security.sasl.SaslException: DIGEST-MD5: digest response format violation. Mismatched response.
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.sasl.SparkSaslServer.response(SparkSaslServer.java:121)
    at org.apache.spark.network.sasl.SaslRpcHandler.receive(SaslRpcHandler.java:101)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

Servlet Filter

jobの実行状況や設定を勝手に見られて困る場合はapplicationのUIにJavaのServlet Filterをかませることができます。この設定はspark.ui.filtersでクラス名を設定することがで実現できます。Servlet Filter自体はSparkとは関係ないので自前で好きなように作ることができます。例えばBasic認証を行うFilter(ユーザ名、パスワードは決め打ちですが)をかませたい場合は以下のように書きます。

package my.application.filter

import com.sun.jersey.core.util.Base64;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.StringTokenizer;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.Filter;
import javax.servlet.FilterChain;

public class BasicAuthFilter implements Filter {
    String username = null;
    String password = null;

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        // ユーザ名 "spark-user"
        this.username = "spark-user";
        // パスワード "spark-password"
        this.password = "spark-password";
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse,
                         FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest)servletRequest;
        HttpServletResponse response = (HttpServletResponse)servletResponse;

        String authHeader = request.getHeader("Authorization");
        if (authHeader != null) {
            StringTokenizer st = new StringTokenizer(authHeader);
            if (st.hasMoreTokens()) {
                String basic = st.nextToken();
                if (basic.equalsIgnoreCase("Basic")) {
                    try {
                        String credentials = new String(Base64.decode(st.nextToken()), "UTF-8");
                        int pos = credentials.indexOf(":");
                        if (pos != -1) {
                            String username = credentials.substring(0, pos).trim();
                            String password = credentials.substring(pos + 1).trim();

                            if (!username.equals(this.username) || 
                                !password.equals(this.password)) {
                                // 認証されない場合は401を返す
                                unauthorized(response, "Unauthorized:" +
                                        this.getClass().getCanonicalName());
                            }

                            filterChain.doFilter(servletRequest, servletResponse);
                        } else {
                            unauthorized(response, "Unauthorized:" +
                                    this.getClass().getCanonicalName());
                        }
                    } catch (UnsupportedEncodingException e) {
                        throw new Error("Counldn't retrieve authorization information", e);
                    }
                }
            }
        } else {
            unauthorized(response, "Unauthorized:" + this.getClass().getCanonicalName());
        }
    }

    @Override
    public void destroy() {}

    private void unauthorized(HttpServletResponse response, String message) throws IOException {
        response.setHeader("WWW-Authenticate", "Basic realm=\"Spark Realm\"");
        response.sendError(401, message);
    }

}

これを自分のapplicationに組み込んでjarにまとめておきます。実際にjobをsubmitするときには以下のようにjarとともに指定します。

$ $SPARK_HOME/bin/spark-submit \
    --jars your-application.jar \
    --master spark://your-spark-master:7077 \
    --conf spark.ui.filters=my.application.filter.BasicAuthFilter

このapplicationのWeb UIにアクセスしにいくと下記のようなpromptがブラウザからでます。(ちなみにデフォルトであればmasterのUI portは8088でapplicationのUIはそこからたどれます)

Screen Shot 2015-12-22 at 6.18.43 PM.png

  • username: spark-user
  • password: spark-password

を入力すれば無事にWeb UIにたどり着けます。間違えるとたどりつけません。ここは普通のBasic認証の挙動と同じです。

Sparkでの認可

認証されたユーザごとのACLも作成することができます。これはServletのHTTPリクエストに入っているユーザ名を使います。もちろん通常は外部の認証システムがこのHTTPリクエストに対して認証されたユーザ名を設定してくれるのですが今回はめんどくさいのでそれ用のFilterを作って無理やり固定の名前で認証させます。

import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class UserRoleFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest servletRequest, 
      ServletResponse servletResponse, FilterChain filterChain) 
      throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest)servletRequest;
        String user = "spark-userA";
        List<String> userList = Arrays.asList("spark-userA", "spark-userB", "spark-userC");

        // このfilterで無理やりspark-usreAという名前で認証させたことにして
        // HTTPリクエストをWrapする
        filterChain.doFilter(new UserRoleRequestWrapper(user, userList, request), servletResponse);
    }

    @Override
    public void destroy() {

    }
}

こちらがそのリクエストのWrapperクラス。

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import java.util.List;

public class UserRoleRequestWrapper extends HttpServletRequestWrapper {
    String user;
    List<String> userList = null;
    HttpServletRequest request;

    public UserRoleRequestWrapper(String user, List<String> userList, HttpServletRequest originalRequest) {
        super(originalRequest);
        this.user = user;
        this.userList = userList;
        this.request = originalRequest;
    }

    @Override
    public String getRemoteUser() {
        // ユーザリストに含まれていたら"spark-userA"
        // でログインさせるというなんの意味もない認証を通す
        if (this.userList.contains(this.user)) {
            return this.user;
        }
        return null;
    }
}

これで擬似的にWeb UIにアクセスすると"spark-userA"という名前でログインしてUIを見ようとしていることになります。これらクラスを含めてapplication用のjarを作っておきます。

このユーザをACLに加えるには

  1. ACLをクラスタ側で有効にする
  2. JobにACLに"spark-userA"を加える

とします。

ACLをクラスタ側で有効にする

spark-defaults.confに下記を記載します。

spark.acls.enable=true

これで先ほどと同じようにStandalone modeでクラスタを起動します。

JobにACLに"spark-userA"を加える

$ ./bin/spark-shell --jars your-application.jar \
    --master spark://your-spark-master:7077 \
    --conf spark.ui.filters=org.apache.spark.examples.UserRoleFilter 
    --conf spark.ui.view.acls=spark-userA

これで通常と同じようにWeb UIを見ることができます。このFilterではいつもspark-userAとして認証しますのでもしspark-userBのみをACLに含めると下記のような画面が見えるはずです。

Screen Shot 2015-12-23 at 12.18.33 AM.png

これでACLに含まれているユーザのみがWeb UIを見られることが確認できました。
この他にもAdmin権限やjobのkillに関する権限などを別途管理することもできます。

Sparkでの暗号化

Sparkは部分的にSSL/TLS通信をサポートしています。現状ではSpark clusterで動くコンポーネント間の通信をプロトコルごとにSSL/TLSにすることができます。現在サポートされているのは

  • spark.ssl.akka : Akkaを使った通信
  • spark.ssl.fs : Broadcastなどに使われるHTTP

で設定できます。SSL/TLSを有効にするにはいくつかの設定が必要ですが、それらはプロトコルごとに分けることができます。例えばJavaのSASLで使うTrust Storeの設定はAkkaに対してならspark.ssl.akka.trustStore, HTTPに対してならspark.ssl.fs.trustStore, 両方のプロトコルに対してならspark.ssl.trustStoreに設定をします。

あまりSparkと直接関係なくなってくる上に、見える挙動が変わらないので面白くないのですがSSL/TLS通信を行うSparkクラスタを立ててみたいと思います。
この鍵などを準備するにはkeytoolというJDKに付属のツールを使います。

手順は以下のようにします。

  1. サーバ側用の秘密鍵を作成します
  2. 証明書署名要求(CSR)を作成します
  3. 認証局(CA)に署名された証明書を取得します
  4. 署名された証明書をtrusted storeに入れます。

今回はオレオレでもとりあえず試せればいいので認証局での署名は行いません。このあたりはもっと詳しい記事がいっぱいありますのでとりあえずSparkでの設定ができるまでは先を急ぎたいと思います。

サーバ側用の秘密鍵を用意

$ keytool -genkey \
          -alias ssltest \          # 秘密鍵の名前。これからも使うので覚えておく
          -keyalg RSA \
             # 鍵の暗号アルゴリズム
          -keysize 2048 \
          -keypass key_password \
  # 鍵のパスワード。使うので覚えておく
          -storetype JKS \
          -keystore my_key_store \-storepass store_password #  鍵を入れたkey storeのパスワード。覚えておく。
  1. は飛ばして

さっきの鍵から自分で証明書を作ります。

$ keytool -export \
          -alias ssltest \       # さっき決めた秘密鍵の名前
          -file my_cert.cer \    # exportする証明書の名前
          -keystore my_key_store # さっき鍵をいれておいてkey storeのpath

この証明書をTrusted Storeにいれます

$ keytool -import -v \
 
          -trustcacerts \
 
         -alias ssltest \

          -file my_cert.cer \         # さっきexportした証明書ファイル
 
         -keyStore my_trust_store \ # Trust Storeの名前
 
         -keypass store_password     # Trust Storeのパスワード。覚えておく

あとは必要なファイル群はできたのでこれらを設定するだけです。必要な設定値はすべて出揃っているので記述するだけです。
spark-defaults.confに下記のように設定します

# SSL/TLSを有効にするかどうか
spark.ssl.enabled                  true
# SSLで使う暗号スイート
spark.ssl.enabledAlgorithms        TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
spark.ssl.protocol                 TLSv1.2
# 暗号化に使う秘密鍵のパスワード。最初に作成したもののパスワード
spark.ssl.keyPassword              key_password
# Key Storeへのpath
spark.ssl.keyStore                 /path/to/my_key_store
# Key Storeのパスワード
spark.ssl.keyStorePassword         store_password
# Trust Storeへのpath
spark.ssl.trustStore               /path/to/my_trust_store
# Trust Storeのパスワード
spark.ssl.trustStorePassword       store_password

これでまたStandalone modeでクラスタを起動します。下記のようなログがでていればSSL/TLS通信が有効になっているはずです。(ただしDEBUGレベルを出力できるようにlog4j.propertiesを設定してください)

15/12/06 14:01:07 DEBUG SecurityManager: SSLConfiguration for file server: SSLOptions{enabled=true, keyStore=Some(/Users/sasakikai/my_key_store), keyStorePassword=Some(xxx), trustStore=Some(/Users/sasakikai/my_trust_store), trustStorePassword=Some(xxx), protocol=Some(TLSv1.2), enabledAlgorithms=Set(TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA)}

15/12/06 14:01:07 DEBUG SecurityManager: SSLConfiguration for Akka: SSLOptions{enabled=true, keyStore=Some(/Users/sasakikai/my_key_store), keyStorePassword=Some(xxx), trustStore=Some(/Users/sasakikai/my_trust_store), trustStorePassword=Some(xxx), protocol=Some(TLSv1.2), enabledAlgorithms=Set(TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA)}

まとめ

今回の内容はすべてSecurityConfigurationに記載されているのですが、読んでもあまりよくわからなかったので自分で手を動かして調べてみました。
オレオレ認証を通すようなことをしてしまったので実用的な側面が少ないですが、それぞれの設定がどのように使われるかの理解の助けになればと思います。

Lewuathe
Java/Scala/Python/Hadoop/Presto
http://www.lewuathe.com
treasuredata
Customer Data Platformの開発・提供をしています。
https://www.treasuredata.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away