Recently I needed to summarize speech that had been transcribed to text and import the result as data. As part of that investigation I got the automatic text summarization API published by Recruit Technologies running under Python 3. The original code targets Python 2.7, and I expect there are others who would like to run it on Python 3, so I am sharing what I did.
Recruit Technologies' GitHub (the summpy repository)
This time I converted only the lexrank algorithm and verified it only through web access. The original inline annotations have been removed.
## Environment
| Item | Version | Notes |
| --- | --- | --- |
| OS | Windows 10 Home | |
| Python | Winpython64-3.7.7.1.exe | |
| MeCab | MeCab 0.996 64bit version | https://github.com/ikegami-yukino/mecab/releases/tag/v0.996 |
| Additional Python modules | CherryPy==18.6.0, mecab==0.996.2 | |
summpy-master
├─server.bat
└─summpy
├─lexrank.py
├─server.py
├─tools.py
├─misc
│ ├─divrank.py
│ └─mecab_segmenter.py
└─server_data
└─test.html
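Before touching the sources, it may help to confirm that the MeCab binding listed above actually works under Python 3. A minimal sanity check (not part of summpy itself, just a quick test of the environment):

```python
# Quick check that the MeCab binding installed above works under Python 3.
import MeCab

tagger = MeCab.Tagger()
print(tagger.parse('今日はいい天気ですね。'))  # prints one token per line with its features
```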
## Source
server.py
#!/usr/bin/env python
# coding: utf-8
import sys
import os
import re
import getopt
import cherrypy
import json
path = os.getcwd()  # Windows support
sys.path.append(path)  # Windows support
from summpy import tools  # Windows support
class Summarizer(object):
def __init__(self):
self.summarizers = {}
def get_summarizer(self, name):
if name in self.summarizers:
pass
elif name == 'lexrank':
from summpy import lexrank
self.summarizers[name] = lexrank.summarize
elif name == 'mcp':
from summpy import mcp_summ
self.summarizers[name] = mcp_summ.summarize
return self.summarizers[name]
@cherrypy.expose
def summarize(self, text=None, algo='lexrank', **summarizer_params):
try: # TODO: generate more useful error message
# fix parameter type
for param, value in list(summarizer_params.items()):
if value == '':
del summarizer_params[param]
continue
                elif re.match(r'^\d*\.\d+$', value):  # escape the dot so integers are not mistakenly parsed as floats
value = float(value)
elif re.match(r'^\d+$', value):
value = int(value)
elif value == 'true':
value = True
elif value == 'false':
value = False
summarizer_params[param] = value
if algo in ('lexrank', 'clexrank', 'divrank'):
summarizer = self.get_summarizer('lexrank')
if algo == 'clexrank':
summarizer_params['continuous'] = True
if algo == 'divrank':
summarizer_params['use_divrank'] = True
elif algo == 'mcp':
summarizer = self.get_summarizer('mcp')
            summary, debug_info = summarizer(text, **summarizer_params)  # **summarizer_params forwards the remaining keyword arguments as a dict
except Exception as e:
print(str(e))
return json.dumps({'error': str(e)}, ensure_ascii=False, indent=2)
else:
res = json.dumps(
tools.tree_encode({
'summary': summary, 'debug_info': debug_info
}),
ensure_ascii=False, indent=2
)
            return res.encode('utf8')  # fix: CherryPy page handlers must return bytes (https://stackoverflow.com/questions/20215147/python-cherrypy-500-valueerror-page-handlers-must-return-bytes)
if __name__ == '__main__':
options, args = getopt.getopt(sys.argv[1:], 'h:p:')
options = dict(options)
host, port = options['-h'], int(options['-p'])
cherrypy.config.update({
'server.socket_host': host,
'server.socket_port': port
})
conf = {
'/': {
'tools.staticdir.root': path
},
'/summarize': {
'tools.response_headers.on': True,
'tools.response_headers.headers': [
('Content-type', 'application/json')
]
},
'/static': {
'tools.staticdir.on': True,
            'tools.staticdir.dir': 'summpy\\server_data',  # Windows path separator
'tools.response_headers.headers': [
('Content-type', 'application/json')
]
}
}
cherrypy.quickstart(Summarizer(), '/', conf)
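With server.bat running (see the last section), the /summarize endpoint can also be exercised outside the browser. A minimal client sketch using the requests package, which is not part of the environment above, so treat it as an extra dependency:

```python
# Minimal client for the /summarize endpoint started by server.bat.
# Assumes the server is listening on 127.0.0.1:8000 and that `requests` is installed.
import requests

params = {
    'text': '今日はいい天気ですね。明日は雨が降るそうです。週末は晴れるといいのですが。',
    'algo': 'lexrank',
    'sent_limit': 1,
}
res = requests.post('http://127.0.0.1:8000/summarize', data=params)
print(res.json()['summary'])
```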
lexrank.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os
import getopt
import codecs
import collections
import numpy
import networkx
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import pairwise_distances
path = os.getcwd()  # Windows support
sys.path.append(path)  # Windows support
from summpy import tools  # Windows support
from summpy.misc.divrank import divrank, divrank_scipy  # Windows support
def lexrank(sentences, continuous=False, sim_threshold=0.1, alpha=0.9,
use_divrank=False, divrank_alpha=0.25):
# configure ranker
ranker_params = {'max_iter': 1000}
if use_divrank:
ranker = divrank_scipy
ranker_params['alpha'] = divrank_alpha
ranker_params['d'] = alpha
else:
ranker = networkx.pagerank_scipy
ranker_params['alpha'] = alpha
graph = networkx.DiGraph()
# sentence -> tf
sent_tf_list = []
for sent in sentences:
words = tools.word_segmenter_ja(sent)
tf = collections.Counter(words)
sent_tf_list.append(tf)
sent_vectorizer = DictVectorizer(sparse=True)
sent_vecs = sent_vectorizer.fit_transform(sent_tf_list)
    # compute similarities between sentences
sim_mat = 1 - pairwise_distances(sent_vecs, sent_vecs, metric='cosine')
if continuous:
linked_rows, linked_cols = numpy.where(sim_mat > 0)
else:
linked_rows, linked_cols = numpy.where(sim_mat >= sim_threshold)
# create similarity graph
graph.add_nodes_from(list(range(sent_vecs.shape[0])))
for i, j in zip(linked_rows, linked_cols):
if i == j:
continue
weight = sim_mat[i,j] if continuous else 1.0
        # graph.add_edge(i, j, {'weight': weight})  # old networkx 1.x style
        graph.add_edge(i, j, weight=weight)  # networkx 2.x: pass the weight as a keyword argument so it is kept on the edge
scores = ranker(graph, **ranker_params)
return scores, sim_mat
def summarize(text, sent_limit=None, char_limit=None, imp_require=None,
debug=False, **lexrank_params):
debug_info = {}
sentences = list(tools.sent_splitter_ja(text))
scores, sim_mat = lexrank(sentences, **lexrank_params)
sum_scores = sum(scores.values())
acc_scores = 0.0
indexes = set()
num_sent, num_char = 0, 0
for i in sorted(scores, key=lambda i: scores[i], reverse=True):
num_sent += 1
num_char += len(sentences[i])
if sent_limit is not None and num_sent > sent_limit:
break
if char_limit is not None and num_char > char_limit:
break
if imp_require is not None and acc_scores / sum_scores >= imp_require:
break
indexes.add(i)
acc_scores += scores[i]
if len(indexes) > 0:
summary_sents = [sentences[i] for i in sorted(indexes)]
else:
summary_sents = sentences
if debug:
debug_info.update({
'sentences': sentences, 'scores': scores
})
return summary_sents, debug_info
if __name__ == '__main__':
_usage = '''
Usage:
python lexrank.py -f <file_name> [-e <encoding> ]
[ -v lexrank | clexrank | divrank ]
[ -s <sent_limit> | -c <char_limit> | -i <imp_required> ]
Args:
-f: plain text file to be summarized
-e: input and output encoding (default: utf-8)
-v: variant of LexRank (default is 'lexrank')
-s: summary length (the number of sentences)
  -c: summary length (the number of characters)
-i: cumulative LexRank score [0.0-1.0]
'''.strip()
options, args = getopt.getopt(sys.argv[1:], 'f:e:v:s:c:i:')
options = dict(options)
if len(options) < 2:
print(_usage)
sys.exit(0)
fname = options['-f']
encoding = options['-e'] if '-e' in options else 'utf-8'
variant = options['-v'] if '-v' in options else 'lexrank'
sent_limit = int(options['-s']) if '-s' in options else None
char_limit = int(options['-c']) if '-c' in options else None
imp_require = float(options['-i']) if '-i' in options else None
    if fname == 'stdin':
        # sys.stdin already yields str in Python 3, so no decode() is needed
        text = '\n'.join(line for line in sys.stdin.readlines())
else:
text = codecs.open(fname, encoding=encoding).read()
lexrank_params = {}
if variant == 'clexrank':
lexrank_params['continuous'] = True
if variant == 'divrank':
lexrank_params['use_divrank'] = True
sentences, debug_info = summarize(
text, sent_limit=sent_limit, char_limit=char_limit,
imp_require=imp_require, **lexrank_params
)
for sent in sentences:
        print(sent.strip())  # print() takes str in Python 3; printing encoded bytes would show b'...'
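lexrank.py can also be used directly, without going through CherryPy. A small sketch, assuming it is run from the summpy-master directory so that the summpy package is importable:

```python
# Call the summarizer as a library instead of over HTTP.
from summpy.lexrank import summarize

text = '今日はいい天気ですね。明日は雨が降るそうです。週末は晴れるといいのですが。'
sentences, debug_info = summarize(text, sent_limit=1)
for s in sentences:
    print(s)
```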
tools.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import re
import json
path = os.getcwd()  # Windows support
sys.path.append(path)  # Windows support
def tree_encode(obj, encoding='utf-8'):
type_ = type(obj)
if type_ == list or type_ == tuple:
return [tree_encode(e, encoding) for e in obj]
elif type_ == dict:
new_obj = dict(
(tree_encode(k, encoding), tree_encode(v, encoding))
for k, v in obj.items()
)
return new_obj
    elif type_ == str:  # in Python 3, unicode has been merged into str
        # return obj.encode(encoding)  # removed: return the str as-is
return obj
else:
return obj
def sent_splitter_ja(text, delimiters=set('。.?!\n\r'),
parenthesis='()「」『』“”'):
paren_chars = set(parenthesis)
close2open = dict(list(zip(parenthesis[1::2], parenthesis[0::2])))
pstack = []
buff = []
for i, c in enumerate(text):
c_next = text[i+1] if i+1 < len(text) else None
# check correspondence of parenthesis
if c in paren_chars:
if c in close2open: # close
if len(pstack) > 0 and pstack[-1] == close2open[c]:
pstack.pop()
else: # open
pstack.append(c)
buff.append(c)
if c in delimiters:
if len(pstack) == 0 and c_next not in delimiters:
yield ''.join(buff)
buff = []
if len(buff) > 0:
yield ''.join(buff)
if os.environ.get('SUMMPY_USE_JANOME') is not None:
from summpy.misc.janome_segmenter import word_segmenter_ja
else:
try:
from summpy.misc.mecab_segmenter import word_segmenter_ja
except ImportError:
from summpy.misc.janome_segmenter import word_segmenter_ja
if __name__ == '__main__':
pass
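sent_splitter_ja splits on 。.?! and line breaks, but delimiters inside brackets such as 「」 do not end a sentence thanks to the pstack bookkeeping. For example (assuming summpy is importable as above):

```python
# The 。 inside 「」 does not terminate the sentence.
from summpy.tools import sent_splitter_ja

text = '彼は「今日は晴れ。明日は雨」と言った。そして帰った。'
for s in sent_splitter_ja(text):
    print(s)
# 彼は「今日は晴れ。明日は雨」と言った。
# そして帰った。
```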
divrank.py (no changes)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import networkx as nx
from networkx.exception import NetworkXError
from networkx.utils import not_implemented_for
@not_implemented_for('multigraph')
def divrank(G, alpha=0.25, d=0.85, personalization=None,
max_iter=100, tol=1.0e-6, nstart=None, weight='weight',
dangling=None):
'''
Returns the DivRank (Diverse Rank) of the nodes in the graph.
This code is based on networkx.pagerank.
Args: (diff from pagerank)
alpha: controls strength of self-link [0.0-1.0]
d: the damping factor
Reference:
Qiaozhu Mei and Jian Guo and Dragomir Radev,
DivRank: the Interplay of Prestige and Diversity in Information Networks,
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.174.7982
'''
if len(G) == 0:
return {}
if not G.is_directed():
D = G.to_directed()
else:
D = G
# Create a copy in (right) stochastic form
W = nx.stochastic_graph(D, weight=weight)
N = W.number_of_nodes()
# self-link (DivRank)
for n in W.nodes_iter():
for n_ in W.nodes_iter():
if n != n_ :
if n_ in W[n]:
W[n][n_][weight] *= alpha
else:
if n_ not in W[n]:
W.add_edge(n, n_)
W[n][n_][weight] = 1.0 - alpha
# Choose fixed starting vector if not given
if nstart is None:
x = dict.fromkeys(W, 1.0 / N)
else:
# Normalized nstart vector
s = float(sum(nstart.values()))
x = dict((k, v / s) for k, v in list(nstart.items()))
if personalization is None:
# Assign uniform personalization vector if not given
p = dict.fromkeys(W, 1.0 / N)
else:
missing = set(G) - set(personalization)
if missing:
raise NetworkXError('Personalization dictionary '
'must have a value for every node. '
'Missing nodes %s' % missing)
s = float(sum(personalization.values()))
p = dict((k, v / s) for k, v in list(personalization.items()))
if dangling is None:
# Use personalization vector if dangling vector not specified
dangling_weights = p
else:
missing = set(G) - set(dangling)
if missing:
raise NetworkXError('Dangling node dictionary '
'must have a value for every node. '
'Missing nodes %s' % missing)
s = float(sum(dangling.values()))
dangling_weights = dict((k, v/s) for k, v in list(dangling.items()))
dangling_nodes = [n for n in W if W.out_degree(n, weight=weight) == 0.0]
# power iteration: make up to max_iter iterations
for _ in range(max_iter):
xlast = x
x = dict.fromkeys(list(xlast.keys()), 0)
danglesum = d * sum(xlast[n] for n in dangling_nodes)
for n in x:
D_t = sum(W[n][nbr][weight] * xlast[nbr] for nbr in W[n])
for nbr in W[n]:
#x[nbr] += d * xlast[n] * W[n][nbr][weight]
x[nbr] += (
d * (W[n][nbr][weight] * xlast[nbr] / D_t) * xlast[n]
)
x[n] += danglesum * dangling_weights[n] + (1.0 - d) * p[n]
# check convergence, l1 norm
err = sum([abs(x[n] - xlast[n]) for n in x])
if err < N*tol:
return x
raise NetworkXError('divrank: power iteration failed to converge '
'in %d iterations.' % max_iter)
def divrank_scipy(G, alpha=0.25, d=0.85, personalization=None,
max_iter=100, tol=1.0e-6, nstart=None, weight='weight',
dangling=None):
'''
Returns the DivRank (Diverse Rank) of the nodes in the graph.
This code is based on networkx.pagerank_scipy
'''
import scipy.sparse
N = len(G)
if N == 0:
return {}
nodelist = G.nodes()
M = nx.to_scipy_sparse_matrix(G, nodelist=nodelist, weight=weight,
dtype=float)
S = scipy.array(M.sum(axis=1)).flatten()
S[S != 0] = 1.0 / S[S != 0]
Q = scipy.sparse.spdiags(S.T, 0, *M.shape, format='csr')
M = Q * M
# self-link (DivRank)
M = scipy.sparse.lil_matrix(M)
M.setdiag(0.0)
M = alpha * M
M.setdiag(1.0 - alpha)
#print M.sum(axis=1)
# initial vector
x = scipy.repeat(1.0 / N, N)
# Personalization vector
if personalization is None:
p = scipy.repeat(1.0 / N, N)
else:
missing = set(nodelist) - set(personalization)
if missing:
raise NetworkXError('Personalization vector dictionary '
'must have a value for every node. '
'Missing nodes %s' % missing)
p = scipy.array([personalization[n] for n in nodelist],
dtype=float)
p = p / p.sum()
# Dangling nodes
if dangling is None:
dangling_weights = p
else:
missing = set(nodelist) - set(dangling)
if missing:
raise NetworkXError('Dangling node dictionary '
'must have a value for every node. '
'Missing nodes %s' % missing)
# Convert the dangling dictionary into an array in nodelist order
dangling_weights = scipy.array([dangling[n] for n in nodelist],
dtype=float)
dangling_weights /= dangling_weights.sum()
is_dangling = scipy.where(S == 0)[0]
# power iteration: make up to max_iter iterations
for _ in range(max_iter):
xlast = x
D_t = M * x
x = (
d * (x / D_t * M * x + sum(x[is_dangling]) * dangling_weights)
+ (1.0 - d) * p
)
# check convergence, l1 norm
err = scipy.absolute(x - xlast).sum()
if err < N * tol:
return dict(list(zip(nodelist, list(map(float, x)))))
raise NetworkXError('divrank_scipy: power iteration failed to converge '
'in %d iterations.' % max_iter)
if __name__ == '__main__':
g = nx.Graph()
# this network appears in the reference.
edges = {
1: [2, 3, 6, 7, 8, 9],
2: [1, 3, 10, 11, 12],
3: [1, 2, 15, 16, 17],
4: [11, 13, 14],
5: [17, 18, 19, 20],
6: [1],
7: [1],
8: [1],
9: [1],
10: [2],
11: [4],
12: [2],
13: [4],
14: [4],
15: [3],
16: [3],
17: [3, 5],
18: [5],
19: [5],
20: [5]
}
for u, vs in edges.items():
for v in vs:
g.add_edge(u, v)
scores = nx.pagerank(g)
print('# PageRank')
print('# rank: node score')
#print sum(scores.values())
for i, n in enumerate(sorted(scores, key=lambda n: scores[n], reverse=True)):
print('# {}: {} {}'.format(i+1, n, scores[n]))
scores = divrank(g)
print('\n# DivRank')
#print sum(scores.values())
print('# rank: node score')
for i, n in enumerate(sorted(scores, key=lambda n: scores[n], reverse=True)):
print('# {}: {} {}'.format(i+1, n, scores[n]))
scores = divrank_scipy(g)
print('\n# DivRank (scipy)')
#print sum(scores.values())
print('# rank: node score')
for i, n in enumerate(sorted(scores, key=lambda n: scores[n], reverse=True)):
print('# {}: {} {}'.format(i+1, n, scores[n]))
mecab_segmenter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import MeCab
_mecab = MeCab.Tagger()
# POS, POS subcategory 1, POS subcategory 2, POS subcategory 3, conjugated form, conjugation type, base form, reading, pronunciation
_mecab_feat_labels = 'pos cat1 cat2 cat3 conj conj_t orig read pron'.split(' ')
def _mecab_parse_feat(feat):
return dict(list(zip(_mecab_feat_labels, feat.split(','))))
def _mecab_node2seq(node, decode_surface=True, feat_dict=True,
mecab_encoding='utf-8'):
    # MeCab.Node attributes cannot be modified directly, so the values are kept in separate attributes
while node:
if decode_surface:
            # node._surface = node.surface.decode(mecab_encoding)  # fix: surface is already str in Python 3
node._surface = node.surface
        if feat_dict:  # store the POS information as a dict
node.feat_dict = _mecab_parse_feat(
                # node.feature.decode(mecab_encoding)  # fix: feature is already str in Python 3
node.feature
)
yield node
node = node.next
def is_stopword(n): # <- mecab node
if len(n._surface) == 0:
return True
    elif re.search(r'^[\s!-@\[-`\{-~ 、-〜!-@[-`]+$', n._surface):  # changed ur'...' to r'...' here and in the two patterns below
return True
elif re.search(r'^(接尾|非自立)', n.feat_dict['cat1']):
return True
elif 'サ変・スル' == n.feat_dict['conj'] or 'ある' == n.feat_dict['orig']:
return True
elif re.search(r'^(名詞|動詞|形容詞)', n.feat_dict['pos']):
return False
else:
return True
def not_stopword(n): # <- mecab node
return not is_stopword(n)
def node2word(n): # <- mecab node
return n._surface
def node2norm_word(n): # mecab node
if n.feat_dict['orig'] != '*':
return n.feat_dict['orig']
else:
return n._surface
def word_segmenter_ja(sent, node_filter=not_stopword,
node2word=node2norm_word, mecab_encoding='utf-8'):
    # if type(sent) == str:  # removed
    #     sent = sent.encode(mecab_encoding)  # removed: MeCab accepts str directly in Python 3
nodes = list(
_mecab_node2seq(_mecab.parseToNode(sent))
)
if node_filter:
nodes = [n for n in nodes if node_filter(n)]
words = [node2word(n) for n in nodes]
return words
if __name__ == '__main__':
text = '今日はいい天気ですね。'
    print('|'.join(word_segmenter_ja(text)))  # removed .encode('utf-8')
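word_segmenter_ja returns base forms (node2norm_word) with stopwords removed by default; passing node2word instead keeps the surface forms. A small usage sketch (the exact output depends on the installed MeCab dictionary):

```python
# Base forms vs. surface forms with the segmenter defined above.
from summpy.misc.mecab_segmenter import word_segmenter_ja, node2word

print(word_segmenter_ja('東京へ行きました。'))                       # base forms, e.g. ['東京', '行く']
print(word_segmenter_ja('東京へ行きました。', node2word=node2word))  # surface forms, e.g. ['東京', '行き']
```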
test.html (mostly unchanged; I adjusted the textarea size and added an error handler to the Ajax call.)
<html>
<head>
<meta charset="UTF-8">
</head>
<body>
<textarea type="text" name="text" rows="20" cols="70"></textarea>
<br>
algorithm (lexrank|clexrank|divrank|mcp): <input type="text" value="lexrank" name="algo" /><br>
length (the number of sentences): <input type="text" value="3" name="sent_limit" /><br>
length (the number of chars): <input type="text" value="" name="char_limit" /><br>
cumulative LexRank score: <input type="text" value="" name="imp_require" /><br>
<button id="summarize">summarize</button>
<br>
<div id="out"></div>
<script src="http://code.jquery.com/jquery-2.0.3.min.js"></script>
<script type="text/javascript">
$(document).ready(function () {
$("#summarize").click(function (e) {
var text = $("textarea[name='text']").val();
var params = {
text: text,
algo: $("input[name='algo']").val(),
sent_limit: $("input[name='sent_limit']").val(),
char_limit: $("input[name='char_limit']").val(),
imp_require: $("input[name='imp_require']").val(),
debug: true
};
$.post("/summarize", params)
.done(function (res) {
var sentences = res.summary;
var debug_info = res.debug_info;
var out = $("#out");
var summ_length = 0;
out.empty();
sentences.forEach(function (s) {
summ_length += s.length;
out.append("<p>" + s + "</p>");
});
var summ_rate = summ_length / text.length;
out.prepend(
'<p style="color:blue">'
+ '要約率: ' + summ_rate
+ ' (' + summ_length + '/' + text.length + ' 文字)'
+ '</p>'
);
}).fail((jqXHR, textStatus, errorThrown) => {
alert("error" + jqXHR + "/" + textStatus + "/" + errorThrown)
})
});
});
</script>
</body>
</html>
## Other
【server.bat】
python -m summpy.server -h 127.0.0.1 -p 8000
【URL for access from a browser】
http://127.0.0.1:8000/static/test.html
It would probably be better to just publish this on GitHub, but for now this is it. This was my first post in a while.