Python+requestsで、Azure StorageのREST APIにて画像を保存&取得する方法をまとめました。
通常はREST APIを使って実装するより、以下のPython用SDKを使った方がよいと思います。
https://docs.microsoft.com/ja-jp/azure/python-how-to-install
開発時の環境
Windows 10 Home
Python 3.5.2
Azure Storage
https://azure.microsoft.com/ja-jp/services/storage/
Amazon s3のように、APIを備えたストレージです。
事前にアカウントを取得しておきます。
クラス
upload→画像のアップロード
download→画像のダウンロード
です。
画像以外のデータにも使えると思いますが、実験していません。
azure_storage_sample.py
import base64
import sys
import datetime
import hmac
import hashlib
import requests
class AzureStorage:
storage_account = ""
storage_container = ""
access_key = ""
api_ver = "2014-02-14"
response = None
def __init__(self, storage_account, storage_container, access_key):
self.storage_account = storage_account
self.storage_container = storage_container
self.access_key = access_key
pass
def upload(self, filename, body, type, metadata = {}):
date = self.get_date()
size = str(len(body))
string_to_sign = [
'PUT',
'',
type,
date,
]
#meta data
ar_metadata = []
for k, v in metadata.items():
ar_metadata.append(k+":"+v)
#認証文字列作成
authorization = self.get_authorization(filename, string_to_sign, ar_metadata)
#httpヘッダ作成
http_headers = {
'x-ms-blob-type': 'BlockBlob',
'x-ms-version': self.api_ver,
'Authorization': authorization,
'Date': date,
'Content-Type': type,
'Content-Length': size,
}
# metadataを結合
for k, v in metadata.items():
http_headers[k] = v
#http通信
url = "https://%s.blob.core.windows.net/%s/%s" % (self.storage_account, self.storage_container, filename)
try:
self.response = requests.put(url, data=body, headers=http_headers)
if (self.response.status_code == 201):
#ok
return True
else:
#ng
return False
except requests.exceptions.RequestException as e:
return False
def download(self, filename):
date = self.get_date()
string_to_sign = [
'GET',
'',
'',
date,
]
#認証文字列作成
authorization = self.get_authorization(filename, string_to_sign)
#httpヘッダ作成
http_headers = {
'x-ms-blob-type': 'BlockBlob',
'x-ms-version': self.api_ver,
'Authorization': authorization,
'Date': date,
}
#http通信
url = "https://%s.blob.core.windows.net/%s/%s" % (self.storage_account, self.storage_container, filename)
try:
self.response = requests.get(url, headers=http_headers)
if (self.response.status_code == 200):
#ok
return True
else:
#ng
return False
except requests.exceptions.RequestException as e:
return False
def get_authorization(self, filename, string_to_sign, ar_metadata = None):
sign_str = "/%s/%s/%s" % (self.storage_account, self.storage_container, filename)
string_to_sign.extend(['x-ms-blob-type:BlockBlob'])
if (ar_metadata != None):
string_to_sign.extend(ar_metadata)
string_to_sign.extend(['x-ms-version:'+self.api_ver])
string_to_sign.extend([sign_str])
string_to_sign = "\n".join(string_to_sign)
signature = self.make_hash(self.access_key, string_to_sign)
authorization = "SharedKeyLite %s:%s" % (self.storage_account, signature)
return authorization
def get_date(self):
now = datetime.datetime.now()
#rfc2822_format = "%a, %d %b %Y %H:%M:%S %z" # ex: Thu, 02 Aug 2001 10:45:23 GMT
rfc2822_format = "%a, %d %b %Y %H:%M:%S" # 最後の%zが不安なので
date = now.strftime(rfc2822_format)
date = date + " GMT"
return date
def get_last_response(self):
return self.response
def encode_base64(self, data):
encoded = base64.b64encode(data)
return encoded.decode('utf-8')
def decode_base64(self, data):
return base64.b64decode(data)
def make_hash(self, key, str):
key = self.decode_base64(key)
str = str.encode('utf-8')
str_hashed = hmac.HMAC(key, str, hashlib.sha256).digest()
str_enc = self.encode_base64(str_hashed)
return str_enc
使い方(アップロード)
#認証情報
storage_account = "xxxxxx"
storage_container = "xxxxxx"
access_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#アップロード
data_filename = "sample.jpg"
data_body = open("sample.jpg", "rb").read()
data_type = "image/jpeg"
metadata = {
#'x-ms-meta-m1': 'aaaaaaaa',
#'x-ms-meta-m2': 'bbbbbbbb',
}
AzureStorage = AzureStorage(storage_account, storage_container, access_key)
result = AzureStorage.upload(data_filename, data_body, data_type, metadata)
if (result == True):
print("upload ok\n")
else:
print("upload ng\n")
response = AzureStorage.get_last_response()
print(response.text)
使い方(ダウンロード)
#認証情報
storage_account = "xxxxxx"
storage_container = "xxxxxx"
access_key = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#ダウンロード
data_filename = "sample.jpg"
AzureStorage = AzureStorage(storage_account, storage_container, access_key)
result = AzureStorage.download(data_filename)
if (result == True):
print("download ok\n")
response = AzureStorage.get_last_response()
#ダウンロードしたファイルの保存
with open("download.jpg", "wb") as f:
data = bytearray(response.content)
f.write(data)
else:
print("download ng\n")
アップロード時に任意のmetadataを付与できます。私が実験したときは、metadataを付けるとなぜかアップロードが失敗することがありました。
参考
以下を参考にさせて頂きました。
http://qiita.com/ngyuki/items/eb9b890801e49171a0c6
https://docs.microsoft.com/ja-jp/rest/api/storageservices/put-blob