LoginSignup
1

More than 3 years have passed since last update.

Python3標準ライブラリだけでHTTPをいい感じに叩く雑な関数書いた(multipart/form-data対応)

Last updated at Posted at 2019-08-06

いつも欲しくなったときにコードがどっか行くし、urllib.requestの使い方をしばしば誤るし、すぐ叩けず諦めてrequests入れて、とかやるのがめんどくさくなり、むしゃくしゃしてた。

  • 標準ライブラリのみ
  • 1つの関数で様々なMethodのHTTPを投げ分けたい
  • 同一キーに2つ以上の値を持つパラメータには対応していない
  • multipart/form-dataも多分対応。ファイルデータは[filepath, filename]のようにリストで与えると、ファイルを読み込んで送信する。[filepath]のように長さ1の場合は、filenameをbasenameで補完する。(PerlのHTTP::Request::Commonの動きをまねた。)
import os
import json
import mimetypes

from urllib.parse import urlencode
from urllib.request import Request, urlopen, HTTPError


def str2byte(s):
    """Return *s* encoded to bytes when it is text; otherwise return it as is.

    Args:
        s: A ``str`` (or ``str`` subclass) to encode with the default
            codec of ``str.encode()`` (UTF-8), or any other object, which
            is passed through untouched.

    Returns:
        ``bytes`` for text input; the original object for anything else.
    """
    # isinstance rather than an exact type comparison, so that str
    # subclasses are encoded too instead of leaking through as text.
    if isinstance(s, str):
        return s.encode()
    return s

def convert_multipart_data(data, boundry):
    """Yield the pieces of a ``multipart/form-data`` body built from *data*.

    Args:
        data: Mapping of field name to value. A ``list`` or ``tuple`` value
            describes a file part, either ``[filepath, filename]`` or
            ``[filepath]``; in the one-element form (or when ``filename``
            is ``None``) the basename of ``filepath`` is used as the
            filename. Any other value is sent as a plain field and must be
            ``str`` or ``bytes``.
        boundry: Boundary string (``str`` or ``bytes``) separating the
            parts. The parameter name keeps its original spelling.

    Yields:
        ``bytes`` chunks: part headers, file contents in blocks of up to
        4096 bytes, plain field values, and finally the closing delimiter.

    Raises:
        ValueError: If a file value has neither one nor two elements.
        OSError: If a file cannot be opened or read.
    """
    CRLF = b"\r\n"
    delimiter = b'--' + str2byte(boundry)

    for key, val in data.items():
        key = str2byte(key)
        if isinstance(val, (list, tuple)):
            if len(val) == 1:
                filepath, name = val[0], None
            else:
                filepath, name = val
            if name is None:
                name = os.path.basename(filepath)
            name = str2byte(name)
            # guess_type() returns (None, None) for unknown extensions, so
            # the fallback must be applied before encoding.
            mime = (mimetypes.guess_type(filepath)[0]
                    or 'application/octet-stream').encode()

            # NOTE: key and filename are inserted verbatim; a double quote
            # or CR/LF inside them is not escaped.
            yield b''.join((
                delimiter, CRLF,
                b'Content-Disposition: form-data; name="', key,
                b'"; filename="', name, b'"', CRLF,
                b'Content-Type: ', mime, CRLF,
                CRLF,
            ))
            # Stream the file in fixed-size blocks rather than loading it
            # into memory at once.
            with open(filepath, 'rb') as fp:
                yield from iter(lambda: fp.read(4096), b'')

            yield CRLF
        else:
            yield b''.join((
                delimiter, CRLF,
                b'Content-Disposition: form-data; name="', key, b'"', CRLF,
                CRLF,
                str2byte(val), CRLF,
            ))

    # Closing delimiter that terminates the multipart body.
    yield delimiter + b'--' + CRLF


def request_simple(url, data=None, headers=None, method=None, use_multipart_formdata=False):
    """Send one HTTP request using only the standard library.

    Args:
        url: Target URL.
        data: Request payload. For a POST this is normally a mapping of
            field names to values; see ``use_multipart_formdata`` for how
            it is encoded. ``None`` sends no body.
        headers: Optional mapping of extra request headers. It is copied,
            so the caller's mapping is never modified.
        method: HTTP method. Defaults to ``'GET'`` when ``data`` is
            ``None`` and ``'POST'`` otherwise.
        use_multipart_formdata: When true and the method is ``'POST'``,
            encode ``data`` as ``multipart/form-data`` via
            ``convert_multipart_data``. Otherwise a POST whose
            ``Content-Type`` is unset or
            ``application/x-www-form-urlencoded`` is URL-encoded.

    Returns:
        The raw response body as ``bytes``, or ``None`` when the server
        answered with an HTTP error status (the error body is printed).
    """
    # Work on a private copy: the Content-Type assignments below must not
    # leak into the caller's dict or into later calls.
    headers = dict(headers) if headers else {}

    if method is None:
        method = 'GET' if data is None else 'POST'

    if method == 'POST' and use_multipart_formdata:
        # NOTE: the boundary is a fixed string; a payload that contains it
        # would be split at the wrong place.
        boundary = 'bbboundaryyy'
        headers['Content-Type'] = 'multipart/form-data; boundary=' + boundary
        data = convert_multipart_data(data, boundary)
    # TODO: would like to clean this condition up
    elif (method == 'POST' and data is not None
          and headers.get('Content-Type') in (None, 'application/x-www-form-urlencoded')):
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
        data = urlencode(data).encode()

    req = Request(url, data=data, headers=headers, method=method)

    try:
        with urlopen(req) as f:
            # Returns the raw body; callers decode or parse it as needed.
            return f.read()
    except HTTPError as err:
        body = err.read()
        # Error bodies are not always JSON; fall back to plain text rather
        # than raising a second exception from inside the handler.
        try:
            print(json.loads(body))
        except ValueError:
            print(body.decode('utf-8', errors='replace'))
        return None


# Usage examples. Guarded so that importing this module does not fire HTTP
# requests as a side effect.
if __name__ == '__main__':
    # multipart/form-data
    res = request_simple('http://localhost:8080/', headers={
        "Test-Header": "TEST"
    }, data={
        'files[0]': 'cat-1.jpg',  # plain value
        'files[1]': ['cat-4.jpg', 'image2.jpg'],  # file
        'files[2]': ['cat-3.jpg', 'image.jpg'],  # file
    }, use_multipart_formdata=True)

    # request_simple returns None after reporting an HTTP error.
    if res is not None:
        print(res.decode('utf-8'))

    # application/x-www-form-urlencoded
    res = request_simple('http://localhost:8080/hoge?fuga&piyo', headers={
        "Test-Header": "TEST"
    }, data={
        'files[0]': 'cat-1.jpg',
        'files[1]': 'cat-2.jpg',
    })

    if res is not None:
        print(res.decode('utf-8'))

ついでに、動作確認に使っていたHTTPサーバー(Go)も載せておく。

package main

import (
    "bytes"
    "fmt"
    "net/http"
    "strings"
)

func handler(w http.ResponseWriter, r *http.Request) {
    w.Header().Add("Content-Type", "text/plain")
    fmt.Fprintf(w, "%s %s %s\n", r.Method, r.RequestURI, r.Proto)

    for name, headers := range r.Header {
        for _, val := range headers {
            fmt.Fprintf(w, "%v: %v\n", name, val)
        }
    }
    fmt.Fprintf(w, "\n")

    contentType := r.Header.Get("Content-Type")
    if err := r.ParseMultipartForm(8192); err == nil {
        boundaryIndex := strings.Index(contentType, "boundary=")
        boundary := contentType[boundaryIndex+9:]

        for _, vals := range r.MultipartForm.File {
            fmt.Fprintf(w, "--%s\n", boundary)
            for _, val := range vals {
                for name, headers := range val.Header {
                    for _, val := range headers {
                        fmt.Fprintf(w, "%v: %v\n", name, val)
                    }
                }
                fmt.Fprintf(w, "\n[data (%d byte)]\n", val.Size)
            }
        }
        fmt.Fprintf(w, "--%s--\n", boundary)
    }

    if err := r.ParseForm(); err == nil {
        for key, vals := range r.Form {
            for _, val := range vals {
                fmt.Fprintf(w, "%v=%v\n", key, val)
            }
        }
    }
    if r.Body != nil {
        bufbody := new(bytes.Buffer)
        bufbody.ReadFrom(r.Body)
        defer r.Body.Close()
        fmt.Fprintf(w, "%v", bufbody.String())
    }
}

// main registers handler for every path and serves HTTP on port 8080.
func main() {
    http.HandleFunc("/", handler)
    // ListenAndServe always returns a non-nil error (for example when the
    // port is already in use); surface it instead of exiting silently
    // with status 0.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1