Python 实例

读写文件

写文件

#!/usr/bin/env python3

# Create (or truncate) test.txt and write two pieces of text to it:
# a first line terminated by a newline, then a second line without one.
with open('test.txt', 'w') as out:
    out.writelines(('hello\n', 'python'))

读文件

#!/usr/bin/env python3

# Print test.txt line by line.  Each line read from the file keeps its own
# trailing newline, so print() is told not to append another one.
with open('test.txt', 'r') as src:
    for line in src:
        print(line, end='')

遍历文件夹下的所有文件

#!/usr/bin/env python3

import pathlib
import pprint

# Recursively collect every entry (files and directories) below the current
# working directory, then pretty-print the resulting list of paths.
entries = list(pathlib.Path().rglob("*"))
pprint.pprint(entries)

遍历所有 .h 和 .cpp 文件,将 GBK 编码转换为 UTF-8

#!/usr/bin/env python3

import pathlib

# Re-encode every .h / .cpp file below the current working directory from
# GBK to UTF-8, in place, printing the absolute path of each converted file.
# NOTE: this is not idempotent -- a file that is already UTF-8 may fail to
# decode as GBK (UnicodeDecodeError) or be silently garbled, so run it once.
for path in pathlib.Path().rglob("*"):
    # str.endswith accepts a tuple of suffixes, so one call covers both
    # extensions.  (The previous condition, "not .h OR not *.cpp", was true
    # for every name and therefore skipped every file.)
    if not path.is_file() or not path.name.endswith((".h", ".cpp")):
        continue

    file_name = str(path.resolve())

    # Read the whole file and decode before reopening for writing, so a
    # decode failure leaves the original file untouched.
    with open(file_name, "rb") as src:
        content = src.read().decode("gbk")

    with open(file_name, "wb") as dst:
        dst.write(content.encode("utf8"))

    print(file_name)

日期时间文本转为时间戳

import datetime

# Text to convert, in "YYYY-MM-DD HH:MM:SS" form.
datetimestr = "2021-01-01 00:00:00"

# strptime yields a naive datetime (no timezone attached).
dt = datetime.datetime.strptime(datetimestr, "%Y-%m-%d %H:%M:%S")

# Interpret the naive value as UTC, then convert to seconds since the epoch.
utc_dt = dt.replace(tzinfo=datetime.timezone.utc)
timestamp = utc_dt.timestamp()

RESTful 服务器

以下代码仅使用 Python 标准库;实际项目中推荐使用更易用的 Flask-RESTful。

#!/usr/bin/env python3

from http import *
from http.server import *
from urllib.parse import urlparse
import json

class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Minimal JSON endpoint: answers ``POST /test`` with a fixed JSON object."""

    def do_POST(self):
        """Handle a POST request.

        The query string is ignored when matching the path.  ``/test``
        receives a 200 response with a JSON body; any other path receives
        a 404 so the client is never left without a response.
        """
        path = urlparse(self.path).path
        if path != "/test":
            self.send_error(HTTPStatus.NOT_FOUND)
            return

        # wfile is a binary stream, so the payload must be bytes; the
        # Content-Length header must count those bytes, not characters.
        body = json.dumps({"key1": "test"}).encode("utf-8")
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

# Listen on every network interface, port 8080.
server_address = ("0.0.0.0", 8080)
print(f"Serving HTTP on {server_address}")

# Bind the listening socket, then handle requests until the process is
# interrupted.
httpd = HTTPServer(server_address, HTTPRequestHandler)
httpd.serve_forever()

使用SQLite数据库

import sqlite3

# Open (or create) the database file.  With sqlite3.Row as the row factory,
# result rows expose their column names through .keys().
connection = sqlite3.connect("test.db")
connection.row_factory = sqlite3.Row
# cursor() must be *called*; assigning the bare method would make every
# cursor.execute(...) below fail with AttributeError.
cursor = connection.cursor()

# SQL statement (placeholder -- replace with a real statement)
sql = "..."

# Execute a statement; when it modifies the database, commit() must be called.
# Only sqlite3 errors are handled here, so KeyboardInterrupt / SystemExit and
# genuine programming mistakes are no longer silently swallowed.
try:
    cursor.execute(sql)
    connection.commit()
except sqlite3.Error as e:
    # Discard any partially applied changes from the failed statement.
    connection.rollback()
    print("error: " + str(e))

# When running a query, fetch a single record
try:
    cursor.execute(sql)
    row = cursor.fetchone()
    if row:  # fetchone() returns None when the query produced no rows
        keys = row.keys()  # all column names
        vals = list(row)   # all column values
        data = dict(zip(keys, vals))  # combine them into a dict
except sqlite3.Error as e:
    print("error: " + str(e))

# When running a query, iterate over all records
try:
    for row in cursor.execute(sql):
        keys = row.keys()  # all column names
        vals = list(row)   # all column values
        data = dict(zip(keys, vals))  # combine them into a dict
except sqlite3.Error as e:
    print("error: " + str(e))