Java
SOAP
Salesforce

SalesforceのSOAPとBulk APIをJavaで使う時のtips

はじめに

Salesforce Platform Advent Calendar 2017 の7日目の記事です。

今年に入ってSalesforce関連の案件をやることになったので、SOAPとBulk APIについてのtipsです。

先に結論を言うと、

になります。Salesforce周りのツールは意外とオープンソースになっているものが多いので、実践・実用的なサンプルはそちらを参考にした方がよいという印象です。

SalesforceにはどんなAPIがあるか?

Salesforceには主に次の3つのAPIが用意されています。

  • SOAP
  • REST
  • Bulk

基本のデータ構造はSOAP APIの仕様に基づいているので、SOAPの仕様をしっていると他のAPIの理解も早いです。他にはメタデータにアクセスするためのAPIなどがあります。

SOAP API

公式サイト で基本的には仕様がわかります。データ構造周りはよいのですが、ところどころ、記述が古いバージョンのままがあるのが難です。

次の2種類のWSDLがあります。これらにSalesforce上のオブジェクトが定義されており、これをもとにJavaのクライアントコードを生成することで、Javaのプログラムからアクセスできるようになります。

WSDL メリット デメリット use case
Enterprise WSDL 型安全なコードが書きやすい Salesforceにオブジェクトや項目が追加される度にクライアントコードの再作成が必要 単一のSalesforceのみとやりとり
Partner WSDL オブジェクトと項目の名前だけで、どのSalesforceに対してもアクセス可能 型付けが弱くリフレクションのようなコードになる 複数のSalesforceに対して統一的な処理をする

wscと上記のWSDLを使ってクライアントコードを生成します。SOAP APIなので、実態はXMLファイルのやりとりですが、クライアントコードを使えばその部分はほぼ気にせずに開発できます。型付けもされているので、場合によってはRESTよりも早く動くアプリを作れるでしょう。

いずれにせよ、以下のようにwscとクライアントコードを依存ライブラリに追加することで使用できます。

build.gradle
dependencies {
    compile group: 'com.force.api', name: 'force-wsc', version: '39.0.0'

    // クライアントコード
    compile files ('lib/enterprise.jar')  //Enterprise WSDL
  compile files ('lib/partner.jar')    // Partner WSDL
}

Enterprise WSDL

クライアントコードには、AccountやContactなどの標準オブジェクトと独自に作成したカスタムオブジェクトのクラスと、ログインなどの基本機能のためのものが含まれます。

Salesforceのオブジェクトクラスには、IDなどの標準項目とカスタム項目のgetter,setterも生成されています。当然、Salesforce上のテキスト型・数値型になどにあわせて、String・Doubleなどの型で生成されます。

EnterpriseSamle.java
// 生成したクライアントコード
import com.sforce.soap.enterprise.Connector;
import com.sforce.soap.enterprise.EnterpriseConnection;

// wscにあるリポジトリ
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;


        //ログイン処理
        String userName = "SF_USER";
        String password = "SF_PASSWORD";
        ConnectorConfig config = new ConnectorConfig();
        config.setUsername(userName);
        config.setPassword(password);
        EnterpriseConnection connection = Connector.newConnection(config);
        // クエリの実行
        // 全てのフィールドが返却されるが、値がセットされるのはselect句で指定されたもののみ
        String soql = "select Id, FirstName, sample__c from Contact limit 5";
        QueryResult results = connection.query(soql);
        if (results.getSize() > 0) {
            for (SObject so: results.getRecords()) {
                Contact c = (Contact) so;
                // getter が用意されている
                logger.info("contact.Id:" + c.getId());
                logger.info("contact.FirstName:" + c.getFirstName());
                // カスタム項目のgetterも生成されている
                logger.info("contact.sample__c:" + c.getSample__c());
            }
        }

一見すると、Accountなどの標準オブジェクトはどの組織で生成しても同じコードになりそうですが、そうはなりません。 カスタム項目などで差異がでるからです。

質が悪いことに com.sforce.soap.enterprise というパッケージ名とオブジェクト名まで同じで生成されます。ほぼ同じなのに、Javaでは別オブジェクトとして扱う必要があり、共通処理をするためのinterfaceもないです。

Partner WSDL

コードはほとんどEnterprise WSDLと同じですが、ログインのためのConnectionオブジェクトが違います。APIのエンドポイントも、EnterpriseとPartnerでは異なっています。

Enterprise WSDL:
String authEndPoint = "https://login.salesforce.com/services/Soap/c/37.0/"
String authEndPoint = "https://community-domain/path-prefix/Soap/c/37.0/"
Partner WSDL:
String authEndPoint = "https://login.salesforce.com/services/Soap/u/37.0/"
String authEndPoint = "https://community-domain/path-prefix/Soap/u/37.0/"

(https://developer.salesforce.com/docs/atlas.en-us.api.meta/api/sforce_api_calls_login.htm)

PartnerSample.java
         // Partner WSDLのコネクション
         PartnerConnection connection = Connector.newConnection(config);
        // クエリの実行
        // 全てのフィールドが返却されるが、値がセットされるのはselect句で指定されたもののみ
        String soql = "select Id, FirstName, sample__c from Contact limit 5";
        QueryResult results = connection.query(soql);
        if (results.getSize() > 0) {
            for (SObject so: results.getRecords()) {
                // IDのgetterは用意されている
                logger.info("Id:" + so.getId());
                // フィールド名で取得する
                logger.info("FirstName:" + so.getField("FirstName"));
                // カスタム項目も同様
                logger.info("sample__c:" + so.getField("sample__c"));

                // 値はオブジェクト型
          // XMLファイルからパースするときに型付けをしていないので、実質はString  
         Object firstName = so.getField("FirstName"); 
            }
        }

見ての通り、Javaのリフレクションに似たようなコードになります。その点ではEnterprise WSDLよりも取り扱うための技術が必要になります。

Enterprise WSDLで複数組織を扱うには

以前の記事でクライアントコードの生成方法で、生成されるコードのパッケージ名のprefixを設定することで、複数組織のクライアントコードをひとつのプロジェクトにとりこめます。

java -Dpackage-prefix=XXX -jar target/force-wsc-x.y.z-uber.jar enterprise.wsdl enterprise-XXX.jar

Enterprise WSDLによる型付けの微妙なところ

  • 数値は全てDouble
  • 日付はCalendar

数値はSalesforceがDoubleでしか持っていないので、これはやむを得ないでしょう。知っていればなんということもないです。

日付がCalendarなのはつらいのひとことです... Java8から導入されたDate and Time APIと逆行するので、なにがしかで変換する必要があります。

Bulk API

SOAP APIでは1APIコールあたりのクエリの取得件数は2000件まで、登録・更新・削除は200件までの制約があります。APIコール数をおさえて大量データを処理するにはBulk APIを使う方が有利です。

dataloaderというGUIツールがSalesforceから提供されており、Bulk APIでのデータのimport,exportができます。(SOAP APIとBulk APIのどちらを使うかも設定画面から選択できます)

Bulk Queryの例

公式の一括クエリのサンプル がよくまとまっています。XMLのやりとりと対応するコードを交えての説明なので、実際に何をやっているかがわかりやすいです。

  1. ジョブの作成
  2. ジョブにバッチ(実行したいクエリ)を追加
  3. バッチのステータスが完了になるまでチェック
  4. 結果の取得

という流れになります。

BulkBasicSample.java
    // ログイン処理
    ConnectorConfig config = new ConnectorConfig();
    config.setUsername(userId);
    config.setPassword(passwd);
    config.setAuthEndpoint(soapAuthEndPoint);
    config.setRestEndpoint(bulkAuthEndPoint);
    PartnerConnection connection = new PartnerConnection(config);
    bulkConnection = new BulkConnection(config);

    // 1. ジョブの作成
    JobInfo job = new JobInfo();
    job.setObject("Account");
    job.setOperation(OperationEnum.query);
    job.setConcurrencyMode(ConcurrencyMode.Parallel);
    job.setContentType(ContentType.CSV);
    job = bulkConnection.createJob(job);

    // 2. バッチの登録
    String query =  "SELECT Name, Id FROM Account";
    BatchInfo info = null;
    ByteArrayInputStream bout = new 
 ByteArrayInputStream(query.getBytes());
    info = bulkConnection.createBatchFromStream(job, bout);

   // 3. バッチのステータスチェック
   String[] queryResults = null;

    for(int i=0; i<10000; i++) {
      Thread.sleep(30000); //30 sec
      info = bulkConnection.getBatchInfo(job.getId(), 
          info.getId());

      if (info.getState() == BatchStateEnum.Completed) {
        QueryResultList list = 
            bulkConnection.getQueryResultList(job.getId(), 
                info.getId());
        queryResults = list.getResult();
        break;
      } else if (info.getState() == BatchStateEnum.Failed) {
        System.out.println("-------------- failed ----------" 
            + info);
        break;
      } else {
        System.out.println("-------------- waiting ----------" 
            + info);
      }
    }

   // 4. 結果の取得
    if (queryResults != null) {
      for (String resultId : queryResults) {
        bulkConnection.getQueryResultStream(job.getId(), 
            info.getId(), resultId);
      }
    }

非同期処理っぽく書く例

バッチのステータスが完了チェックを、for文やwhile文でやるのは原始的すぎる印象があります。その部分を非同期処理に書くのを試みてみました。

非同期処理として扱うために、CompletableFutureで、result Idの配列の結果を受け取るようにします。定期的にステータスをチェックするのに、ScheduledExecutorServiceを利用します。ラムダ式も使って書くとすこし今風になります。

BulkSample.java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.async.QueryResultList;
import com.sforce.ws.ConnectionException;

import util.ConnectionUtil;

/**
 * bulk APIの一括クエリのサンプル.
 * @author nakamura_jun
 *
 */
public class BulkSample {

    private static final Logger logger = LoggerFactory.getLogger(BulkSample.class);

    public static void main(String[] args) throws ConnectionException, AsyncApiException, InterruptedException, ExecutionException, IOException {
        BulkConnection connection = ConnectionUtil.createBulk();
        JobInfo job = createJob(connection);
        String query = "select Id, Name, Phone from Account";
        BatchInfo batch = createBatch(job, connection, query);

        // バッチが完了したかどうかを定期的にステータスを取得するリクエストをすることで判定
        ScheduledExecutorService checkBatchStatus = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String[]> result = new CompletableFuture<>();
        checkBatchStatus.scheduleAtFixedRate(() -> {
            logger.info("--- checking batch status ---");
            try {
                BatchInfo info = connection.getBatchInfo(job.getId(), batch.getId());
                switch (info.getState()) {
                case Completed:
                    QueryResultList queryResults = connection.getQueryResultList(job.getId(), batch.getId());
                    result.complete(queryResults.getResult());
                    break;
                case Failed:
                    logger.warn("batch:" + batch.getId() + " failed.");
                    result.complete(new String[]{});
                    break;
                default:
                    logger.info("-- waiting --");
                    logger.info("state: " + info.getState());
                }
            } catch (AsyncApiException e) {
                result.completeExceptionally(e);
            }
        }, 1, 15, TimeUnit.SECONDS);

        result.whenComplete((results, thrown) -> {
            checkBatchStatus.shutdownNow();
            logger.info("--- batch is done. ---");
        });

        // バッチ完了後に結果を取得
        String[] resultIds = result.get();
        logger.info("--- results ---");
        for (String resultId: resultIds) {
            InputStream is = connection.getQueryResultStream(job.getId(), batch.getId(), resultId);
            // とりあえず標準出力. 実際はファイルとして保存してそれをDBに登録するなど
            try(BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                String line = null;
                while((line = br.readLine()) != null) {
                    logger.info(line);
                }
            }

        }

        connection.closeJob(job.getId());
    }

    private static JobInfo createJob(BulkConnection connection) throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject("Account");
        job.setOperation(OperationEnum.query);
        job.setConcurrencyMode(ConcurrencyMode.Parallel);
        job.setContentType(ContentType.CSV);
        job = connection.createJob(job);
        assert job.getId() != null;
        return connection.getJobStatus(job.getId());
    }

    private static BatchInfo createBatch(JobInfo job, BulkConnection connection, String query) throws AsyncApiException {
        ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes());
        return connection.createBatchFromStream(job, bout);
    }



}

PK Chunking を使った例

クエリの結果件数が多い場合、PK Chunking という機能で、結果を分割する仕組みがあります。SalesforceのIDで、指定した件数ごとに結果を分割するという、ページングのようなことをします。

Bulkでは1ファイル1GBサイズまでを結果としてダウンロードできるので、1000万件くらいにならないと使用する機会はおそらくないでしょう。

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sforce.async.AsyncApiException;
import com.sforce.async.BatchInfo;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.ConcurrencyMode;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.OperationEnum;
import com.sforce.async.QueryResultList;
import com.sforce.ws.ConnectionException;

import util.ConnectionUtil;

public class BulkChunkSample {

    private static final Logger logger = LoggerFactory.getLogger(BulkChunkSample.class);

    public static void main(String[] args) throws ConnectionException, AsyncApiException, InterruptedException, ExecutionException, IOException {
        BulkConnection connection = ConnectionUtil.createBulk();
        JobInfo job = createJob(connection);
        logger.info("jobId: " + job.getId());
        String query = "select Id, Name, Phone from Account";
        // PK-chunkが有効の場合、クエリ全体を処理するためのバッチが自動で追加されるので、最初のバッチは実行されない.
        createBatch(job, connection, query);

        // クエリ全体を処理するために追加されたバッチの情報を取得
        List<BatchInfo> batchList = getChunkedBatch(job, connection);

        logger.info("chunked batch size : " + batchList.size());

        // バッチごとにステータスチェック+結果の取得を、非同期に行う
        ExecutorService executor = Executors.newFixedThreadPool(batchList.size());
        CompletionService<CompletableFuture<ChunkBatchResult>> completionService = new ExecutorCompletionService<>(executor);
        for (BatchInfo chunkBatch: batchList) {
            completionService.submit(() -> BulkChunkSample.getResultIds(job, connection, chunkBatch));
        }

        logger.info("submitting is done.");

        // 終わったものから結果を取得
        for (int i = 0; i < batchList.size(); i++) {
            CompletableFuture<ChunkBatchResult> resultIds = completionService.take().get();

            resultIds.whenComplete((batchResult, thrown) -> {
                logger.info("--- results --- : batchId:" + batchResult.batchInfo.getId());
                // とりあえず標準出力. マルチスレッドなので、順番はでたらめになる

                for (String resultId : batchResult.resultIds) {
                    // ラムダ式内では例外スローできないので、try-catch
                    try {
                        InputStream is = connection.getQueryResultStream(job.getId(), batchResult.batchInfo.getId(), resultId);
                        try(BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                            String line = null;
                            while((line = br.readLine()) != null) {
                                logger.info(line);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    } catch (AsyncApiException e) {
                        e.printStackTrace();
                    } 
                }
                logger.info("--- end --- : batchId:" + batchResult.batchInfo.getId());
            });
        }
        // ジョブ内のバッチを終了させるのと、モニタリングのため
        connection.closeJob(job.getId());
    }

    private static JobInfo createJob(BulkConnection connection) throws AsyncApiException {
        JobInfo job = new JobInfo();
        job.setObject("Account");
        job.setOperation(OperationEnum.query);
        job.setConcurrencyMode(ConcurrencyMode.Parallel);
        job.setContentType(ContentType.CSV);
        connection.addHeader("Sforce-Enable-PKChunking", "chunksize=5"); // もとのデータがすくないため、わざと分割の単位を小さくする
        job = connection.createJob(job);
        assert job.getId() != null;
        return connection.getJobStatus(job.getId());
    }

    private static BatchInfo createBatch(JobInfo job, BulkConnection connection, String query) throws AsyncApiException {
        ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes());
        return connection.createBatchFromStream(job, bout);
    }

    private static List<BatchInfo> getChunkedBatch(JobInfo job, BulkConnection connection) throws AsyncApiException, InterruptedException {
        logger.info("--- getting chunk batch list ---");
        // PK-chunkが有効の場合、クエリ全体を処理するためのバッチが自動で追加される
        // 最初のバッチがNotProcessedになった後にそれらが実行されるので、それまで遅延させる.
        TimeUnit.SECONDS.sleep(5);
        BatchInfoList infoList = connection.getBatchInfoList(job.getId());
        List<BatchInfo> _infoList = new ArrayList<>(Arrays.asList(infoList.getBatchInfo()));
        _infoList.remove(0);
        if (_infoList.isEmpty()) {
            logger.warn("--- retrying chunk batch list ---");
            // XXX 一回だけretryさせる
            TimeUnit.SECONDS.sleep(5);
            infoList = connection.getBatchInfoList(job.getId());
            _infoList = new ArrayList<>(Arrays.asList(infoList.getBatchInfo()));
        }
        return _infoList;
    }

    private static CompletableFuture<ChunkBatchResult> getResultIds(JobInfo job, BulkConnection connection, BatchInfo chunkBatch) {
        logger.info("getting resultIds for " + chunkBatch.getId());
        ScheduledExecutorService checkBatchStatus = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<ChunkBatchResult> result = new CompletableFuture<>();
        checkBatchStatus.scheduleAtFixedRate(() -> {
            logger.info("--- checking batch status --- : " + chunkBatch.getId());
            try {
                BatchInfo info = connection.getBatchInfo(job.getId(), chunkBatch.getId());
                switch (info.getState()) {
                case Completed:
                    QueryResultList queryResults = connection.getQueryResultList(job.getId(), chunkBatch.getId());
                    result.complete(ChunkBatchResult.newInstance(chunkBatch, queryResults.getResult()));
                    break;
                case Failed:
                    logger.warn("batch:" + chunkBatch.getId() + " failed.");
                    result.complete(ChunkBatchResult.newInstance(chunkBatch, new String[]{}));
                    break;
                default:
                    logger.info("-- waiting --");
                    logger.info("state: " + info.getState());
                }
            } catch (AsyncApiException e) {
                result.completeExceptionally(e);
            }
        }, 1, 15, TimeUnit.SECONDS);

        result.whenComplete((results, thrown) -> {
            checkBatchStatus.shutdownNow();
            logger.info("--- batch is done. --- : " + chunkBatch.getId());
        });

        // バッチ完了後に結果を取得
        return result;
    }

    /**
     * 分割されたバッチごとの結果を表すクラス
     * クエリの結果を受け取るには、resultIdの配列とbatchIdが必要なため作成.
     * @author nakamura_jun
     *
     */
    private static class ChunkBatchResult {

        public final BatchInfo batchInfo;

        public final String[] resultIds;

        public static ChunkBatchResult newInstance(BatchInfo batchInfo, String[] resultIds) {
             return new ChunkBatchResult(batchInfo, resultIds);
         }

        private ChunkBatchResult(BatchInfo batchInfo, String[] resultIds) {
            this.batchInfo = batchInfo;
            this.resultIds = resultIds;
        }




    }

おまけ: force.com-apiにあるCSVReaderの使い方

com.sforce.async.CSVReader (https://github.com/forcedotcom/wsc/blob/master/src/main/java/com/sforce/async/CSVReader.java) にCSVReaderがあります。csvのライブラリを別途いれたくないなら、こちらを使うと便利です。

注意するのは、初期値で読み込みする行数や文字数に制限が設定されているので、それらを自分の状況にあわせて変更する必要があります。 Exceeded number of records : 10002 などのエラーが出た場合はそれが原因です。

private int maxSizeOfIndividualCell = 32000;
    private int maxColumnsPerRow = 5000;
    private int maxRowSizeInCharacters = 400000; //400K of characters in a row..

    //by default, giving a 10m character limit. Note that this limit is in characters. if you want to limit
    // by bytes, do it separately.
    //Call the mutator to set the limit for following.
    private int maxFileSizeInCharacters = 10000000;
    private int maxRowsInFile = 10001;

datloaderのBulk処理では、以下のように文字数と行数の制限は、Integerの最大値にしています。

BulkQueryVisitor.java
    final CSVReader rdr = new CSVReader(resultStream, Config.BULK_API_ENCODING);
    rdr.setMaxCharsInFile(Integer.MAX_VALUE);
    rdr.setMaxRowsInFile(Integer.MAX_VALUE);

参考リンク