はじめに
この記事はシスコの同志による Advent Calendar の一部として投稿しています
- 2017年版: https://qiita.com/advent-calendar/2017/cisco
- 2018年版: https://qiita.com/advent-calendar/2018/cisco
- 2019年版: https://qiita.com/advent-calendar/2019/cisco
- 2020年版: https://qiita.com/advent-calendar/2020/cisco
- 2020年版(2枚目): https://qiita.com/advent-calendar/2020/cisco2
本記事は2020年版Advent Calendar 2枚目の17日目の投稿になり、私自身としては初の投稿となります。
Guest Shellを触ってみたきっかけ
いつも懇意にしているエンジニアから「小さい拠点のインターネットルータのログ管理したいんだけど、拠点側にSyslogサーバ立てるのも面倒だから、直接クラウドに保存する方法ないですか?」という問合せがあり、ちょっとだけ考えた結果「Meraki MXならそもそもイベントログはクラウドですよ。」と返したのですが、どうやらISRでやりたいとのこと。そこでもうちょっとだけ考えたら、最近のISRはGuest Shellと言うLinux環境が筐体内にあるのでこれを使って実現する方法は無いか?と言うことで考えてみました。
Guest Shellとは
プログラマビリティ コンフィギュレーション ガイドにはこのように書いてあります。
「ゲスト シェルは、仮想化された Linux ベースの環境であり、Cisco デバイスの自動制御と管理のための Python アプリケーションを含む、カスタム Linux アプリケーションを実行するように設計されています。ゲスト シェルを使用して、サードパーティ製 Linux アプリケーションをインストール、更新、および操作することもできます。」
簡単に言うとCisco機器上でLinux環境を使うことが出来るんですね。
https://www.cisco.com/c/ja_jp/td/docs/ios-xml/ios/prog/configuration/1610/b_1610_programmability_cg/b_1610_programmability_cg_chapter_011.html
実装イメージ
実装方法を考えてみた結果、下記のステップでやりたいこと実現できそうでした。
1. ISR1kのSyslogを一旦、Guest Shell内のrsyslogで受信
2. Cronで定期的に筐体内でログローテション
3. CronでローテーションしたログファイルをAWS S3にコピー
今回の環境
IOS-XEでの設定内容
バージョン情報
# show version
Cisco IOS XE Software, Version 17.03.02
Logging設定
rsyslogで受けるときのFacilityを設定。今回はlocal1で送信します。
logging facility local1
logging host 192.168.35.2
Guest Shellの有効化
今回の環境に合わせてGuest Shellを設定。
interface VirtualPortGroup0
ip address 192.168.35.1 255.255.255.0
ip nat inside
interface GigabitEthernet0/0/0
ip address 10.10.30.192 255.255.255.0
ip nat outside
iox
ip nat inside source static 192.168.35.2 10.10.30.155
app-hosting appid guestshell
app-vnic gateway1 virtualportgroup 0 guest-interface 0
guest-ipaddress 192.168.35.2 netmask 255.255.255.0
app-default-gateway 192.168.35.1 guest-interface 0
そして最後のGuest Shellの起動です。
# guestshell enable
Guest Shellの動作確認です。
# show app-hosting list
App id State
---------------------------------------------------------
guestshell RUNNING
Guest Shellに移動します。
# guestshell run bash
[guestshell@guestshell ~]$
Guest Shellでの設定内容
バージョン情報
$ python3 -V
Python 3.6.8
aws cliインストール
AWS S3にファイルをアップロードするためのaws-cliをインストール。
$ curl "https://bootstrap.pypa.io/get-pip.py" -o "get-pip.py"
$ python get-pip.py
$ pip install awscli
$ aws --version
aws-cli/1.18.177 Python/3.6.8 Linux/4.4.214-armada-17.10.1 botocore/1.19.17
参考URL
https://qiita.com/takahashi-kazuki/items/a0b737a3eaa2c9d6304f
AWS ConfigureでAWS Access KeyとSecret Keyを登録
あらかじめ作成したS3バケットのAWSアカウントのAccess KeyとSecret Keyを登録。
$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]:xxxxxxxxxxxxxxxxxxxxx
Default region name [None]: ap-northeast-1
Default output format [None]: json
正常に登録されてバケット名が見えることを確認する。
$ aws sts get-caller-identity{
"UserId": “xxxxxxxxxxxxxxxxxx",
"Account": “xxxxxxxxxxxxxx",
"Arn": "arn:aws:iam::xxxxxxxxxxxxx:user/admin-xxxxxxx“
}
$ aws s3 ls
2020-11-30 06:29:06 xxxxx-syslog-bucket
※一応、バケット名も伏字です。
rsyslog設定変更
外部からのsyslogを受信するために、/etc/rsyslogd.confの下記2行のコメントを外す。
module(load="imudp") # needs to be done just once
input(type="imudp" port="514")
IOS-XEからlocal1で送信されるsyslogを受信するため、/etc/rsyslogd.confに下記Ruleを追記。
#### RULES ####
local1.* /bootflash/guest-share/isr1klog
rsyslogを再起動する。
$ sudo systemctl restart rsyslog
Guest Shellでsyslogを受信出来ていることを確認。
$ tail /bootflash/guest-share/isr1klog
Dec 1 00:12:30 _gateway 4688: *Dec 1 00:12:29.774: %LINEPROTO-5-UPDOWN: Line protocol on Interface Loopback5, changed state to up
Dec 1 00:12:39 _gateway 4689: *Dec 1 00:12:38.709: %SYS-5-CONFIG_I: Configured from console by console
Dec 1 00:12:50 _gateway 4690: *Dec 1 00:12:49.759: %LINEPROTO-5-UPDOWN: Line protocol on Interface Loopback5, changed state to down
Dec 1 00:12:50 _gateway 4691: *Dec 1 00:12:49.760: %LINK-5-CHANGED: Interface Loopback5, changed state to administratively down
Dec 1 00:12:52 _gateway 4692: *Dec 1 00:12:52.186: %LINEPROTO-5-UPDOWN: Line protocol on Interface Loopback5, changed state to up
Dec 1 00:12:53 _gateway 4693: *Dec 1 00:12:52.186: %LINK-3-UPDOWN: Interface Loopback5, changed state to up
Dec 1 00:12:57 _gateway 4694: *Dec 1 00:12:57.430: %LINEPROTO-5-UPDOWN: Line protocol on Interface Loopback5, changed state to down
Dec 1 00:12:57 _gateway 4695: *Dec 1 00:12:57.431: %LINK-5-CHANGED: Interface Loopback5, changed state to administratively down
Dec 1 00:12:58 _gateway 4696: *Dec 1 00:12:57.888: %SYS-5-CONFIG_I: Configured from console by console
logrotate設定変更
AWS S3にUpload後のローカルファイルは不要なため、/etc/logrotate.confでDaily2世代に変更
# rotate log files weekly
# weekly
# rotate log files dayly
daily
# keep 4 weeks worth of backlogs
# rotate 4
# keep 2 days worth of backlogs
rotate 2
さらにおまじないで下記を追記する。
/bootflash/guest-share/*log {
missingok
su root root
}
AWSバックアップシェル作成
ログフォルダからアーカイブされたファイルのみをS3バケットにコピーする内容のawsbk2.shを作成する。
#!/bin/sh
source /etc/profile
/home/guestshell/.local/bin/aws s3 cp --recursive --exclude '*' --include 'isr1klog-*' /bootflash/guest-share s3://xxxxx-syslog-bucket
exit 0
※一応、バケット名も伏字です。
crontab登録
下記のスケジュールのcrontabを登録
$ crontab -l
* 6 * * * /home/guestshell/awsbk2.sh >>/home/guestshell/awsbk2.log 2>>/home/guestshell/awsbk2-err.log
動作確認
Guest Shell内での確認
Guest Shell内でログがローテーションされたことを確認
$ ll /bootflash/guest-share/
total 56
-rw-rw-r--. 1 root root 123 Dec 2 12:59 isr1klog
-rw-rw-r--. 1 root root 17220 Dec 1 03:06 isr1klog-20201201
-rw-rw-r--. 1 root root 3233 Dec 2 03:00 isr1klog-20201202
...
正常にawsbk2.shが動作したことを確認。
$ tail /home/guestshell/awsbk2.log
upload: ../../bootflash/guest-share/isr1klog-20201202 to s3://xxxxx-syslog-bucket/isr1klog-20201202
upload: ../../bootflash/guest-share/isr1klog-20201201 to s3://xxxxx-syslog-bucket/isr1klog-20201201
AWS S3側にも保存されていることを確認。
$ aws s3 ls xxxxx-syslog-bucket
2020-12-02 06:57:05 17220 isr1klog-20201201
2020-12-02 06:57:05 3233 isr1klog-20201202
※一応、バケット名も伏字です。
おまけ
Guest Shellの面白さにはまり、IOS-XEで新しいコマンドを作ってみました。
show-threat-fqdn <ドメイン名>
このコマンドをIOS-XE上で発行することで、IOS-XE上から入力したドメイン名についてのUmbrella InvestigateとVirustotalの判定結果を表示させることが出来ます。全然、ルータ上で実装する必要性はないですが、自分で作ったPythonプログラムが、IOS-XEから呼び出せるって素敵じゃないですか?そんなことないですか?私はこれで提案ネタが出来たと思ってしまいました。
※Umbrella Investigate APIを利用するためには、有償のUmbrella Investigateの契約が必要です。
# show-threat-fqdn ihaveabadreputation.com
ihaveabadreputation.com Umbrella Investigate Score is following
Status: Malicious
Security_categories: Malware
Content_categories: Computer Security
dga_score: 0.0
perplexity: 0.43312814494916696
securerank2: -33.59303360111114
pagerank: 12.507543
asn_score: -0.06686509817044371
prefix_score: -0.16157170447705335
rip_score: 0.0
polularity: 7.632245549719956
ihaveabadreputation.com Virustotal Score is following
Positives:/Total: 0 / 77
Safety score: 60
Adult content: no
Verdict: unrated
Websense ThreatSeeker category: Unknown
TrendMicro category: Unknown
Dr.Web category: Unknown
実装方法
1.作成したスクリプト(下記参照:domaincheck03.py)を/bootflash/guest-share/にコピー
# copy ftp://10.10.30.21/c1111/domaincheck03.py flash:domaincheck03.py
# copy flash:domaincheck03.py flash:guest-share/domaincheck03.py
2.Guest Shell内でスクリプトをホームディレクトリに移動させて権限変更
$ cp /flash/guest-share/domaincheck03.py /home/guestshell/
$ chmod 755 /home/guestshell/domaincheck03.py
3.IOX-XE上からalias作成
alias exec show-threat-fqdn guestshell run python3 domaincheck03.py
こちらのスクリプト(domaincheck03.py)の中身は、Pythonスクリプトが汚すぎて恥ずかしいのですが、一応、下記に載せておきます。
#!/usr/bin/python3
import requests
import string
import json
import base64
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import os,sys
import re
from time import sleep
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
umbrella_token = '<Umbrella InvestigateのAPIトークン>'
virustotal_token = '<Virus TotalのAPIトークン>'
if not umbrella_token:
print("ERROR: environment variable \'INVESTIGATE_TOKEN\' not set. Invoke script with \'INVESTIGATE_TOKEN=%YourToken% python scripts.py\'")
sys.exit(1)
if not virustotal_token:
print("ERROR: environment variable \'VIRUSTOTAL_TOKEN\' not set. Invoke script with \'VIRUSTOTAL_TOKEN=%YourToken% python scripts.py\'")
sys.exit(1)
u_headers = {
'Authorization': 'Bearer ' + umbrella_token
}
v_headers = {
"Accept-Encoding": "gzip, deflate",
"User-Agent" : "gzip, My Python requests library example client or username"
}
def send_request_domains_categorization(value00):
host = "https://investigate.api.opendns.com/"
api = "domains/categorization/"
url = host + api + value00 + '?showLabels'
querystring = {".full": "true"}
response = requests.request("GET", url, headers=u_headers, params=querystring, verify=False)
message01 = value00 + ' Umbrella Investigate Score is following\n'
r_json = response.json()
if (len(str(r_json[value00]['status']))) > 0:
if str(r_json[value00]['status']) == '1':
print('Status: Benign')
message01 = message01 + 'Status: Benign\n'
elif str(r_json[value00]['status']) == '-1':
print('Status: Malicious')
message01 = message01 + 'Status: Malicious\n'
else:
print('Status: Not-classified')
message01 = message01 + 'Status: Not-classified\n'
if (len(str(r_json[value00]['security_categories']))) > 0:
if str(r_json[value00]['security_categories']) == '[]':
print('Security_categories: Not-classified')
message01 = message01 + 'Status: Not-classified\n'
else:
print('Security_categories:',str(r_json[value00]['security_categories']).split("'")[1])
message01 = message01 + 'Security_categories:' + str(r_json[value00]['security_categories']).split("'")[1] + '\n'
if (len(str(r_json[value00]['content_categories']))) > 0:
if str(r_json[value00]['content_categories']) == '[]':
print('Content_categories: Not-classified')
message01 = message01 + 'Content_categories: Not-classified\n'
else:
print('Content_categories:', str(r_json[value00]['content_categories']).replace("[","").replace("'","").replace("]",""))
message01 = message01 + 'Content_categories:' + str(r_json[value00]['content_categories']).replace("[","").replace("'","").replace("]","") + '\n'
def send_request_security(value00):
host = "https://investigate.api.opendns.com/"
api = "security/name/"
url = host + api + value00 + '.json'
querystring = {".full": "true"}
response = requests.request("GET", url, headers=u_headers, params=querystring, verify=False)
message02 = ''
r_json = response.json()
if 'dga_score' in r_json:
print('dga_score: ', r_json['dga_score'])
message02 = message02 + 'dga_score: ' + str(r_json['dga_score']) + '\n'
if 'perplexity' in r_json:
print('perplexity: ', r_json['perplexity'])
message02 = message02 + 'perplexity: ' + str(r_json['perplexity']) + '\n'
if 'securerank2' in r_json:
print('securerank2: ', r_json['securerank2'])
message02 = message02 + 'securerank2: ' + str(r_json['securerank2']) + '\n'
if 'pagerank' in r_json:
print('pagerank: ', r_json['pagerank'])
message02 = message02 + 'pagerank: ' + str(r_json['pagerank']) + '\n'
if 'asn_score' in r_json:
print('asn_score: ', r_json['asn_score'])
message02 = message02 + 'asn_score: ' + str(r_json['asn_score']) + '\n'
if 'prefix_score' in r_json:
print('prefix_score: ', r_json['prefix_score'])
message02 = message02 + 'prefix_score: ' + str(r_json['prefix_score']) + '\n'
if 'rip_score' in r_json:
print('rip_score: ', r_json['rip_score'])
message02 = message02 + 'rip_score: ' + str(r_json['rip_score']) + '\n'
if 'popularity' in r_json:
print('polularity: ', r_json['popularity'])
message02 = message02 + 'popularity: ' + str(r_json['popularity']) + '\n\n'
def send_request_retrieve_url(value00):
host = "http://www.virustotal.com/"
api = "vtapi/v2/url/report"
url = host + api
querystring = {'apikey': virustotal_token, 'resource':value00}
response = requests.request("POST", url, headers=v_headers, params=querystring, verify=False)
if response.status_code == 200:
message03 = value00 + ' Virustotal is following\n'
r_json = response.json()
if 'positives' in r_json:
if 'total' in r_json:
print('Positives:/Total:', r_json['positives'],'/',r_json['total'])
message03 = message03 + 'Positives:/Total:' + str(r_json['positives']) + '/' + str(r_json['total']) + '\n'
else:
print('Positives:/Total:', r_json['positives'],'/ Unknown')
message03 = message03 + 'Positives:/Total:' + str(r_json['positives']) + '/ Unknown' + '\n'
else:
print('Positives:/Total: Uknown')
message03 = message03 + 'Positives:/Total: Unknown' + '\n'
else:
print('Virustotal retrieve URL API Response Error')
message03 = 'Virustotal retrieve URL API Response Error'
def send_request_retrieve_domain(value00):
host = "http://www.virustotal.com/"
api = "vtapi/v2/domain/report"
url = host + api
querystring = {'apikey': virustotal_token, 'domain':value00}
response = requests.request("GET", url, headers=v_headers, params=querystring, verify=False)
if response.status_code == 200:
message04 = ''
r_json = response.json()
if 'Webutation domain info' in r_json:
print('Safety score:', r_json['Webutation domain info']['Safety score'])
message04 = message04 + 'Safety score:' + str(r_json['Webutation domain info']['Safety score']) + '\n'
print('Adult content:', r_json['Webutation domain info']['Adult content'])
message04 = message04 + 'Adult content:' + str(r_json['Webutation domain info']['Adult content']) + '\n'
print('Verdict:', r_json['Webutation domain info']['Verdict'])
message04 = message04 + 'Verdict:' + str(r_json['Webutation domain info']['Verdict']) + '\n'
else:
print('Webutation domain info: Unknown')
message04 = message04 + 'Webutation domain info: Unknown\n'
if 'Websense ThreatSeeker category' in r_json:
print('Websense ThreatSeeker category:', r_json['Websense ThreatSeeker category'])
message04 = message04 + 'Websense ThreatSeeker category:' + r_json['Websense ThreatSeeker category'] + '\n'
else:
print('Websense ThreatSeeker category: Unknown')
message04 = message04 + 'Websense ThreatSeeker category: Unknown\n'
if 'TrendMicro category' in r_json:
print('TrendMicro category:', r_json['TrendMicro category'])
message04 = message04 + 'TrendMicro category:' + r_json['TrendMicro category'] + '\n'
else:
print('TrendMicro category: Unknown')
message04 = message04 + 'TrendMicro category: Unknown\n'
if 'Dr.Web category' in r_json:
print('Dr.Web category:', r_json['Dr.Web category'])
message04 = message04 + 'Dr.Web category:' + r_json['Dr.Web category'] + '\n\n'
else:
print('Dr.Web category: Unknown')
message04 = message04 + 'Dr.Web category: Unknown\n\n'
else:
print('Virustoal retrieve Domain API Response Error')
message04 = 'Virustoal retrieve Domain API Response Error\n'
def main(fqdn):
value00 = fqdn
print(value00,'Umbrella Investigate Score is following')
send_request_domains_categorization(value00)
send_request_security(value00)
sleep(2)
print('')
print(value00,'Virustotal Score is following')
send_request_retrieve_url(value00)
send_request_retrieve_domain(value00)
if __name__ == '__main__':
args = sys.argv
if len(args) == 2:
fqdn = args[1]
main(fqdn)
else:
print('Please input fqdn.')
quit()
最後まで読んで頂きまして、まことにありがとうございます。
免責事項
本サイトおよび対応するコメントにおいて表明される意見は、投稿者本人の個人的意見であり、シスコの意見ではありません。本サイトの内容は、情報の提供のみを目的として掲載されており、シスコや他の関係者による推奨や表明を目的としたものではありません。各利用者は、本Webサイトへの掲載により、投稿、リンクその他の方法でアップロードした全ての情報の内容に対して全責任を負い、本Web サイトの利用に関するあらゆる責任からシスコを免責することに同意したものとします。