はじめに
この記事はシスコの同志による Advent Calendar の一部として投稿しています
- 2017年版: https://qiita.com/advent-calendar/2017/cisco
- 2018年版: https://qiita.com/advent-calendar/2018/cisco
- 2019年版: https://qiita.com/advent-calendar/2019/cisco
- 2020年版: https://qiita.com/advent-calendar/2020/cisco
- 2020年版(2枚目): https://qiita.com/advent-calendar/2020/cisco2
- 2021年版: https://qiita.com/advent-calendar/2021/cisco
- 2021年半(2枚目): https://qiita.com/advent-calendar/2021/cisco2
本記事は2021年版Advent Calendarの1日目の投稿になり、私自身としては2回目の投稿となります。
Cisco DNA Centerとは
ご存じない方もいらっしゃるかと思いますので、少しだけDNA Centerについてご紹介させて頂きますが、データシートには下記のように記述があります。
「Cisco DNA Center は、ネットワークの管理、シスコへの投資の最適化、IT 支出の削減などを実現する強力なネットワークコントローラであり、管理ダッシュボードでもあります。Cisco DNA Center は、ネットワークの実行を簡素化するために、すべての基本的な管理タスクに対して単一のダッシュボードを提供します。このプラットフォームにより、IT 部門は変化や課題に迅速かつよりインテリジェントに対応できます。」
記載の通り様々な機能がありそうですが、今回ターゲットにするのは、DNA Assuranceと言う機能になります。DNA Assuranceのソリューション概要には次のように記載されております。
「詳細な情報と充実したコンテキストを組み合わせて一貫性のあるエクスペリエンスを実現し、ネットワークをプロアクティブに最適化」
もう少し具体的に書くと、従来のネットワーク管理装置のようにログを集めて可視化するだけに留まらず、収集したログやテレメトリを分析して、ネットワークや端末に発生している問題、原因、緩和策を自動で提供することが出来る非常に運用の効率化を実現するための機能が実装されています。
今まで、ユーザから問合せがあってから、MACアドレスを聞いて、複数の機器のログを分析して、原因を調査したり、毎回メーカーにエスカレーションしたりしていた運用をしているお客様からすると、上記機能が使えることで、ユーザにユーザIDを聞くだけで問題を認識しておれば、原因と緩和策まですぐに確認することが出来るようになります。
また、以下では、DNA CenterはDNACという略称で記載していきます。
DNAC利用における課題
こんな便利な機能を持つDNACではありますが、ダッシュボードを見れるのは基本的には管理者だけになっています。つまりユーザから問合せがあるたびに管理者がダッシュボードを開いて、MACアドレスやユーザIDを聞いて、問題の確認をする必要があります。欲しい情報は見るだけで分かるにも関わらず、毎度、管理者が確認する必要があるところは、なんとかならないのかという課題があります。
解決方法
流行りのチャットボットをユーザに提供して、チャットボットから自身の端末のMACアドレスやユーザ名を入力することで、DNA Assuranceが提供可能な情報を見れるようにすることができます。また、良くある問合せについては、予め辞書登録しておくことで、定型的な問合せについてはチャットボットが自動で回答することができるようになります。本機能を実装することで、管理者負荷の軽減とユーザ向けサービスレベルの向上につながることが期待できるのではないでしょうか。
※こちらの図はSecurity製品との連携も考慮したものになっています。
アプリケーションフロー
想定している基本のアプリケーションフロー概要は次の絵のようになっています。
※実際は「入力値」で、IPアドレスを入力するとセキュリティ製品にクエリが飛ぶようになっています。
プログラムソース
ここまでの機能については、社内のハッカソンイベントに参加した際に、社内の有志3名で作成しました。プログラム一式はCisco Devnet Code ExchangeとGitHubに登録されています。DNACをご利用頂いている方がいらっしゃれば、ぜひ、一度お試し頂ければと思います。
※ただし、諸事情により、チャットボットレスポンス含めて英語で表示されるものになっています。
今回の変更点
今回の記事においては、上記のコードをベースにしながら下記の変更を加えております。
- チャットボットのレスポンスを日本語メッセージに変更
- HTMLファイル自体もPython Flask上で動作するように変更
- ログイン時にDuo Web SDKを使ったなんちゃって多要素認証を実装
利用するAPI
利用するAPIは、DNAC、Umbrella、AMP for Endpoints1になりますが、使わないサービスがあれば、無視して頂ければ大丈夫です。後で提示するコードから該当箇所をコメントアウト頂くことで、処理をスキップ出来ます。
DNAC API
DNACのAPIを利用して端末の情報を入手するためには、まずDNACからTokenを入手する必要があります。TokenはユーザIDとパスワードが分かれば下記のコードで取得可能です。
import requests
url = "https://<DNAC IP address>/dna/system/api/v1/auth/token"
payload = None
headers = {
"Content-Type": "application/json",
"Authorization": "Basic <Base64 of username:password>",
"Accept": "application/json"
}
response = requests.request('POST', url, headers=headers, data = payload)
print(response.text.encode('utf8’))
{ "Token": "<Token>"}
Tokenを入手出来れば、後は入手したTokenを使ってDNACが保持している情報を入手することが出来るようになりあす。次のコードはユーザID(ここでは、testuesr)を引数にして、該当端末の情報を収集するものです。
import requests
url = "https://<DNAC IP address>/dna/intent/api/v1/client-enrichment-details"
payload = None
headers = {
"entity_type": "network_user_id",
"entity_value": "testuser",
"issueCategory": "",
"Content-Type": "application/json",
"Accept": "application/json",
"x-auth-token": "<Token>"
}
response = requests.request('GET', url, headers=headers, data = payload)
print(response.text.encode('utf8'))
Client_Enrichment_Detailsのレスポンスのサンプル
[
{
"userDetails": {
"id": "70:F3:5A:7F:18:10",
"connectionStatus": "CONNECTED",
"hostType": "WIRELESS",
"userId": "testuser",
"hostName": "AP70F3.5A7E.2C80",
"hostOs": "Sensor-Client-1800S",
"hostVersion": null,
"subType": "Unclassified",
"firmwareVersion": null,
"deviceVendor": null,
"deviceForm": null,
"salesCode": null,
"countryCode": null,
"lastUpdated": 1621580460000,
"healthScore": [
{
"healthType": "OVERALL",
"reason": "",
"score": 4
},
{
"healthType": "ONBOARDED",
"reason": "",
"score": 4
},
{
"healthType": "CONNECTED",
"reason": "",
"score": 0
}
],
"hostMac": "70:F3:5A:7F:18:10",
"hostIpV4": "192.168.101.1",
"authType": "WPA/WPA2",
"vlanId": 101,
"vnid": 0,
"ssid": "DNA-kotei",
"frequency": "2.4",
"channel": "1",
"apGroup": "default-group",
"location": "Tokyo/Tokyo Midtown/27FCPOC",
"clientConnection": "AP4800-02",
"connectedDevice": [
{
"type": null,
"name": null,
"id": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"ip address": null,
"band": null
}
],
"issueCount": 0,
"rssi": "0.0",
"avgRssi": "0.0",
"snr": "0.0",
"avgSnr": "0.0",
"dataRate": "0.0",
"txBytes": "0.0",
"rxBytes": "0.0",
"dnsResponse": "0.0",
"dnsRequest": "0.0",
"onboarding": {
"averageRunDuration": "106.0",
"maxRunDuration": "106.0",
"averageAssocDuration": "1.0",
"maxAssocDuration": "1.0",
"averageAuthDuration": "102.0",
"maxAuthDuration": "102.0",
"averageDhcpDuration": "0.0",
"maxDhcpDuration": "0.0",
"aaaServerIp": "UNKNOWN",
"dhcpServerIp": "UNKNOWN",
"authDoneTime": 1621579921685,
"assocDoneTime": 1621579921580,
"dhcpDoneTime": 0,
"latestRootCauseList": [
"otherRootCause"
]
},
"clientType": "OLD",
"onboardingTime": 1621579921685,
"port": null,
"iosCapable": false,
"usage": 0,
"linkSpeed": 0,
"remoteEndDuplexMode": null,
"txLinkError": 0,
"rxLinkError": 0,
"txRate": 0,
"rxRate": 0,
"versionTime": null,
"dot11Protocol": "PROTOCOL_802_11_AC",
"slotId": 0
},
"connectedDevice": [
{
"deviceDetails": {
"family": "Unified AP",
"type": "Cisco 4800 Series Unified Access Points",
"location": null,
"errorCode": "null",
"macAddress": "78:72:5d:41:5c:e0",
"role": "ACCESS",
"apManagerInterfaceIp": "192.168.11.251",
"associatedWlcIp": "192.168.11.251",
"bootDateTime": null,
"collectionStatus": "Managed",
"interfaceCount": "0",
"lineCardCount": "0",
"lineCardId": "",
"managementIpAddress": "192.168.11.14",
"memorySize": "NA",
"platformId": "AIR-AP4800-Q-K9",
"reachabilityFailureReason": "NA",
"reachabilityStatus": "Reachable",
"snmpContact": "",
"snmpLocation": "default location",
"tunnelUdpPort": "16666",
"waasDeviceMode": null,
"series": "Cisco 4800 Series Unified Access Points",
"inventoryStatusDetail": "NA",
"collectionInterval": "NA",
"serialNumber": "FGL2229A7G5",
"softwareVersion": "8.8.125.0",
"roleSource": "AUTO",
"hostname": "AP4800-02",
"upTime": "262 days, 01:57:32.860",
"lastUpdateTime": 1621580065856,
"errorDescription": null,
"locationName": null,
"tagCount": "0",
"lastUpdated": "2021-05-21 06:54:25",
"instanceUuid": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"id": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"neighborTopology": [
{
"nodes": [
{
"role": "Client",
"name": "2.4GHz Clients",
"id": "client2.4ghz",
"description": "2.4GHz Clients",
"deviceType": null,
"platformId": null,
"family": "client2.4ghz",
"ip": null,
"softwareVersion": null,
"userId": null,
"nodeType": null,
"radioFrequency": null,
"clients": 0,
"count": null,
"healthScore": null,
"level": 2,
"fabricGroup": null,
"connectedDevice": null,
"stackType": "NORMAL",
"additionalInfo": {
"macAddress": null
}
},
{
"role": "Client",
"name": "5GHz Clients",
"id": "client5ghz",
"description": "5GHz Clients",
"deviceType": null,
"platformId": null,
"family": "client5ghz",
"ip": null,
"softwareVersion": null,
"userId": null,
"nodeType": null,
"radioFrequency": null,
"clients": 0,
"count": null,
"healthScore": null,
"level": 2,
"fabricGroup": null,
"connectedDevice": null,
"stackType": "NORMAL",
"additionalInfo": {
"macAddress": null
}
},
{
"role": "ACCESS",
"name": "AP4800-02",
"id": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"description": "AP",
"deviceType": "Cisco 4800 Series Unified Access Points",
"platformId": "AIR-AP4800-Q-K9",
"family": "Unified AP",
"ip": "192.168.11.14",
"softwareVersion": "8.8.125.0",
"userId": null,
"nodeType": "NetworkDevice",
"radioFrequency": null,
"clients": 0,
"count": null,
"healthScore": 6,
"level": 1,
"fabricGroup": null,
"connectedDevice": null,
"stackType": "NORMAL",
"additionalInfo": {
"macAddress": "78:72:5D:41:5C:E0"
}
},
{
"role": "DISTRIBUTION",
"name": "New-Kotei-L3.cisco.com",
"id": "fdfea4ab-d1fa-466c-8d33-95563be3c29d",
"description": "SWITCH",
"deviceType": "Cisco Catalyst38xx stack-able ethernet switch",
"platformId": "WS-C3850-12X48U-E",
"family": "Switches and Hubs",
"ip": "192.168.11.254",
"softwareVersion": "16.9.1",
"userId": null,
"nodeType": "NetworkDevice",
"radioFrequency": null,
"clients": null,
"count": null,
"healthScore": 10,
"level": 0,
"fabricGroup": null,
"connectedDevice": null,
"stackType": "NORMAL",
"additionalInfo": {
"macAddress": null
}
},
{
"role": "ACCESS",
"name": "CT5520-6-kotei",
"id": "e1e8e1fd-aa64-4485-98ca-eb7348de2efd",
"description": "WLC",
"deviceType": "Cisco 5520 Series Wireless Controllers",
"platformId": "AIR-CT5520-K9",
"family": "Wireless Controller",
"ip": "192.168.11.251",
"softwareVersion": "8.8.125.0",
"userId": null,
"nodeType": "NetworkDevice",
"radioFrequency": null,
"clients": null,
"count": null,
"healthScore": 10,
"level": 0,
"fabricGroup": null,
"connectedDevice": "fdfea4ab-d1fa-466c-8d33-95563be3c29d",
"stackType": "NORMAL",
"additionalInfo": {
"macAddress": null
}
}
],
"links": [
{
"source": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"linkStatus": null,
"sourceLinkStatus": null,
"targetLinkStatus": null,
"target": "client2.4ghz",
"id": "client2.4ghz",
"portUtilization": null,
"sourceInterfaceName": null,
"targetInterfaceName": null,
"sourceDuplexInfo": null,
"targetDuplexInfo": null,
"sourcePortMode": null,
"targetPortMode": null,
"sourceAdminStatus": null,
"targetAdminStatus": null,
"apRadioAdminStatus": "UP",
"apRadioOperStatus": "UP",
"sourcePortVLANInfo": null,
"targetPortVLANInfo": null
},
{
"source": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"linkStatus": null,
"sourceLinkStatus": null,
"targetLinkStatus": null,
"target": "client5ghz",
"id": "client5ghz",
"portUtilization": null,
"sourceInterfaceName": null,
"targetInterfaceName": null,
"sourceDuplexInfo": null,
"targetDuplexInfo": null,
"sourcePortMode": null,
"targetPortMode": null,
"sourceAdminStatus": null,
"targetAdminStatus": null,
"apRadioAdminStatus": "UP",
"apRadioOperStatus": "UP",
"sourcePortVLANInfo": null,
"targetPortVLANInfo": null
},
{
"source": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"linkStatus": null,
"sourceLinkStatus": "UP",
"targetLinkStatus": "UP",
"target": "fdfea4ab-d1fa-466c-8d33-95563be3c29d",
"id": "192.168.11.254-GigabitEthernet0",
"portUtilization": null,
"sourceInterfaceName": "GigabitEthernet0",
"targetInterfaceName": "TenGigabitEthernet1/0/42",
"sourceDuplexInfo": null,
"targetDuplexInfo": "FullDuplex",
"sourcePortMode": null,
"targetPortMode": "access",
"sourceAdminStatus": null,
"targetAdminStatus": "UP",
"apRadioAdminStatus": null,
"apRadioOperStatus": null,
"sourcePortVLANInfo": null,
"targetPortVLANInfo": "11"
},
{
"source": "fdfea4ab-d1fa-466c-8d33-95563be3c29d",
"linkStatus": null,
"sourceLinkStatus": "UP",
"targetLinkStatus": "UP",
"label": [
"checkDirect"
],
"target": "e1e8e1fd-aa64-4485-98ca-eb7348de2efd",
"id": "192.168.11.251-GigabitEthernet1/0/2",
"portUtilization": null,
"sourceInterfaceName": "GigabitEthernet1/0/2",
"targetInterfaceName": "TenGigabitEthernet0/0/1",
"sourceDuplexInfo": "FullDuplex",
"targetDuplexInfo": "AutoNegotiate",
"sourcePortMode": "trunk",
"targetPortMode": "auto",
"sourceAdminStatus": "UP",
"targetAdminStatus": "UP",
"apRadioAdminStatus": null,
"apRadioOperStatus": null,
"sourcePortVLANInfo": "ALL",
"targetPortVLANInfo": "0"
},
{
"source": "7e5ccedd-3aa8-46c1-ad3d-a97b5e4911d5",
"linkStatus": null,
"sourceLinkStatus": null,
"targetLinkStatus": null,
"label": [
"overlay"
],
"target": "e1e8e1fd-aa64-4485-98ca-eb7348de2efd",
"id": null,
"portUtilization": null,
"sourceInterfaceName": null,
"targetInterfaceName": null,
"sourceDuplexInfo": null,
"targetDuplexInfo": null,
"sourcePortMode": null,
"targetPortMode": null,
"sourceAdminStatus": null,
"targetAdminStatus": null,
"apRadioAdminStatus": null,
"apRadioOperStatus": null,
"sourcePortVLANInfo": null,
"targetPortVLANInfo": null
}
]
}
]
}
},
null
],
"issueDetails": {
"issue": [
{
"issueId": "76ab1368-a88c-4479-8ba4-afa2040a35c2",
"issueSource": "Cisco DNA",
"issueCategory": "Onboarding",
"issueName": "wireless_client_excess_auth_aaa_fail_trigger",
"issueDescription": "This client is taking longer than expected time to connect to 'DNA-kotei' SSID due to excessive authentication time<ul><li>Onboarding time took 37.5 seconds (expected time should be less than 10.0 seconds).</li><li>Association took 0 seconds (expected time should be less than 2.0 seconds)</li><li>Authentication and Key Exchange took 34.6 seconds (expected time should be less than 3.0 seconds)</li><li>IP Addressing took 0 seconds (expected time should be less than 5.0 seconds)</li><li>This client tried to onboard 2 times which took 2.9 seconds before successfully connecting.</li></ul><br>The client is taking longer to authenticate because the AAA Server initially rejected the client credentials.The client was connecting to 'DNA-kotei' SSID on 2.4 GHz radio on 'AP4800-01' AP in 'Global/Tokyo/Tokyo Midtown/27FCPOC'. The AP was connected to 'CT5520-6-kotei' WLC.",
"issueEntity": "Client",
"issueEntityValue": "70:F3:5A:7F:18:10",
"issueSeverity": "HIGH",
"issuePriority": "",
"issueSummary": "Wireless client took a long time to connect (SSID: DNA-kotei, AP: AP4800-01, Band: 2.4 GHz, WLC: CT5520-6-kotei) - Excessive time due to failed credentials",
"issueTimestamp": 1621578357637,
"deviceId": "",
"suggestedActions": [
{
"message": "Verify whether the client provided the correct credentials."
},
{
"message": "Verify if there has been any failure on the AAA Server ."
}
],
"impactedHosts": [
{
"hostType": "WIRELESS",
"hostName": "AP70F3.5A7E.2C80",
"hostOs": "Cisco-Device",
"ssid": "DNA-kotei",
"connectedInterface": "Unknown",
"macAddress": "70:F3:5A:7F:18:10",
"failedAttempts": 1,
"location": {
"siteId": "8c5e742a-d5fe-480b-97a3-d7dfbe47672c",
"siteType": "FLOOR",
"area": "Global/Tokyo",
"building": "Tokyo Midtown",
"floor": "27FCPOC",
"apsImpacted": [
"78:72:5D:41:93:A0"
]
},
"timestamp": 1621578357637
}
]
}
]
}
}
]
上記のレスポンスを開いて見て頂ければ分かると思いますが、非常にたくさんの情報が返って来ており、その中には、接続されているSSID、AP、Location、そして抱えているIssueやSuggestionedActionが返って来ています。これらの情報をチャットボットに組み込むことで、ユーザは直接自身の端末の情報をDNACから引っ張って来て確認することが出来るようになります。
DNACのAPIの詳細は下記URLを参照ください。
DNAC Authentication API
DNAC Client Enrichment Detail API
Umbrella API
本記事では詳細は記載致しませんが、フローとしては下記になります。コードとしては、後述の「メインプログラム」に含まれています。
- ダッシュボードからAPI-IDとSECRETを入手する
- 入手したAPI-IDとSECRETのBase64を使いTokenを入手する
- 入手したTokenを使いIPアドレスと悪意あるドメインのカテゴリIDを引数にして検出したDomainを入手する
Umbrella APIの詳細については下記URLを参照ください。
Umbrella Reporting API
AMP for Endpoints API
本記事では詳細は記載致しませんが、フローとしては下記になります。コードとしては、後述の「メインプログラム」に含まれています。
- ダッシュボードからAPI-IDとAPI-KEYを入手する
- 入手したAPI-IEとAPI-KEYを使いIPアドレスを引数にしてConnector_GUIDを入手する
- 入手したConnector_GUIDを引数にしてdetectionを入手する
AMP for Endpoints APIの詳細については下記URLを参照ください。
Cisco AMP for Endpoints API
利用するチャットボット
チャットボットはchatuxと言う下記に公開されているインタフェースを活用させて頂きました。HTMLファイルにスクリプトを埋め込むだけで、チャットボットが実装出来るという非常にシンプルな仕組みなのがお気に入りです。使い方も含めて記載してありますのでご参照下さい。
Github chatux-examples-ja
また、実際のチャットボットとして機能するPythonプログラムは下記の記事を参考に作成致しました。
Pythonでチャットボットを作成する
Duo Web SDK
Duo Web SDKの前に少しだけDuoについて説明します。Duoはオンプレ、クラウド問わず全てのアプリケーションに多要素認証を付加するシスコのクラウドサービスになります。昨今、フィッシング詐欺によりユーザIDとパスワードを抜かれ、不正アクセスからの情報漏洩につながるケースが多いかと思いますが、Duoはアプリケーションに多要素認証を付加することにより不正アクセスを防止することが出来るアクセスマネジメントソリューションです。
下記、URLにも記載ありますが、ITreview が掲載している数あるビジネス向け IT 製品・クラウドサービスの中で市場の認知度が高く、顧客からの満足度も非常に高い製品として「Leader」を受賞しました。
Cisco Japan Blog
そんなDuoのコンポーネントの1つであるDuo Web SDKは、SAML対応していないWebアプリケーションに、多要素認証を組み込むためのSDKになります。Duo Web SDKを使うことでWebアプリケーションのプライマリ認証後に、Duoクラウドと連携してユーザに多要素認証を促すことができるようになります。今回は、自身のDuo Web SDKの勉強も兼ねて、下記URLを参考にチャットボットを使ったDNAC活用のPythonプログラムに多要素認証を組み込むために活用しています。
Duo Web v4 SDK - Duo Universal Prompt
Github duo_universal_python
また、DuoのFree trialは下記から申し込みが可能です。今回のWeb SDK利用はFree trialの権限でも可能です。
duo_universal_python
組み込み方法としては下記になります。
ライブラリインストール
Duo Web SDKに必要なPythonライブラリである、Flask, configparser, duo_universalをpip installする。
$ python --version
Python 3.9.7
$ cd duo-chatbot
$ python3 -m venv venv
$ source vent/Scripts/activate
$ pip install flask
$ pip install configparser
$ pip install duo_universal
Duo Protect Application設定
Duoダッシュボードから、Protect ApplicationでWeb SDKを選択して必要情報(Client ID、Client secret、API hostname)をコピーして記録しておく。
duo.conf作成
duo.confにDuoのコピーした情報(Clinet ID、Client secret、API hostname)を記載する。
; Duo integration config
[duo]
client_id =<Client ID>
client_secret =<Client secret>
api_hostname =<API hostname>.duosecurity.com
redirect_uri = http://localhost:8080/duo-callback
failmode = closed
Duoコードの追記
プログラムに追加していくDuoのコードは下記となります。追記するコードについては下記URLを参照下さい。
Github duo_universal_python
設定ファイルの読み込みのコード
config = configparser.ConfigParser()
config.read("duo.conf")
try:
duo_client = Client(
client_id=config['duo']['client_id'],
client_secret=config['duo']['client_secret'],
host=config['duo']['api_hostname'],
redirect_uri=config['duo']['redirect_uri'],
duo_certs=config['duo'].get('duo_certs', None),
)
except DuoException as e:
print("*** Duo config error. Verify the values in duo.conf are correct ***")
raise e
duo_failmode = config['duo']['failmode']
多要素認証を追加するコード
簡単のためWebアプリケーションのプライマリ認証は、ユーザIDとパスワードが入力されれば、なんでも認証成功がするようになっており、認証成功をトリガーにして、Duoの多要素認証のプロンプトにリダイレクトされます。多要素認証が成功すると、ページでチャットボットが使えるページにリダイレクトされるようになっています。
@app.route("/", methods=['GET'])
def login():
return render_template("login.html", message="これはチャットボットページのログイン用ページです。")
@app.route("/", methods=['POST'])
def login_post():
"""
respond to HTTP POST with 2FA as long as health check passes
"""
username = request.form.get('username')
password = request.form.get('password')
# Check user's first factor.
# (In a production application, actually verify that the password is correct)
if password is None or password == "":
return render_template("login.html",
message="Missing password")
try:
duo_client.health_check()
except DuoException:
traceback.print_exc()
if duo_failmode.upper() == "OPEN":
# If we're failing open errors in 2FA still allow for success
return render_template("success.html",
message="Login 'Successful', but 2FA Not Performed. Confirm Duo client/secret/host values are correct")
else:
# Otherwise the login fails and redirect user to the login page
return render_template("login.html",
message="2FA Unavailable. Confirm Duo client/secret/host values are correct")
# Generate random string to act as a state for the exchange
state = duo_client.generate_state()
session['state'] = state
session['username'] = username
prompt_uri = duo_client.create_auth_url(username, state)
# Redirect to prompt URI which will redirect to the client's redirect URI
# after 2FA
return redirect(prompt_uri)
# This route URL must match the redirect_uri passed to the duo client
@app.route("/duo-callback")
def duo_callback():
# Get state to verify consistency and originality
state = request.args.get('state')
# Get authorization token to trade for 2FA
code = request.args.get('duo_code')
if 'state' in session and 'username' in session:
saved_state = session['state']
username = session['username']
else:
# For flask, if url used to get to login.html is not localhost,
# (ex: 127.0.0.1) then the sessions will be different
# and the localhost session does not have the state
return render_template("login.html",
message="保存された状態がありません。再度ログイン願います。")
# Ensure nonce matches from initial request
if state != saved_state:
return render_template("login.html",
message="保存された状態と異なります。")
decoded_token = duo_client.exchange_authorization_code_for_2fa_result(code, username)
# Exchange happened successfully so render success page
return render_template("success.html",
message="これはテスト用のサイトです。右下にチャットボットが表示されます。")
プログラム全体コンポーネント
プログラムは下記のようなツリー構造で保存します。venvとpngファイル部分を除きそれぞれのファイルの中身について記述していきます。
duo-chatbot/
├ static/
│ └ images/
│ └ chatbot.png
├ template/
│ ├ login.html
│ └ success.html
├ venv/
├ chatux12_dnac_umbrella_amp4e_jp.py
├ config.json
├ dictionary.json
└ duo.conf
メインプログラム
チャットボットのメインプログラムになります。このプログラム内に、コンフィグ読み込みや、辞書読み込み、Duo多要素認証実装、チャットボットレスポンスの作成が含まれており、login.htmlの呼び出しや、認証成功後のsuccess.htmlページに表示されるチャットボットからのテキスト入力を受け付けて、レスポンスを返す仕組みが記述されています。非常に長いので折りたたんでいます。
chatux12_dnac_umbrella_amp4e_jp.py
# -*- coding: Shift-JIS -*-
import configparser
import base64
import json
import logging
import os
import re
import sys
from time import sleep
import traceback
from flask import Flask, request, redirect, session, render_template
from duo_universal.client import Client, DuoException
import requests
import urllib3
from urllib3.exceptions import InsecureRequestWarning
__updated__ = "2021-11-26"
with open("dictionary.json", "r", encoding="utf-8_sig") as f:
json_dict = json.load(f)
driver = None
umbrella_driver = None
amp4e_driver = None
#===============================================================================
# load Duo config
#===============================================================================
config = configparser.ConfigParser()
config.read("duo.conf")
try:
duo_client = Client(
client_id=config['duo']['client_id'],
client_secret=config['duo']['client_secret'],
host=config['duo']['api_hostname'],
redirect_uri=config['duo']['redirect_uri'],
duo_certs=config['duo'].get('duo_certs', None),
)
except DuoException as e:
print("*** Duo config error. Verify the values in duo.conf are correct ***")
raise e
duo_failmode = config['duo']['failmode']
#===============================================================================
# load_json_file
#===============================================================================
def load_json_file(file_path):
"""
Load the JSON file.
If there is no file, an empty dict is returned.
Parameters
----------
file_path : str
JSON file path
Returns
-------
_ : dict
parameter map
Raises
------
"""
if not os.path.isfile(file_path):
logger = logging.getLogger(__name__)
logger.warning(f"File not found: {file_path}")
return {}
with open(file_path, "r") as f:
return json.load(f)
#===============================================================================
# base64_encode
#===============================================================================
def base64_encode(inputStr):
"""
Encode strings in Base64.
Internally, it is encoded by converting it to a byte sequence once, and then decoded into str.
Parameters
----------
inputStr : str
String to be encoded
Returns
-------
_ : str
encoded string
Raises
------
"""
return base64.b64encode(inputStr.encode()).decode()
#===============================================================================
# CiscoAPIDriver
#===============================================================================
class CiscoAPIDriver:
"""
Super class for each API driver
"""
#===========================================================================
# __init__
#===========================================================================
def __init__(self, verify=True, logger=None):
self.verify = verify
self.logger = logger or logging.getLogger(__name__)
return
#===========================================================================
# _send_http_request
#===========================================================================
def _send_http_request(self, api, header=None, payload=None, return_all=False):
"""
A common process for sending HTTP requests and receiving HTTP responses.
Check the return code, and the data will be checked by the caller.
Parameters
----------
api : tuple
Tuple of HTTP requests and destination URLs to use
header : dict, default None
Request header
payload : dict, default None
Query data to be attached to the request
Connect to the URL for GET, or embed it in the Body for POST
return_all : bool, default False
Set to True if the entire response data is to be returned.
By default, it is converted to dictionary format and returned.
Returns
-------
ret : dict or Response or None
Return response data as a whole or in a converted format
None if the request fails.
Raises
------
"""
self.logger.debug("{} START".format(sys._getframe().f_code.co_name))
ret = None
try_limit = 3 # three attempts for now.
retry_interval = 5
for _ in range(1, try_limit, 1):
try:
self.logger.debug("Request URL: [{}] {}".format(api[0], api[1]))
# Send request
if api[0] == "GET":
res = requests.get(api[1], headers=header, params=payload, verify=self.verify)
elif api[0] == "POST":
res = requests.post(api[1], headers=header, data=json.dumps(payload), verify=self.verify)
# Send exception when return code is not 200 units.
res.raise_for_status()
status_code = res.status_code
self.logger.debug(f"Return Code: {status_code}")
ret = res if return_all else res.json()
break
# Re-run for connection error
except (requests.exceptions.ProxyError, requests.exceptions.ConnectionError) as e:
self.logger.exception(f"HTTP connection error: {e}")
sleep(retry_interval)
# Exceptions other than the above are abnormal termination.
except Exception as e:
self.logger.exception(f"Unexpected error: {e}")
if res is not None:
self.logger.error(f"Return data: {res.text}")
break
self.logger.debug("{} END".format(sys._getframe().f_code.co_name))
return ret
#------------------------------------------------------------------------------ CiscoAPIDriver end
#===============================================================================
# DNACDriver
#===============================================================================
class DNACDriver(CiscoAPIDriver):
#===========================================================================
# __init__
#===========================================================================
def __init__(self, verify=True, logger=None):
super().__init__(verify=verify, logger=logger)
config = load_json_file("config.json")
self.hostname = config["hostname"]
self.username = config["username"]
self.password = config["password"]
self.zip_pass = config["zip_pass"]
self.api_key = base64_encode(f"{self.username}:{self.password}")
api = config["api"]["authenticationAPI"]
self.authentication_api = (api[0], self.__create_url(api[1]))
api = config["api"]["getDeviceList"]
self.get_device_list_api = (api[0], self.__create_url(api[1]))
api = config["api"]["exportDeviceConfigurations"]
self.export_device_config_api = (api[0], self.__create_url(api[1]))
api = config["api"]["getTaskById"]
self.get_task_by_id_api = (api[0], self.__create_url(api[1]))
api = config["api"]["downloadAFileByFileId"]
self.download_a_file_by_file_id_api = (api[0], self.__create_url(api[1]))
api = config["api"]["getClientDetail"]
self.get_client_detail_api = (api[0], self.__create_url(api[1]))
api = config["api"]["getClientEnrichmentDetails"]
self.get_client_enrichment_details_api = (api[0], self.__create_url(api[1]))
self.token = None
return
#===========================================================================
# __create_url
#===========================================================================
def __create_url(self, api_path):
"""
Connect the protocol and FQDN to the path, and generate a URL for HTTP request.
Parameters
----------
api_path : str
API path
Must be preceded by a slash "/".
Returns
-------
_ : str
HTTP request URL
If the path is None (no setting), return None.
Raises
------
"""
if api_path is None: return None
return f"https://{self.hostname}:443{api_path}"
#===========================================================================
# __create_header
#===========================================================================
def __create_header(self, append=None, token=None):
"""
Generate HTTP request headers.
Set the following header information in common.
- Content-Type
- Accept
- x-auth-token
Parameters
----------
append : dict, default None
Map of header information to be set additionally
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance
Returns
-------
ret : dict
HTTP request header
Raises
------
"""
ret = {"Content-Type":"application/json",
"Accept": "application/json",
"x-auth-token": token or self.token}
if append is not None: ret.update(append)
return ret
#===========================================================================
# get_token
#===========================================================================
def get_token(self):
"""
Obtain the token required when making a request to the API.
If you do not run this method first, you will not get the tokens needed for subsequent requests.
Since the token is also recorded inside the instance, it is not necessary to specify the token if the instance is to be retained.
Parameters
----------
Returns
-------
_ : str or None
Return the token
None if the request fails.
Raises
------
"""
header = self.__create_header(append={"Authorization": f"Basic {self.api_key}"})
data = self._send_http_request(self.authentication_api, header=header)
if data is None: return None
self.token = data["Token"]
return self.token
#===========================================================================
# get_devices
#===========================================================================
def get_devices(self, hostname=None, token=None):
"""
Get a list of the specified devices.
If no condition is specified, all devices registered in DNAC are acquired.
Parameters
----------
hostname : str, default None
Set to specify the host name of the device.
Wildcards can be used.
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance
Returns
-------
_ : list or None
Return the list of devices
None if the request fails.
Raises
------
"""
header = self.__create_header(token=token)
payload = {}
if hostname is not None:
payload["hostname"] = hostname
data = self._send_http_request(self.get_device_list_api,
header=header,
payload=payload if len(payload) else None)
if data is None: return None
return data["response"]
#===========================================================================
# kick_export_configs
#===========================================================================
def kick_export_configs(self, ids, token=None):
"""
Exports the Config of the specified device in encrypted Zip format.
Config is output in clear text format, and passwords and other character strings are not masked.
The password for the encrypted Zip is "[username]:[password]".
This only kicks off the export process, not the download.
Parameters
----------
ids : list
ID list of devices to be Config output
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : list or None
Return the task ID of the export process.
None if the request fails.
Raises
------
"""
header = self.__create_header(token=token)
payload = {"deviceId": ids,
"password": self.zip_pass}
data = self._send_http_request(self.export_device_config_api, header=header, payload=payload)
if data is None: return None
return data["response"]["taskId"]
#===========================================================================
# get_task_status
#===========================================================================
def get_task_status(self, task_id, token=None):
"""
Get the status of the specified task.
Parameters
----------
task_id : str
ID of the target task
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : list or None
Response data
None if the request fails.
Raises
------
"""
header = self.__create_header(token=token)
api = (self.get_task_by_id_api[0],
self.get_task_by_id_api[1].format(taskId=task_id))
data = self._send_http_request(api, header=header)
if data is None: return None
return data["response"]
#===========================================================================
# download_file
#===========================================================================
def download_file(self, file_id=None, additional_status_url=None, token=None):
"""
Download the specified file.
Either a file ID or an additional URL should be specified.
If both are specified, the file ID has priority.
If both are not specified, false is returned.
Parameters
----------
file_id : str, default None
ID of the target file
additional_status_url : str, default None
Download URL obtained from the file generation task
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : bool
True if the download was successful.
Otherwise, False.
Raises
------
"""
header = self.__create_header(token=token)
if file_id is not None:
api = (self.download_a_file_by_file_id_api[0],
self.download_a_file_by_file_id_api[1].format(fileId=file_id))
elif additional_status_url is not None:
api = (self.download_a_file_by_file_id_api[0],
self.__create_url(additional_status_url))
else:
self.logger.warning("It is mandatory to set either 'file_id' or 'additional_status_url'")
return False
res = self._send_http_request(api, header=header, return_all=True)
if res is None: return False
# Get the file name from the response header
content_disposition = res.headers["Content-Disposition"]
filename_attribute = "filename="
filename = content_disposition[content_disposition.find(filename_attribute) + len(filename_attribute):]
filename = filename.replace("\"", "")
# File output as binary data
with open(filename, "wb") as f:
f.write(res.content)
return True
#===========================================================================
# get_client
#===========================================================================
def get_client(self, mac, timestamp=None, token=None):
"""
Get detailed information about the specified client.
Parameters
----------
mac : str
MAC address of the target client
timestamp : int or None, default blank
Set when you want to get information for a specific time.
The set value is the epoch time (in milliseconds).
If not set, get the latest information
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : dict or None
Response data
None if the request fails.
Raises
------
"""
header = self.__create_header(token=token)
payload = {"timestamp": "" if timestamp is None else str(timestamp),
"macAddress": mac}
data = self._send_http_request(self.get_client_detail_api, header=header, payload=payload)
if data is None: return None
return data
#===========================================================================
# get_client_enrichment
#===========================================================================
def get_client_enrichment(self, entity_type, entity_value, issue_category=None, token=None):
"""
Get the anomalies and remedies occurring in the specified client.
Parameters
----------
entity_type : str
Key to identify the target client.
"network_user_id" or "mac_address"
entity_value : str
Parameters for the key
User ID or MAC address
issue_category : str, default None
Set to refine the category of the event.
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : dict or None
Response data
None if the request fails.
Raises
------
"""
header = self.__create_header(append={"entity_type": entity_type,
"entity_value": entity_value,
"issueCategory": "" if issue_category is None else issue_category},
token=token)
data = self._send_http_request(self.get_client_enrichment_details_api, header=header)
if data is None: return None
# HTTP response is 200, but there is a pattern of errors in the data.
if "errorCode" in data:
self.logger.error("Return error : [{}] {}".format(data["errorCode"], data["errorDescription"]))
return None
return data
#===========================================================================
# get_client_enrichment_by_mac
#===========================================================================
def get_client_enrichment_by_mac(self, mac, issue_category=None, token=None):
"""
get_client_enrichment() wrapper function.
"""
return self.get_client_enrichment("mac_address", mac, issue_category, token)
#===========================================================================
# get_client_enrichment_by_uid
#===========================================================================
def get_client_enrichment_by_uid(self, uid, issue_category=None, token=None):
"""
get_client_enrichment() wrapper function.
"""
return self.get_client_enrichment("network_user_id", uid, issue_category, token)
#------------------------------------------------------------------------------ DNACDriver end
#===============================================================================
# UmbrellaDriver
#===============================================================================
class UmbrellaDriver(CiscoAPIDriver):
#===========================================================================
# __init__
#===========================================================================
def __init__(self, verify=True, logger=None):
super().__init__(verify=verify, logger=logger)
config = load_json_file("config.json")
self.api_key = config["umbrella_api_key"]
self.authentication_api = config["umbrella_api"]["authentication"]
self.reporting_activity_api = config["umbrella_api"]["reportingActivity"]
self.token = None
return
#===========================================================================
# __create_header
#===========================================================================
def __create_header(self, append=None, token=None):
"""
Generate HTTP request headers.
Set the following header information in common.
- Content-Type
- Authorization
Parameters
----------
append : dict, default None
Map of header information to be set additionally
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
ret : dict
HTTP request header
Raises
------
"""
ret = {"Content-Type":"application/json",
"Authorization": "Bearer {}".format(token or self.token)}
if append is not None: ret.update(append)
return ret
#===========================================================================
# get_token
#===========================================================================
def get_token(self):
"""
Obtain the token required when making a request to the API.
If you do not run this method first, you will not get the tokens needed for subsequent requests.
Since the token is also recorded inside the instance, it is not necessary to specify the token if the instance is to be retained.
Parameters
----------
Returns
-------
_ : dict or None
Response data
None if the request fails.
Raises
------
"""
header = self.__create_header(append={"Authorization": f"Basic {self.api_key}"})
data = self._send_http_request(self.authentication_api, header=header)
if data is None: return None
self.token = data["access_token"]
return data
#===========================================================================
# get_activity
#===========================================================================
def get_activity(self, term_from, term_to, list_limit, act_type="all", act_category=None, ip=None, token=None):
"""
Get a list of Umbrella's activities within the specified period.
Parameters
----------
org_id : str
ID of the target organization
term_from : str or int
Start time of the period
Serial value of timestamp (e.g. 14205322422) or string of relative time (e.g. -1days)
term_to : str or int
End time of the period
Serial value of timestamp (e.g. 14205322422) or string of relative time (e.g. -1days)
list_limit : int
Maximum number of activities to retrieve
act_type : str, default "all"
Set if you want to filter by activity type.
Valid values are dns/proxy/firewall/ip
If omitted or an invalid value is specified, do not filter.
act_category : str, default None
Set if you want to filter by activity category.
Specify the category ID as a comma-separated list
ip : str, default None
Set if you want to filter by client IP address.
token : str, default None
Set to specify the token required for the request from outside.
If not set, use the token held in the instance.
Returns
-------
_ : list or None
Response data
None if the request fails.
Raises
------
"""
header = self.__create_header(token=token)
protocol = self.reporting_activity_api[0]
url = self.reporting_activity_api[1]
if act_type in ["dns", "proxy", "firewall", "ip"]:
url += "/" + act_type
payload = {"from": term_from,
"to": term_to,
"limit": list_limit,
"categories": "" if act_category is None else act_category,
"ip": "" if ip is None else ip}
# In requests.get(), comma is converted to "%2C" when generating query string, so URL with query must be generated and passed.
# https://stackoverflow.com/questions/56734910/python-converting-in-requests-get-parameters-to-2c-and-to-7c
url += "?" + "&".join([f"{k}={v}" for k, v in payload.items()])
data = self._send_http_request((protocol, url), header=header)
if data is None: return None
return data
#------------------------------------------------------------------------------ UmbrellaDriver end
#===============================================================================
# AMP4EDriver
#===============================================================================
class AMP4EDriver(CiscoAPIDriver):
#===========================================================================
# __init__
#===========================================================================
def __init__(self, verify=True, logger=None):
super().__init__(verify=verify, logger=logger)
config = load_json_file("config.json")
url = "https://{}".format(config["amp4e_hostname"])
self.uid = config["amp4e_id"]
self.password = config["amp4e_key"]
self.api_key = base64_encode(f"{self.uid}:{self.password}")
api = config["amp4e_api"]["computers"]
self.computers_api = (api[0], url + api[1])
api = config["amp4e_api"]["events"]
self.events_api = (api[0], url + api[1])
return
#===========================================================================
# __create_header
#===========================================================================
def __create_header(self, append=None):
"""
Generate HTTP request headers.
Set the following header information in common.
- Accept
- Content-Type
- Accept-Encoding
- Authorization
Parameters
----------
append : dict, default None
Map of header information to be set additionally
Returns
-------
ret : dict
HTTP request header
Raises
------
"""
ret = {"Accept": "application/json",
"Content-Type":"application/json",
"Accept-Encoding": "identity, gzip, deflate",
"Authorization": f"Basic {self.api_key}"}
if append is not None: ret.update(append)
return ret
#===========================================================================
# get_computers
#===========================================================================
def get_computers(self, list_limit, list_offset=0, ip=None):
"""
Get the list of specified host computers.
Parameters
----------
list_limit : int
Maximum number of lists
list_offset : int, default 0
Start position for getting the list
ip : str, default None
Set to filter by IP address of the host computer.
The address band can be specified by omitting the end (wildcards are not required)
Returns
-------
_ : list or None
Response data
None if the request fails.
Raises
------
"""
payload = {"limit": list_limit,
"offset": list_offset,
"internal_ip": "" if ip is None else ip}
data = self._send_http_request(self.computers_api,
header=self.__create_header(),
payload=payload)
if data is None: return None
return data
#===========================================================================
# get_events
#===========================================================================
def get_events(self, list_limit, list_offset=0, connector_guid=None):
"""
Get a list of the specified events.
Parameters
----------
list_limit : int
Maximum number of lists
list_offset : int, default 0
Start position for getting the list
connector_guid : str, default None
Set to filter by Connector GUID
Returns
-------
_ : list or None
Response data
None if the request fails.
Raises
------
"""
payload = {"limit": list_limit,
"offset": list_offset,
"connector_guid[]": "" if connector_guid is None else connector_guid}
data = self._send_http_request(self.events_api,
header=self.__create_header(),
payload=payload)
if data is None: return None
return data
#------------------------------------------------------------------------------ AMP4EDriver end
#===============================================================================
# Flask initialize and session key encrypted
#===============================================================================
app = Flask(__name__)
app.secret_key = os.urandom(32)
#===============================================================================
# get_request
#===============================================================================
@app.route("/chatbot")
def get_request():
value = request.args.get("text", "")
callback = request.args.get("callback", "")
if re.compile(r"([a-fA-F0-9]{2}\:){5}[a-fA-F0-9]{2}").search(value):
resp = dnac_client_enrich(mac=value)
elif re.compile(r"uid-").search(value):
resp = dnac_client_enrich(uid=value[4:])
elif re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$").search(value):
resp = umbrella_report(value)
resp2 = amp4e_event(value)
resp.extend(resp2)
else:
# Display a default message when an unexpected request comes in.
if value not in json_dict: value = "bad condition"
resp = [{"type": "text",
"value": json_dict[value]}]
contents = callback + "(" + json.dumps({"output": resp}) + ")"
return contents
#===============================================================================
# Duo
#===============================================================================
@app.route("/", methods=['GET'])
def login():
return render_template("login.html", message="これはチャットボットページのログイン用ページです。")
@app.route("/", methods=['POST'])
def login_post():
"""
respond to HTTP POST with 2FA as long as health check passes
"""
username = request.form.get('username')
password = request.form.get('password')
# Check user's first factor.
# (In a production application, actually verify that the password is correct)
if password is None or password == "":
return render_template("login.html",
message="Missing password")
try:
duo_client.health_check()
except DuoException:
traceback.print_exc()
if duo_failmode.upper() == "OPEN":
# If we're failing open errors in 2FA still allow for success
return render_template("success.html",
message="Login 'Successful', but 2FA Not Performed. Confirm Duo client/secret/host values are correct")
else:
# Otherwise the login fails and redirect user to the login page
return render_template("login.html",
message="2FA Unavailable. Confirm Duo client/secret/host values are correct")
# Generate random string to act as a state for the exchange
state = duo_client.generate_state()
session['state'] = state
session['username'] = username
prompt_uri = duo_client.create_auth_url(username, state)
# Redirect to prompt URI which will redirect to the client's redirect URI
# after 2FA
return redirect(prompt_uri)
# This route URL must match the redirect_uri passed to the duo client
@app.route("/duo-callback")
def duo_callback():
# Get state to verify consistency and originality
state = request.args.get('state')
# Get authorization token to trade for 2FA
code = request.args.get('duo_code')
if 'state' in session and 'username' in session:
saved_state = session['state']
username = session['username']
else:
# For flask, if url used to get to login.html is not localhost,
# (ex: 127.0.0.1) then the sessions will be different
# and the localhost session does not have the state
return render_template("login.html",
message="保存された状態がありません。再度ログイン願います。")
# Ensure nonce matches from initial request
if state != saved_state:
return render_template("login.html",
message="保存された状態と異なります。")
decoded_token = duo_client.exchange_authorization_code_for_2fa_result(code, username)
# Exchange happened successfully so render success page
# return render_template("success.html",
# message=json.dumps(decoded_token, indent=2, sort_keys=True))
return render_template("success.html",
message="これはテスト用のサイトです。右下にチャットボットが表示されます。")
#===============================================================================
# dnac_client_enrich
#===============================================================================
def dnac_client_enrich(mac=None, uid=None):
resp = []
try:
token = driver.get_token()
if token is None:
return [{"type": "text",
"value": "DNACへのクエリー失敗しました。<br>しばらく待ってからお試しください。"}]
if uid is None:
enrichs = driver.get_client_enrichment_by_mac(mac)
if enrichs is None:
return [{"type": "text",
"value": "<br>".join([f"該当MACを持つ端末が見付かりません。 '{mac}'.",
"再度、MACを確認して入力願います。 (format: 'XX:XX:XX:XX:XX:XX')"])}]
# Get only the first one.
enrich = enrichs[0]
if not (len(enrich["userDetails"]) and len(enrich["connectedDevice"])):
return [{"type": "text",
"value": "<br>".join(["正常にクライアント情報を入手できませんでした。",
"しばらく待ってリトライ願います。"])}]
# issue_count = enrich["userDetails"]["issueCount"]
issue_details = enrich["issueDetails"]
else:
enrichs = driver.get_client_enrichment_by_uid(uid)
if enrichs is None:
return [{"type": "text",
"value": "<br>".join([f"該当のユーザIDが見つかりませんでした。 '{uid}'.",
"再度、確認して再入力願います。 (format: uid-<userid> / e.g. uid-taro)"])}]
# Get only the first one.
enrich = enrichs[0]
if not (len(enrich["userDetails"]) and len(enrich["connectedDevice"])):
return [{"type": "text",
"value": "<br>".join(["正常にクライアント情報を入手できませんでした。",
"しばらく待ってリトライ願います。"])}]
mac = enrich["userDetails"]["hostMac"]
# issue_count = enrich["userDetails"]["issueCount"]
issue_details = enrich["issueDetails"]
client = driver.get_client(mac)
if client is None:
return [{"type": "text",
"value": f"該当MACを持つ端末が見付かりません。 '{mac}'.<br>IT部門に連絡ください。"}]
elif not (len(client["connectionInfo"]) and len(client["detail"]) and len(client["topology"])):
return [{"type": "text",
"value": "<br>".join(["正常にクライアント情報を入手できませんでした。",
"しばらく待ってリトライ願います。"])}]
conn_status = client["detail"]["connectionStatus"]
host_ip = client["detail"]["hostIpV4"]
host_name = client["detail"]["hostName"]
host_type = client["detail"]["hostType"]
ssid = client["detail"]["ssid"]
host_location = client["detail"]["location"]
port = client["detail"]["port"]
link_speed = client["detail"]["linkSpeed"]
host_score = client["detail"]["healthScore"][0]["score"]
# If the connection is broken and only the client data is left, the connectionInfo will contain error information.
band = client["connectionInfo"].get("band", "----")
channel_width = client["connectionInfo"].get("channelWidth", "----")
device_name = client["connectionInfo"].get("nwDeviceName", "----")
resp.append({"type": "text",
"value": "DNACへのクエリが成功しました。<br>下記を確認ください。",
"delayMs": 500})
if host_type == "WIRED":
resp.append({"type": "text",
"value": "<br>".join([f"Hostname: {host_name}",
f"IP: {host_ip}",
f"場所: {host_location}",
f"ヘルススコア: {host_score}",
f"コネクションタイプ: {host_type}",
f"コネクションステータス: {conn_status}",
f"接続デバイス: {device_name}",
f"インタフェース: {port}",
f"リンクスピード: {link_speed} bps"]),
"delayMs": 2000})
else:
resp.append({"type": "text",
"value": "<br>".join([f"Hostname: {host_name}",
f"IP: {host_ip}",
f"場所: {host_location}",
f"ヘルススコア: {host_score}",
f"コネクションタイプ: {host_type}",
f"コネクションステータス: {conn_status}",
f"接続AP: {device_name}",
f"接続SSID: {ssid}",
f"接続帯域: {band} GHz",
f"チャネル帯域: {channel_width} MHz"]),
"delayMs": 2000})
if len(issue_details) == 0:
resp.append({"type": "text",
"value": "あなたの端末は問題ありません。<br>それでもスコアが低ければIT部門に連絡下さい。",
"delayMs": 2000})
else:
resp.append({"type": "text",
"value": "あなたの端末は問題があります。",
"delayMs": 2000})
for i, issue in enumerate(issue_details["issue"]):
resp.append({"type": "text",
"value": "issue {} : {}".format(i + 1, issue["issueSummary"]),
"delayMs": 2000})
for j, action in enumerate(issue["suggestedActions"]):
resp.append({"type": "text",
"value": "suggested action {}-{} : {}".format(i + 1, j + 1, action["message"]),
"delayMs": 2000})
resp.append({"type": "text",
"value": "<br>".join(["上記の緩和策を実施すると、問題は解決されるかも知れません。",
"もし問題が解決されなければ、IT部門に連絡ください。"]),
"delayMs": 2000})
# We'll also get information about Umbrella and AMP4E.
resp.append({"type": "text",
"value": "セキュリティチェックに移ります。",
"delayMs": 2000})
umbrella_resp = umbrella_report(host_ip)
resp.extend(umbrella_resp)
amp4e_resp = amp4e_event(host_ip)
resp.extend(amp4e_resp)
except KeyError:
traceback.print_exc()
return [{"type": "text",
"value": "予期しないエラーが発生しました。<br>IT部門に連絡ください。"}]
return resp
#===============================================================================
# umbrella_report
#===============================================================================
def umbrella_report(ip):
resp = []
umbrella_driver.get_token()
# TODO: For demonstration purposes, some filtering conditions are fixed.
term_from = "-1days"
term_to = "now"
list_limit = 5
act_category = "68,66,64" # Filtering by Malware, Phishing, and C&C categories
response_report = umbrella_driver.get_activity(term_from,
term_to,
list_limit,
act_category=act_category,
ip=ip)
resp.append({"type": "text",
"value": "Umbrellaへのクエリが成功しました。<br>内容を確認下さい。",
"delayMs": 500 })
activities = response_report["data"]
if len(activities) == 0:
resp.append({"type": "text",
"value": "安心してください。最近、悪意あるドメインへのクエリはありません。",
"delayMs": 2000 })
else:
resp.append({"type": "text",
"value": "最近アクセスした次のドメインは <font color=\"#ff0000\"><b>悪意ある</b></font> ドメインです。",
"delayMs": 2000 })
domains = []
for i, activity in enumerate(activities):
domains.append("{}. {}".format((i + 1), activity["domain"]))
resp.append({"type": "text",
"value": "<br>".join(domains),
"delayMs": 2000})
return resp
#===============================================================================
# amp4e_event
#===============================================================================
def amp4e_event(ip):
resp = []
# TODO: For demonstration purposes, some filtering conditions are fixed.
list_limit = 5
data = amp4e_driver.get_computers(list_limit, ip=ip)
computers = data["data"]
if len(computers) == 0:
return [{"type": "text",
"value": "あなたはAMPエージェントが入ってません。インストールする場合はIT部門に連絡ください。"}]
# If AMP4E is already installed on the device, get event information
data = amp4e_driver.get_events(list_limit, connector_guid=computers[0]["connector_guid"])
resp.append({"type": "text",
"value": "AMP4Eへのクエリが成功しました。<br>内容を確認ください。",
"delayMs": 500 })
events = data["data"]
if len(events) == 0:
resp.append({"type": "text",
"value": "安心してください。あなたは最近セキュリティイベントは発生していません。",
"delayMs": 2000 })
else:
resp.append({"type": "text",
"value": "次のイベントは <font color=\"#ff0000\"><b>悪意ある</b></font> イベントです。",
"delayMs": 2000 })
detections = []
for i, event in enumerate(events):
detections.append("{}. {}".format((i + 1), event["event_type"]))
detections.append("-- Name: {}".format(event["detection"]))
detections.append("-- Time: {}".format(event["date"]))
if "file_name" in event["file"]:
detections.append("-- File: {}".format(event["file"]["file_name"]))
resp.append({"type": "text",
"value": "<br>".join(detections),
"delayMs": 2000})
return resp
if __name__ == "__main__":
# If DNAC uses a self-certificate, an error will occur when verifying the SSL certificate, so turn off verification.
# If you turn off verification, you will get an InsecureRequestWarning, ignore that too.
urllib3.disable_warnings(InsecureRequestWarning)
driver = DNACDriver(verify=False)
umbrella_driver = UmbrellaDriver()
amp4e_driver = AMP4EDriver()
app.run(debug=True, host="localhost", port=8080)
メインプログラム設定ファイル
汎用性を持たせるため本プログラムを実行するために必要のない項目もありますが、下記の設定情報の記述が必要です。
-
DNACのIPアドレス(またはホスト名)、ユーザID、パスワード
- "hostname": "<dna center hostname or IP address>"
- "username": "<username>"
- "password": "<password>"
-
UmbrellaのBase64コーディングしたAPI_id:secret、OrganizationIdを含むHTTP Method(<organizationId>だけ自身のテナントのものに置き換えてください。)
- "umbrella_api_key": "<Base64 of api_id:secret>"
- "reportingActivity": ["GET", "https://reports.api.umbrella.com/v2/organizations/<organizationId>/activity"]
-
AMP4Eのホスト名、api_id、api_key
- "amp4e_id": "<api_id>"
- "amp4e_key": "<api_key>"
{
"hostname": "<dna center hostname or IP address>",
"username": "<username>",
"password": "<password>",
"zip_pass": "<zip_password>",
"api": {
"authenticationAPI": ["POST", "/dna/system/api/v1/auth/token"],
"getDeviceList": ["GET", "/dna/intent/api/v1/network-device"],
"exportDeviceConfigurations": ["POST", "/dna/intent/api/v1/network-device-archive/cleartext"],
"getTaskById": ["GET", "/dna/intent/api/v1/task/{taskId}"],
"downloadAFileByFileId": ["GET", "/dna/intent/api/v1/file/{fileId}"],
"getClientDetail": ["GET", "/dna/intent/api/v1/client-detail"],
"getClientEnrichmentDetails": ["GET", "/dna/intent/api/v1/client-enrichment-details"]
},
"umbrella_api_key": "<Base64 of api_id:secret>",
"umbrella_api": {
"authentication": ["GET", "https://management.api.umbrella.com/auth/v2/oauth2/token"],
"reportingActivity": ["GET", "https://reports.api.umbrella.com/v2/organizations/<organizationid>/activity"]
},
"amp4e_hostname": "api.apjc.amp.cisco.com",
"amp4e_id": "<api_id>",
"amp4e_key": "<api_key>",
"amp4e_api": {
"computers": ["GET", "/v1/computers"],
"events": ["GET", "/v1/events"]
}
}
UmbrellaやAMP for Endpointsを使わない方は、メインプログラムの中から下記の関連個所をコメントアウトしてください。
0881行目 elif re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$").search(value):
0882行目 resp = umbrella_report(value)
0883行目 resp2 = amp4e_event(value)
0884行目 resp.extend(resp2)
1091行目 # We'll also get information about Umbrella and AMP4E.
1092行目 resp.append({"type": "text",
1093行目 "value": "セキュリティチェックに移ります。",
1094行目 "delayMs": 2000})
1096行目 umbrella_resp = umbrella_report(host_ip)
1097行目 resp.extend(umbrella_resp)
1099行目 amp4e_resp = amp4e_event(host_ip)
1100行目 resp.extend(amp4e_resp)
1205行目 umbrella_driver = UmbrellaDriver()
1206行目 amp4e_driver = AMP4EDriver()
メインプログラム辞書ファイル
チャットボットから入力されたテキストに対して応答するための辞書ファイルになります。本辞書ファイルに列挙されているテキスト入力に対しては、APIを使わずにそのままレスポンスします。MACアドレスでもなく、uid‐<ユーザID>でもなく、IPアドレスでもなく、更に辞書内のどのキーワードにもマッチしない場合は、bad conditionの内容でレスポンスします。
ここで良くある無線LANトラブルシューティングのノウハウを登録しておくと、電話で問合せを受けたときと同じような対応方法で、ユーザに対してチャットボットが回答することができます。
{
"おはよう":"おはようございます。ご機嫌いかがでしょうか。",
"こんにちは":"こんにちは。眠くないですか。",
"こんばんは":"こんばんは。お腹すいてませんか。",
"眠い":"早く寝ろやっ!",
"お腹すいた":"なんか食えやっ!",
"体調":"こんなところでチャットしてないで病院行きましょう",
"無線LAN":"何か無線LAN調子悪いでしょうか。「つながらない」「遅い」のいずれでしょうか。",
"つながらない":"ID/Passは間違ってませんか。「間違ってない」「間違っている」のいずれでしょうか。",
"間違ってない":"SSIDは見えてますでしょうか。「見えてる」「見えてない」のいずれでしょうか。",
"見えてない":"端末の問題の可能性があります。無線オンオフやドライバーをアップデートしてください。それでもつながらない場合はIT部門まで連絡下さい",
"間違っている":"正しいID/Passを入力下さい",
"見えてる":"端末の問題の可能性があります。無線オンオフやドライバーをアップデートしてください。それでもつながらない場合はIT部門まで連絡下さい。",
"遅い":"無線LANの受信強度を確認してください。「-75以上」「-75以下」「分からない」のいずれでしょうか",
"-75以上":"電波は弱くはないですが、一度、無線LANのオフオンを実施してください。",
"-75以下":"電波が弱いです。ローミングを促すために無線LANのオフオンを実施してください。",
"分からない":"DNACに状況を確認するのでMACアドレスを下記フォーマットでを入力してください。<br>XX:XX:XX:XX:XX:XX<br>もしくはユーザIDを下記フォーマットで入力してください。<br>uid-<ユーザID>",
"株価":"<a href=\"https://finance.yahoo.co.jp/\" target=\"_blank\">株価はこちらです。</href>",
"天気":"<a href=\"https://weather.yahoo.co.jp/weather/\" target=\"_blank\">天気はこちらです。</href>",
"給料":"今月の給料は、、、秘密です。",
"ありがとう":"ご利用ありがとうございました。またのご利用をお待ちしております。",
"セキュリティ":"IPアドレスを入力してください。",
"bad condition": "問題はなんでしょうか ?<br><li>体調</li><li>無線LAN</li><li>セキュリティ</li>"
}
Duo設定ファイル
先ほど記載した内容と同じなので説明は割愛致します。
; Duo integration config
[duo]
client_id =<Client ID>
client_secret =<Client secret>
api_hostname =<api hostname>.duosecurity.com
redirect_uri = http://localhost:8080/duo-callback
failmode = closed
login.html
最初に表示されるログイン画面のHTMLファイルです。
<html>
<body>
<img src="/static/images/chatbot.png">
<h3><pre class="language-json"><code class="language-json" id="message">{{ message }}</code></pre></h3>
<form class="input-form" action="/" method="POST">
<label for="exampleInputEmail1"><b>Name:</b></label>
<input class="form-control" name="username" type="text" placeholder="username" id="exampleInputEmail1">
<label for="exampleInputPassword1"><b>Password:</b></label>
<input class="form-control" name="password" type="password" placeholder="password" id="exampleInputPassword1">
<button type="submit" class="btn btn-primary">Submit</button>
</form>
</body>
</html>
success.html
多要素認証が成功した後にリダイレクトされるHTMLファイルです。このページの右下にチャットボットが表示されるようになります。
<html>
<body>
<script src="https://riversun.github.io/chatux/chatux.min.js"></script>
<script>
const chatux = new ChatUx();
const initParam =
{
renderMode: 'auto',
api: {
endpoint: 'http://localhost:8080/chatbot',
method: 'GET',
dataType: 'jsonp'
},
bot: {
botPhoto: 'https://enjoynet.co.jp/icon/honwakaneesan_stickericon_02.jpg',
humanPhoto: null,
widget: {
sendLabel: 'send',
placeHolder: 'What is the inquiry?'
}
},
window: {
title: 'Contact Us',
infoUrl: 'http://www.hogehoge.local'
}
};
chatux.init(initParam);
chatux.start(true);
</script>
<h3><pre class="language-json"><code class="language-json" id="message">{{ message }}</code></pre></h3>
<img src="/static/images/chatbot.png">
</body>
</html>
実行
下記コマンドを実行することでWebアプリケーションが立ち上がります。
$ cd duo-chatbot
$ source venv/Scripts/activate
$ python chatux12_dnac_umbrella_amp4e_jp.py
ブラウザで http://localhost:8080/ にアクセスするとログイン画面が表示されます。
適当なクレデンシャルを入力してSubmitを推すとDuoにリダイレクトされて、スマホにPush通知が来ます。
テスト用のサイトにリダイレクトされてチャットボットが表示されます。
実際のチャットボットのやりとりは下記の動画でご確認下さい。
動画
ログイン画面からプライマリ認証、多要素認証、実際のチャットボットの挙動が分かる動画になっています。
おわり
最後まで読んで頂きまして、まことにありがとうございます。詰め込みすぎた感が否めませんが年に一度の記事なので伝えたいことを詰め込みました。DNACだけでも十分に便利な機能を提供できるのですが、APIを活用することによって、異なる視点での利便性を提供できるのは面白いなーっと思って記事にしました。
免責事項
本サイトおよび対応するコメントにおいて表明される意見は、投稿者本人の個人的意見であり、シスコの意見ではありません。本サイトの内容は、情報の提供のみを目的として掲載されており、シスコや他の関係者による推奨や表明を目的としたものではありません。各利用者は、本Webサイトへの掲載により、投稿、リンクその他の方法でアップロードした全ての情報の内容に対して全責任を負い、本Web サイトの利用に関するあらゆる責任からシスコを免責することに同意したものとします。
-
Cisco Secure Endpointにリブランドしましたが本記事では旧名を使っています。 ↩