前回でテンプレートができたと思うので、なにかと使いそうなBase64を扱うコマンドを作ってみる。
nkfで十分という話はあります。
@uneyamauneko さん、python3でもいけましたっけか?
#Pythonによるコード検証
import base64 as b64
message='test message is utf-8 text'
message.encode()
enc_text=b64.b64encode(message.encode())
enc_text.decode()
出力'dGVzdCBtZXNzYWdlIGlzIHV0Zi04IHRleHQ='
enc_message='dGVzdCBtZXNzYWdlIGlzIHV0Zi04IHRleHQ='
dec_text=b64.b64decode(enc_message.encode())
dec_text.decode()
出力'test message is utf-8 text'
このように、Pythonのbase64
はバイト列を扱うのでencode()``decode()
を適宜やってあげる必要がある。
#Commands.conf
[base64]
chunked = true
filename = base64encdec.py
$SPLUNK_HOME/etc/<<APPS>>/default/
に配置する。
ファイル名はimport
するモジュールと一緒にしてはいけません
base64.py
にしていて結構はまった・・
#Code
#!/usr/bin/env python
import sys, base64
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators
@Configuration()
class base64Command(StreamingCommand):
""" Base64 encode and decode text
##Syntax
.. code-block::
base64 output=<field> action=<enc|dec> <field>
##Description
Outputs the string of the input field with base64 encoding/decoding.
##Example
Encode
.. code-block::
| makeresults | base64 output=enc_time action=enc _time
Decode
.. code-block::
| makeresults | eval enc_text = "cHl0aG9uIGlzIGRpZmZpY3VsdCBmb3IgbWU=" | base64 output=plain_text action=dec enc_text
"""
output = Option(
doc='''
**Syntax:** **output=***<fieldname>*
**Description:** Name of the field that will hold the output text''',
require=True, validate=validators.Fieldname())
action = Option(
doc='''
**Syntax:** **action=***<enc|dec>*
**Description:** Name of the action in encoding/decoding''',
require=True, validate=validators.Set('enc','dec'))
def stream(self, records):
self.logger.debug('base64Command: %s', self) # logs command line
for record in records:
for fieldname in self.fieldnames:
pass
if self.action == "enc":
record[self.output]=base64.b64encode(record[fieldname].encode()).decode()
else:
record[self.output]=base64.b64decode(record[fieldname].encode()).decode()
yield record
dispatch(base64Command, sys.argv, sys.stdin, sys.stdout, __name__)
$SPLUNK_HOME/etc/<<APPS>>/bin/
に配置する。
lambda関数を使ってみた
#!/usr/bin/env python
import sys, base64
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration, Option, validators
@Configuration()
class base64Command(StreamingCommand):
""" Base64 encode and decode text
##Syntax
.. code-block::
base64 output=<field> action=<enc|dec> <field>
##Description
Outputs the string of the input field with base64 encoding/decoding.
##Example
Encode
.. code-block::
| makeresults | base64 output=enc_time action=enc _time
Decode
.. code-block::
| makeresults | eval enc_text = "cHl0aG9uIGlzIGRpZmZpY3VsdCBmb3IgbWU=" | base64 output=plain_text action=dec enc_text
"""
output = Option(
doc='''
**Syntax:** **output=***<fieldname>*
**Description:** Name of the field that will hold the output text''',
require=True, validate=validators.Fieldname())
action = Option(
doc='''
**Syntax:** **action=***<enc|dec>*
**Description:** Name of the action in encoding/decoding''',
require=True, validate=validators.Set('enc','dec'))
def stream(self, records):
self.logger.debug('base64Command: %s', self) # logs command line
enc = lambda x: base64.b64encode(x.encode()).decode()
dec = lambda x: base64.b64decode(x.encode()).decode()
for record in records:
record[self.output]=enc(record[self.fieldnames[0]]) if self.action == "enc" else dec(record[self.fieldnames[0]])
yield record
dispatch(base64Command, sys.argv, sys.stdin, sys.stdout, __name__)
すっきりした。
内包表記ができないかと色々やってみたけど、結局諦めた。
for record in records:
yield record
この処理で、イベント毎に処理を行って、逐次出力している。
(x in record for records)
と書けそうだけど、結局ループで出力しないといけないので無理だった。
lambda関数を使ってみた ではできるだけすっきり書いてみた。
#試行
| makeresults
| base64 output=e_time action=enc _time
| base64 output=d_time action=dec e_time
_time | e_time | d_time |
---|---|---|
2021/01/02 16:17:55 | MTYwOTU3MTg3NQ== | 1609571875 |
#解説
##Option
今回validators.Set()
を使ってみた。情報まったくなし。
Githubのコード、validators.pyをみてこれかな〜と動かしてみて、なんとかなった。
ただ、enc
やdec
以外の時は
2 errors occurred while the search was executing. Therefore, search results might be incomplete.
- A value for "action" is required
- Illegal value: action=de
としか出ないので、微妙といえば微妙。
searchbnf.confをきちんとしないとね。
##if
オプションの評価をしっかりしているので、action
の判定は二択
ワンライナーを作ってしまえば前回と一緒の形にできる。
#もっといいかも
SlackでひろったGeorge Starcher作のコマンド
import csv
import sys, os
if sys.argv[0] == '':
mypath='.'
else:
mypath=os.path.dirname(sys.argv[0])
sys.path.append(os.path.join(mypath,'lib'))
try:
import base64
except ImportError as e:
raise(e)
def decode_value(value):
value_acsii = ""
try:
base64_bytes = value.encode('ascii')
message_bytes = base64.b64decode(base64_bytes)
value_ascii = message_bytes.decode('ascii')
except Exception as e:
pass
return(value_ascii)
def main():
if len(sys.argv) != 2:
print("Usage: python external_b64decode.py [b64 field]")
sys.exit(1)
valuefield = sys.argv[1]
infile = sys.stdin
outfile = sys.stdout
r = csv.DictReader(infile)
header = r.fieldnames
w = csv.DictWriter(outfile, fieldnames=r.fieldnames)
w.writeheader()
for result in r:
value = result.get(valuefield)
value_ascii = decode_value(value)
result['value_ascii'] = value_ascii
w.writerow(result)
main()
これをみるとSplunk python SDKがなくてもコマンドが作れることがよくわかる。
#まとめ
base64については、DECRYPTといったAppsがあったりするので、今更感はあります。
ただ、コードを書いてみるとわかることもいっぱいあるので、勉強を兼ねてやっていこうと思います。
https://github.com/bentleymi/ta-webtools
とか結構新そう
.conf17のスライドでカスタムサーチコマンドの作り方の紹介がされていた。