LoginSignup
8
13

More than 5 years have passed since last update.

Qiitaの記事をRDFに変換

Posted at

目的

この記事の目的はQiitaの記事を
1. スクレイピングで取得して
2. LOD(Linked Open Data)の形式の1つであるRDFデータで保存して
3. 読み込む
までの方法を紹介します。

Qiitaの記事をスクレイピングで取得

①. ユーザー登録
まずは、Qiitaにユーザ登録します。
https://qiita.com/ の画面右上の「ユーザ登録」から、ユーザ情報を登録します。
②.アクセストークン取得
Qiitaの記事データを大量に取得するためには、アクセストークンの取得が必要です。
以下の手順で簡単に取得できます。
1. まずはアカウントマークから「設定」を開き
2. 次いで「アプリケーション」を開き、
3. 最後に「新しくトークンを発行する」を押します。
詳細は以下の記事の方が詳しいです。
Qiitaの記事データは、機械学習のためのデータセットに向いている
③.Qiita記事データをスクレイピングする
Pythonで、Qiitaの記事データをスクレイピングします。

ここで、②で取得したアクセストークンを使います。
注意点は、Qiita APIでは、1回のアクセスで最大100件の記事情報しか取得できない点と、1時間あたりのアクセス回数が1000回までに制限されている点です
上記を意識して、Qiitaの記事データを収集するにあたり、以下のように準備します。

GetData1.py
# Qiitaデータの収集
import time
import pandas as pd

url = 'https://qiita.com/api/v2/items'  # アクセス先
h = {'Authorization': 'Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'}  # 取得したアクセストークン

start = '2017-10-15'  # 取得開始日
end = '2018-10-16'  # 取得終了日

date_list = [d.strftime('%Y-%m-%d') for d in pd.date_range(start, end)]  # date_rangeで日付を補完する
start_list = date_list[:-1]
end_list = date_list[1:]

result_dir_path = '~/Qiita_LOD/data'  # 任意のフォルダ名称(出力先)
sleep_sec = 3.6  # 1時間に1000回までのリクエスト制限を超過しないようにするため

print(start_list)  # 確認用
print(end_list)  # 確認用

次いでスクレイピングされた辞書形式のデータから必要なものだけを抽出する関数を定義します。

GetData2.py
def get_simple_df(df):
    tags_list = []
    for tag in df['tags']:
        tags_list.append(tag['name'])
    tags_str = ','.join(tags_list)
    df['tags_str'] = tags_str
    columns = ['id', 'title', 'body', 'created_at', 'updated_at','likes_count', 'comments_count', 'tags_str','user']
    ele_list = []
    for column_name in columns:
        ele_list.append(df[column_name])
    #===
    assert len(ele_list)==len( columns),"lenght do not much! {0}".format(len(ele_list))
    return ele_list

最後に、データを収集してCSVファイルに保存します。

GetData3.py
# データ取得する
import os
import requests
import numpy as np

# 指定した日付けの区間で繰り返し実行
for start, end in zip(start_list, end_list):
    p = {
        'per_page': 100,  # 最大100件まで
        'query': 'created_at:>{0} created_at:<{1}'.format(start, end)  # 作成日毎で抽出
    }
    print("created_date %s : page 1" % start)
    time.sleep(sleep_sec)  # アクセス制限対策
    r = requests.get(url, params=p, headers=h)# 記事データ取得
    total_count = int(r.headers['Total-Count'])# 取得上限対策
    # 取得なし対応
    if total_count == 0:
        print("count is 0!"*5)
        continue
    df_list = []
    for json_dict in r.json():
        ele_list = get_simple_df(json_dict)
        df_list.append(ele_list)
    print("list shape =",np.shape(df_list))
    column_name=['id', 'title', 'body', 'created_at', 'updated_at','likes_count', 'comments_count', 'tags_str','user']
    df = pd.DataFrame(df_list, columns=column_name)
    df.to_csv(result_dir_path+"/"+start+".csv", index=False)

上記のコードをまとめたものが下のものになります。

GetData.py
# Qiitaデータの収集
import time
import pandas as pd

url = 'https://qiita.com/api/v2/items'  # アクセス先
h = {'Authorization': 'Bearer XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'}  # 取得したアクセストークン

start = '2017-10-15'  # 取得開始日
end = '2018-10-16'  # 取得終了日

date_list = [d.strftime('%Y-%m-%d') for d in pd.date_range(start, end)]  # date_rangeで日付を補完する
start_list = date_list[:-1]
end_list = date_list[1:]

result_dir_path = '~/Qiita_LOD/data'  # 任意のフォルダ名称(出力先)
sleep_sec = 3.6  # 1時間に1000回までのリクエスト制限を超過しないようにするため

print(start_list)  # 確認用
print(end_list)  # 確認用

def get_simple_df(df):
    tags_list = []
    for tag in df['tags']:
        tags_list.append(tag['name'])
    tags_str = ','.join(tags_list)
    df['tags_str'] = tags_str
    columns = ['id', 'title', 'body', 'created_at', 'updated_at','likes_count', 'comments_count', 'tags_str','user']
    ele_list = []
    for column_name in columns:
        ele_list.append(df[column_name])
    #===
    assert len(ele_list)==len( columns),"lenght do not much! {0}".format(len(ele_list))
    return ele_list
# データ取得する
import os
import requests
import numpy as np

# 指定した日付けの区間で繰り返し実行
for start, end in zip(start_list, end_list):
    p = {
        'per_page': 100,  # 最大100件まで
        'query': 'created_at:>{0} created_at:<{1}'.format(start, end)  # 作成日毎で抽出
    }
    print("created_date %s : page 1" % start)
    time.sleep(sleep_sec)  # アクセス制限対策
    r = requests.get(url, params=p, headers=h)# 記事データ取得
    total_count = int(r.headers['Total-Count'])# 取得上限対策
    # 取得なし対応
    if total_count == 0:
        print("count is 0!"*5)
        continue
    df_list = []
    for json_dict in r.json():
        ele_list = get_simple_df(json_dict)
        df_list.append(ele_list)
    print("list shape =",np.shape(df_list))
    column_name=['id', 'title', 'body', 'created_at', 'updated_at','likes_count', 'comments_count', 'tags_str','user']
    df = pd.DataFrame(df_list, columns=column_name)
    df.to_csv(result_dir_path+"/"+start+".csv", index=False)

RDFに変換

GetData.pyでCSV形式保存されたQiitaの記事を読み込み、1つのRDFファイルにまとめます。
最初にCSV読み込みのためのライブラリとしてpandas、PDFへの変換のためのライブラリとしてrdflibをインポートします。

Conv2LOD1.py
import pandas as pd
import rdflib
from rdflib import ConjunctiveGraph, URIRef, RDFS, Literal
from rdflib.namespace import SKOS

そして、rdflibは文法規則が厳しいので記事データをクリーニングするコードを書きます。このコードではクリーニングする順番が非常に重要なのでリストを使っています。

Conv2LOD2.py
original_list = [r'I\'m',r'\"',r'\{\'',r'\'\}',r'\':',r':\s\'',r',\s\'',r'\',',r'False',r'\\r',r'\n'
                 ,r'\\x',r'None']
replaced_list = ['I am','\'','{\"','\"}','\":',': \"',', \"','\",',"\"False\"",'',''
                ,'',"\"None\""]
def dictClean(text):
    for original,replaced in zip(original_list,replaced_list):
        text = re.sub(original,replaced,text)
    return(text)

rdflibで保存するためにDataFrame形式で読み込まれた記事データを基に、1日分の記事たちのrdfであるgraphと全ての記事のrdfであるfull_graphにデータを追加します。

Conv2LOD3.py
def makeXML(df,full_graph):
    base_url = "https://qiita.com"
    column_list = ["id","title","created_at","updated_at","likes_count","comments_count"]
    graph = ConjunctiveGraph()
    for i,row in df.iterrows():
        node = rdflib.BNode()
        for column_name in column_list:
            val = base_url+"/"+str(df[column_name][i])
            graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
            full_graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
        str_dict = df["user"][i]
        str_dict = dictClean(str_dict)
        str_dict = json.loads(str_dict)
        userName = str_dict["id"]
        graph.add((node,rdflib.term.URIRef(base_url+"/user"),Literal(userName)))
        full_graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
        url = base_url+"/"+userName+"/items/"+df["id"][i]
        graph.add((node,rdflib.term.URIRef(base_url+"/url"),Literal(url)))
        full_graph.add((node,rdflib.term.URIRef(base_url+"/url"),Literal(url)))
    return(graph.serialize().decode())

上記のコードをまとめたものが下のものになります。

Conv2LOD.py
import os
import pandas as pd
import signal
import rdflib
from rdflib import ConjunctiveGraph, URIRef, RDFS, Literal
from rdflib.namespace import SKOS
from pprint import pprint
import json
import re
signal.signal(signal.SIGINT, signal.SIG_DFL)
dataDir = "./data"
from collections import OrderedDict
#==========================
original_list = [r'I\'m',r'\"',r'\{\'',r'\'\}',r'\':',r':\s\'',r',\s\'',r'\',',r'False',r'\\r',r'\n'
                 ,r'\\x',r'None']
replaced_list = ['I am','\'','{\"','\"}','\":',': \"',', \"','\",',"\"False\"",'',''
                ,'',"\"None\""]
def dictClean(text):
    for original,replaced in zip(original_list,replaced_list):
        text = re.sub(original,replaced,text)
    return(text)

def makeXML(df,full_graph):
    base_url = "https://qiita.com"
    column_list = ["id","title","created_at","updated_at","likes_count","comments_count"]
    graph = ConjunctiveGraph()
    for i,row in df.iterrows():
        node = rdflib.BNode()
        for column_name in column_list:
            val = base_url+"/"+str(df[column_name][i])
            graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
            full_graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
        str_dict = df["user"][i]
        str_dict = dictClean(str_dict)
        str_dict = json.loads(str_dict)
        userName = str_dict["id"]
        graph.add((node,rdflib.term.URIRef(base_url+"/user"),Literal(userName)))
        full_graph.add((node,rdflib.term.URIRef(base_url+"/"+column_name),Literal(val)))
        url = base_url+"/"+userName+"/items/"+df["id"][i]
        graph.add((node,rdflib.term.URIRef(base_url+"/url"),Literal(url)))
        full_graph.add((node,rdflib.term.URIRef(base_url+"/url"),Literal(url)))
    return(graph.serialize().decode())
#=============================
def getData(aDataDir):
    full_graph = ConjunctiveGraph()
    for root, dirs, files in os.walk(aDataDir):
        for i,file in enumerate(files):
            print("file =",file)
            df = pd.read_csv(aDataDir+"/"+file)
            try:
                user = df["user"]
            except KeyError:
                print("user do not exist!")
                continue
            with open("./RDF/"+"dbita"+str(i)+".rdf", mode='w') as f:
                xml = makeXML(df,full_graph)
                f.write(xml)
    with open("./RDF/"+"dbita-full.rdf", mode='w') as f:
        f.write(full_graph.serialize().decode())
prefix_dict = {"item":" https://qiita.com/items"}
#===============================
if __name__ == '__main__':
    getData(dataDir)

読み込む

最後に作成されたRDFデータを読み込んでAPIでデータを返してみます。
APIの立ち上げにはflaskを使用しました。

buildAPI.py
# 必要なモジュールの読み込み
from flask import Flask, jsonify, abort, make_response,request
import rdflib
#========================
base_url = "https://qiita.com"
def getRDF(value_list,max_got=100):
    # load xml
    graph = rdflib.ConjunctiveGraph()
    graph.parse("RDF/"+"dbita-full.rdf")
    id_list = []
    gotValue_list = []
    for value in value_list:
        # query: navigate方式を採用
        gotValue = [v for r, p, v in graph.triples((None,rdflib.term.URIRef(base_url+"/"+value),None))][0:max_got]
        gotValue_list.append(gotValue)
    result_list = []
    for i,gotValue in enumerate(gotValue_list):
        for j,val in enumerate(gotValue):
            if(i==0):
                result_list.append([val])
            else:
                result_list[j].append(val)
    return(result_list)

def getResult():
    response = { "title": "NONE" }
    if request.method == "GET":
        if request.get_json().get("feature"):
            # read feature from json
            feature = request.get_json().get("feature")
        else:
            response["title"] = "invalid feature!"
        if request.get_json().get("value"):
            response["content"] = getRDF(request.get_json().get("value"))
        else:
            response["title"] = "invalid value!"
    return(response)
#=========================
# Flaskクラスのインスタンスを作成
# __name__は現在のファイルのモジュール名
api = Flask(__name__)

# GETの実装
@api.route('/get', methods=['GET'])
def get():
    response = getResult()
    #print(response)
    return(jsonify(response))

# エラーハンドリング
@api.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'error': 'Not found'}), 404)

# ホスト0.0.0.0, ポート3001番でサーバーを起動
if __name__ == '__main__':
    api.run(debug=False,host='0.0.0.0', port=3001)

実行結果

試しに各記事のURLを取得してみます。
Screen Shot 2019-02-05 at 9.28.44 am 10-43-20-141.png

8
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
13