LoginSignup
10
12

More than 3 years have passed since last update.

PythonからAzure Comos DBのデータ操作

Last updated at Posted at 2019-05-01

1. はじめに

PythonからAzure Cosmos DBのデータ操作について確認をしたので備忘録としてまとめておきたいと思います。(はじまりはいつも同じような文言ですが...)

最近サーバーレスな構成(AWS AppSync+Lambda+DynamoDB)に関わったのですが、「ではAzureだとどうするのかな?」と気になったので調べてみました。
(Azureでサーバーレス構成を実現するには、Azure Functions(HttpTrigger/Graphene) + CosmosDBの2つ構成が必要になってくるのかなと思います。)

今回はまずはAzure CosmosDBのデータ操作に関して実装していきたいと思います。

Azure CosmosDBに関しては公式ドキュメントに概要・詳細ともに充実しているようです。
またAzure Cosmos DB:Azure Cosmos DB SQL API アカウントを使用して Python アプリケーションを構築するという公式ドキュメントには設定・実装方法・チュートリアルが用意されているので、こちらを参照しながら理解を深めていくと良いかなと思います。

2. Azure Cosmos DBのアカウント作成

Azure Cosmos DBを利用するためには、まずAzure Cosmos DBアカウントの作成が必要になります。
Azure Cosmos DBアカウントは(乱暴な言い方をすると)一種のDBクラスターのようなもので、アカウントには複数のDBと複数のコンテナ(テーブル)を作成することができます。
(VirtualNetworkの指定や接続のためのエンドポイントURIとキーなどの接続情報もあります。)

APIの種類はコア(SQL)にしています。他にもMongDB用API,Cassandra,Azureテーブル,Gremlin(グラフ)といったAPIが用意されていました。
(CosmosClientのQueryItemsのクエリをどのAPI種別で実行するかというものだと思います)

Azure Cosmos DBアカウントはAzurePortalかARMテンプレートから展開できるようですが、今回はAzurePortalを利用します。

1.Azure Cosmos DBを選択、追加を押下し、基本設定を開きます。展開先リソースグループやアカウント名、APIなどを指定します。アカウント名はグローバルで一意の必要があるので他と被らないようにします。またAPIは今回コア(SQL)にしました。
cosmosdb01.png

2.Cosmos DBアカウントを展開する仮想ネットワークとファイアウォールの設定を行います。
ファイアウォールの設定として開発環境(会社や自宅など)からもアクセスできるよう、Allow access from my IPはAllowにしておきます。
cosmosdb02.png
#ここでの仮想ネットワークの指定はCosmosDBへの接続を許可するネットワークのようです。

3.タグの設定が出ますが今回は指定なしで進みます。

4.最後に確認画面が表示されるので、「作成」を押下します。
5分ほどでデプロイが完了します。
cosmosdb03.png

  1. Azure Functionsなどのサービスから接続できるようファイアウォールの構成を変更します。 CosmosDBの「ファイアウォールと仮想ネットワーク」にて、「パブリック Azure データセンター内からの接続を受け入れる」をチェックし保存します。 cosmosdb05.png

3. Python実行環境の用意

今後Azure Functionsを試していくので、CosmosDBのデータ操作を行うPythonプログラムをPython3.6仮想環境で動かしたいと思います。

はじめに仮想環境を作成してアクティブにします。

python3.6 -m venv .env
source .env/bin/activate

続いて、今回利用するPythonモジュール(azure-cosmos)をインストールします。

pip3.6 install azure-cosmos

4. PythonでのCosmosDBデータ操作

4.1. サンプルの実行

今回用意したコードはこちらにおいています。
#こちらで説明しているコードはちょいちょい更新をかけているのでGitHubにあげている方が最新になります。

cosmosdb/config.pyに環境情報を用意しており、CosmosDBのURIとプライマリキーを記載します。
(設定内容はAzurePortal->CosmosDB->キーからURIとプライマリキーで確認)

git clone https://github.com/moyota/python-cosmosdb.git
cd python-cosmosdb
python3.6 -m venv .env
source .env/bin/activate
pip install -r requirements.txt
vi cosmosdb/config.py
python -m Program

他にも公式ドキュメントGitHub(Azure/azure-cosmos-python)もあるので、こちらも参考になります。

4.2. Cosmos DBへのデータ操作関連の実装

Azure SDK for Python(azure.cosmos関連)のドキュメントを参考にしながら実装していきます。

DB定義とコンテナ(テーブル定義)を行うinitialize_xxxメソッドと、項目(レコード)のCRUD操作を行うxxx_itemを用意しています。(DB接続はコンストラクタ内で実施)

cosmosdb.py
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
from .config import config
from logging import getLogger
logger = getLogger(__name__)

class DatabaseConnection():
    def __init__(self):
        # Initialize the Cosmos client
        self.client = cosmos_client.CosmosClient(url_connection=config['ENDPOINT'],
            auth={'masterKey': config['PRIMARYKEY']})

        # Read a database
        self.database_link = 'dbs/' + config['DATABASE']

        # Read a container
        self.container_link = self.database_link + '/colls/{0}'.format(config['CONTAINER'])


    def get_options(self):
        options = {
            'enableCrossPartitionQuery': True,
            'maxItemCount': 5,
        }
        return options


    # Create a database
    def initialize_database(self, database_id=config['DATABASE']):
        try:        
            return self.client.CreateDatabase({'id': database_id})
            # database_link = self.database_link
            # database_link = db['_self']
        except errors.HTTPFailure as e:
            if e.status_code == 409:
               print('A database with id \'{0}\' already exists'.format(self.database_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Create a container
    def initialize_container(self, database_id=config['DATABASE'], container_id=config['CONTAINER']):
        try:
            database_link = 'dbs/' + database_id
            container_definition = {
                'id': container_id,
                "indexingPolicy": {
                    "indexingMode": "consistent", # consistent or lazy
                    "automatic": True,
                },
                "partitionKey": {
                    "paths": [
                      "/partitionKey"
                    ],
                    "kind": "Hash",
                }
            }
            return self.client.CreateContainer(database_link, container_definition, {'offerThroughput': 400})
        except errors.CosmosError as e:
            if e.status_code == 409:
                logger.error('A collection with id \'{0}\' already exists'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code) 
        except Exception as e:
            raise e


    # Create and add a item to the container
    def create_item(self, item):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(item["id"])}

        try:
            self.client.CreateItem(self.container_link, item)
            results = list(self.client.QueryItems(self.container_link, query, self.get_options()))
            for item in results:
                logger.info(item)
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            elif e.status_code == 409:
                logger.error('A Item with id \'{0}\' already exists'.format(item['id']))            
            else: 
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Delete a item from the container
    def delete_item(self, item):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(item["id"])}

        try:
            results = self.client.QueryItems(self.container_link, query, self.get_options())
            for item in list(results):
                logger.info(item)
                options = self.get_options()
                options['partitionKey'] = item['partitionKey']
                self.client.DeleteItem(item['_self'], options)
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    # Upsert a item from the container
    def upsert_item(self, item):
        try:
            result = self.client.UpsertItem(self.container_link, item, self.get_options())
            logger.info(result)
            return result
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


    def read_item(self, id):
        query = {'query': 'SELECT * FROM c WHERE c.id = "{0}"'.format(id)}

        try:
            results = list(self.client.QueryItems(self.container_link, query, self.get_options()))
            return results
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
raise e


    def read_items(self):
        try:
            itemList = list(self.client.ReadItems(self.container_link, {'maxItemCount': 10}))

            logger.info('Found {0} documents'.format(itemList.__len__()))

            for item in itemList:
                logger.info(item)
            return itemList
        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print('A collection with id \'{0}\' does not exist'.format(self.container_link))
            else:
                raise errors.HTTPFailure(e.status_code)
        except Exception as e:
            raise e


# Sample Data1
def getReplacedItem(id):
    return {
        'id': 'id{0}'.format(id),
        'partitionKey': id,
        'message': 'Hello World CosmosDB!',
        'addition': 'test replace {0}'.format(id),
    }
# Sample Data2
def getItem(id):
    return {
        'id': 'id{0}'.format(id),
        'partitionKey': id,
        'message': 'Hello World CosmosDB!',
    }

コンテナ定義(CreateContainer)では、定義時にパーティションキーやインデックスポリシー,スループット(RU)の指定も可能です。
(一意キーポリシーなどもできるようですが要確認)

またCosmosClientのCreateItem/DeleteItem/UpsertItem/ReadItemsメソッドはコンテナリンクの指定が必要ですが、'dbs/データベース名/colls/コンテナ名'かReadContainerのどちらかで取得して指定します。

UpsertItemメソッドはupdate/insertを行うメソッドで、項目が同一idの場合は値を更新、同一idでない場合には項目の新規作成となります。

Database/コンテナ/項目はすでに存在する場合は例外(errors.HTTPFailure.status_codeが409)となるので、適宜例外処理を挟んでおきます。

注意点としてcosmos_clientのDeleteItemメソッドに関しては、オプションにてpartitionKey値の指定が必要のようです。この指定がない場合以下の例外が発生します(参考)。項目にパーティションキーが指定されていない場合はupsertメソッドなりでキー値を登録しておきます。

The partition key supplied in x-ms-partitionkey header has fewer components than defined in the the collection

あとは上記用意したコードをAzure Functionsで利用していくことになりますが、今回はサンプルコードとして実行しています。
(getItemやgetReplacedItemはダミーデータを生成するために用意した関数です)

Program.py
from cosmosdb.cosmosdb import DatabaseConnection
from cosmosdb.cosmosdb import getItem, getReplacedItem

if __name__ == "__main__":
    print("---")
    dbConnection = DatabaseConnection()

    print(dbConnection.initialize_database())
    print(dbConnection.initialize_container())

    dbConnection.create_item(getItem("1"))
    dbConnection.create_item(getItem("2"))
    dbConnection.create_item(getItem("3"))
    dbConnection.upsert_item(getReplacedItem("3"))
    dbConnection.upsert_item(getReplacedItem("4"))
    dbConnection.delete_item(getItem("2"))

    print("---")
    itemList = dbConnection.read_items()
    for item in itemList:
        print(item)

改めて実行し、データエクスプローラを見ると以下のようになります。

python -m Program

cosmosdb04.png

5.まとめ

今回PythonでCosmosDBのデータ操作を用意しましたが、公式ドキュメント群が充実しているので、実際にはドキュメントを参照しながら実装していくことになると思います。
コンテナ定義やオプションのパラメータ詳細に関してはドキュメントに記載がなさそうなので手探り調べながらとなるのが注意点かなと思います。
あとは実行環境をAzure Functionsにし、GraphQLのAPIを実装していきたいと思います。

参考情報

Azure Cosmos DB のドキュメント(公式)
Azure Cosmos DB:Azure Cosmos DB SQL API アカウントを使用して Python アプリケーションを構築する
Azure Cosmos DB の Python サンプル
Azure/azure-cosmos-python(GitHub)
Azure SDK for Python(azure.cosmos.cosmos_client)

10
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
12