制限付きデータトークンRDTを取得したい
解決したいこと
Amazonに入った注文の詳細情報を取得したいです。
取得のためには、RDTを取得する必要がありますが、トークンが取得できず、質問に至りました。
Requestsについて理解できていないことが問題かと考えています。
何卒、よろしくお願いいたします。
発生している問題・エラー
{'access_token': 'Atza|~~~~~~~~~~', 'refresh_token': 'Atzr|~~~~~~~~~~', 'token_type': 'bearer', 'expires_in': 3600}
400
{
"errors": [
{
"code": "InvalidInput",
"message": "Empty target resource argument found",
"details": ""
}
]
}
該当するソースコード
### https://www.hands-lab.com/tech/t12840/
import json
import urllib
import requests
# トークン取得
def xxx() -> dict :
# ApiCallに必要な情報を宣言
url = 'https://api.amazon.com/auth/o2/token'
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
# keyの並び順とかでエラーになったりするので順番はこの順番のままがよいです
payload = {
'grant_type': 'refresh_token',
'refresh_token': 'Atzr|~~~~~~~~~~',
'client_id': 'amzn1.application-oa2-client.~~~~~~~~~~',
'client_secret': 'amzn1.oa2-cs.v1.~~~~~~~~~~'
}
# payloadは url形式指定なので変換する
# grant_type=refeash&refresh_token=xxxx&....
str = urllib.parse.urlencode(payload)
# 実際にリクエストを飛ばします
response = requests.post(url, headers=headers, data=str)
response.raise_for_status()
# response.txtは文字列でかえってくるので辞書型に変換する
token_info_dict = json.loads(response.text)
# 実際の返却はこんな感じ
# token_info_dict
# { "access_token": "Atza|IwExxx"
# "refresh_token": "Atzr|IwEBIxxxxx",
# "token_type": "bearer",
# "expires_in": 3600}
return token_info_dict
tid = xxx()
print(tid)
### 引数を渡したり以外は、公式の関数をそのまま使用しています。
### これでAutorizeton Keyを作成します, credential_info_dict
def make_get_authorization_header(method, canonical_uri, request_parameters, token):
# ************* REQUEST VALUES *************
service = 'execute-api'
host = 'sellingpartnerapi-fe.amazon.com'
region = 'us-west-2'
user_agent = "My Selling Tool/2.0 (Language=Java/1.8.0.221; Platform=Windows/10)"
access_key = '~~~~~~~~~~'
secret_key = '~~~~~~~~~~'
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
# Read AWS access key from env. variables or configuration file. Best practice is NOT
# to embed credentials in code.
if access_key is None or secret_key is None:
raise Exception('No access key is available.')
# Create a date for headers and the credential string
t = datetime.datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
# Date w/o time, used in credential scope
datestamp = t.strftime('%Y%m%d')
# ************* TASK 1: CREATE A CANONICAL REQUEST *************
# http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
canonical_querystring = request_parameters
canonical_headers = 'host:' + host + '\n' + 'x-amz-date:' + amzdate + '\n'
signed_headers = 'host;x-amz-date'
payload_hash = hashlib.sha256(('').encode('utf-8')).hexdigest()
canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + \
'\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash
# ************* TASK 2: *************
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + \
'/' + service + '/' + 'aws4_request'
string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + \
'\n' + \
hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
# ************* TASK 3: CALCULATE THE SIGNATURE *************
signing_key = getSignatureKey(secret_key, datestamp, region, service)
# Sign the string_to_sign using the signing_key
signature = hmac.new(signing_key, (string_to_sign).encode(
'utf-8'), hashlib.sha256).hexdigest()
# ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + \
credential_scope + ', ' + 'SignedHeaders=' + \
signed_headers + ', ' + 'Signature=' + signature
# x-amz-date はPostmanでは自動で付与してくれるので設定してませんが必要です
headers = {'x-amz-access-token': token, 'user-agent': user_agent, 'x-amz-date': amzdate, 'Authorization': authorization_header}
return headers
### https://www.greenwich.co.jp/blog-archives/p/26120
### https://www.hands-lab.com/tech/t12842/
import sys
import hashlib
import hmac
import datetime
import requests
import json
import time
import sys
request_parameters_unencode = {
'MarketplaceIds': 'A1VC38T7YXB528',
'CreatedAfter': '2024-04-01',
'NextToken': ''
}
request_parameters = urllib.parse.urlencode(
sorted(request_parameters_unencode.items()))
request_url = 'https://sellingpartnerapi-fe.amazon.com/tokens/2021-03-01/restrictedDataToken?' + request_parameters
params = {"restrictedResources" : [{"method": "GET","path": "/orders/v0/orders/***-*******-*******","dataElements": ["buyerInfo", "shippingAddress"]}]}
headers = make_get_authorization_header("GET", f'/orders/v0/orders/***-*******-*******', request_parameters, tid["access_token"])
response = requests.post(request_url, params=params, headers=headers)
print(response.status_code) # HTTPのステータスコード取得
print(response.text) # レスポンスのHTMLを文字列で取得
自分で試したこと
公式しか情報がなさそうで、公式より情報収集しました。
最終的にしたいことのドキュメント
https://developer-docs.amazon.com/sp-api/lang-ja_JP/docs/orders-api-v0-reference#getorderaddress
0