datetime —— 处理日期和时间
获取当前日期和时间
from datetime import datetime
# 获取当前datetime
now = datetime.now()
获取指定日期和时间
# # 用指定日期时间创建datetime
dt = datetime(2015, 4, 19, 12, 20)
datetime转换为timestamp
dt = datetime(2015, 4, 19, 12, 20)
# 把datetime转换为timestamp
t = dt.timestamp()
Python 的 timestamp 是一个浮点数。如果有小数位,小数位表示毫秒数
某些编程语言(如Java和JavaScript)的 timestamp 使用整数表示毫秒数,这种情况下只需要把 timestamp 除以1000就得到Python的浮点表示方法
timestamp转换为datetime
t = 1429417200.0
# timestamp和本地时间做转换
dt1 = datetime.fromtimestamp(t)
# timestamp是一个浮点数,它没有时区的概念,而datetime是有时区的
# UTC时间,即UTC+0:00时区的时间
dt2 = datetime.utcfromtimestamp(t)
str转换为datetime
dt = datetime.strptime('2015-6-1 18:19:59', '%Y-%m-%d %H:%M:%S')
datetime转换为str
now = datetime.now()
dtStr = now.strftime('%a, %b %d %H:%M')
datetime加减
from datetime import datetime, timedelta
now = datetime.now()
now + timedelta(hours=10)
now - timedelta(days=1)
now + timedelta(days=2, hours=12)
本地时间转换为UTC时间
一个datetime类型有一个时区属性tzinfo,但是默认为None,所以无法区分这个datetime到底是哪个时区,除非强行给datetime设置一个时区:
from datetime import datetime, timedelta, timezone
# 创建时区UTC+8:00
tz_utc_8 = timezone(timedelta(hours=8))
# datetime表示的时间需要时区信息才能确定一个特定的时间,否则只能视为本地时间
now = datetime.now()
# 强制设置为UTC+8:00
dt = now.replace(tzinfo=tz_utc_8)
时区转换
from datetime import datetime, timedelta, timezone
# 通过 utcnow() 拿到当前的UTC时间,并强制设置为UTC,作为基准时间
utc_dt = datetime.utcnow().replace(tzinfo=timezone.utc)
print(utc_dt)
# 利用带时区的 datetime ,通过 astimezone() 方法,可以转换到任意时区
bj_dt = utc_dt.astimezone(timezone(timedelta(hours=8)))
print(bj_dt)
tokyo_dt = utc_dt.astimezone(timezone(timedelta(hours=9)))
print(tokyo_dt)
# 不是必须从UTC+0:00时区转换到其他时区,任何带时区的datetime都可以正确转换
tokyo_dt2 = bj_dt.astimezone(timezone(timedelta(hours=9)))
print(tokyo_dt2)
collections
namedtuple
namedtuple
是一个函数,它用来创建一个自定义的 tuple
对象,并且规定了 tuple
元素的个数,并可以用属性而不是索引来引用 tuple
的某个元素
from collections import namedtuple
Point = namedtuple('Point', ['x', 'y'])
p = Point(1, 2)
p.x
p.y
isinstance(p, tuple) ==> True
deque
deque
是为了高效实现插入和删除操作的双向列表,适合用于队列和栈
from collections import deque
q = deque(['a', 'b', 'c'])
q.append('x')
q.pop()
q.appendleft('x')
q.popleft()
defaultdict
使用 dict 时,如果引用的 Key 不存在,就会抛出 KeyError 。如果希望 key 不存在时,返回一个默认值,就可以用 defaultdict :
from collections import defaultdict
dd = defaultdict(lambda: 'N/A')
dd['key1'] = 'abc'
dd['key1'] ==> 'abc'
dd['key2'] ==> 'N/A'
OrderedDict
要保持 dict 中 Key 的顺序,可以用 OrderedDict ,OrderedDict 的 Key 会按照插入的顺序排列
from collections import OrderedDict
# 普通的 dict
d = dict([('a', 1), ('b', 2), ('c', 3)])
od = OrderedDict([('a', 1), ('b', 2), ('c', 3)])
od ==> OrderedDict([('a', 1), ('b', 2), ('c', 3)])
OrderedDict 可以实现一个 FIFO(先进先出)的 dict ,当容量超出限制时,先删除最早添加的 Key :
from collections import OrderedDict
class LastUpdatedOrderedDict(OrderedDict):
def __init__(self, capacity):
super(LastUpdatedOrderedDict, self).__init__()
self._capacity = capacity
def __setitem__(self, key, value):
containsKey = 1 if key in self else 0
if len(self) - containsKey >= self._capacity:
last = self.popitem(last=False)
print('remove:', last)
if containsKey:
del self[key]
print('set:', (key, value))
else:
print('add:', (key, value))
OrderedDict.__setitem__(self, key, value)
Counter
Counter 是一个简单的计数器,例如,统计字符出现的个数:
from collections import Counter
c = Counter()
for ch in 'programming':
c[ch] = c[ch] + 1
c
base64
Base64编码会把3字节(4*6 bit)的二进制数据编码为4字节的文本数据,长度增加33%,好处是编码后的文本数据可以在邮件正文、网页等直接显示
如果要编码的二进制数据不是3的倍数,Base64用\x00字节在末尾补足后,再在编码的末尾加上1个或2个=号,表示补了多少字节,解码的时候,会自动去掉
"url safe"的base64编码:针对字符 + 和 / 在URL中就不能直接作为参数,把字符 + 和 / 分别变成 - 和 _
import base64
base64.b64encode(b'binary\x00string') ==> b'YmluYXJ5AHN0cmluZw=='
# b64decode() 可接受一个 str
base64.b64decode(b'YmluYXJ5AHN0cmluZw==') ==> b'binary\x00string'
base64.b64decode('YmluYXJ5AHN0cmluZw==') ==> b'binary\x00string'
base64.b64encode(b'i\xb7\x1d\xfb\xef\xff') ==> b'abcd++//'
base64.urlsafe_b64encode(b'i\xb7\x1d\xfb\xef\xff') ==> b'abcd--__'
base64.urlsafe_b64decode('abcd--__') ==> b'i\xb7\x1d\xfb\xef\xff'
由于=字符也可能出现在Base64编码中,但=用在URL、Cookie里面会造成歧义,所以,很多Base64编码后会把=去掉
因为Base64是把3个字节变为4个字节,所以,Base64编码的长度永远是4的倍数,因此,需要加上=把Base64字符串的长度变为4的倍数,就可以正常解码了
能处理去掉=的base64解码函数:
import base64
def safe_base64_decode(s):
return base64.b64decode(s + '=' * (4 - len(s) % 4))
struct
准确地讲,Python没有专门处理字节的数据类型
但由于b'str'可以表示字节,所以,字节数组=二进制str
Python 提供了一个 struct
模块来解决 bytes
和其他二进制数据类型的转换
struct
的 pack
函数把任意数据类型变成 bytes
:
import struct
# pack的第一个参数是处理指令,'>I'的意思是:>表示字节顺序是big-endian,也就是网络序,I表示4字节无符号整数
struct.pack('>I', 10240099) ==> b'\x00\x9c@c'
unpack
把 bytes
变成相应的数据类型:
# 根据>IH的说明,后面的bytes依次变为 I:4字节无符号整数 和 H:2字节无符号整数
struct.unpack('>IH', b'\xf0\xf0\xf0\xf0\x80\x80') ==> (4042322160, 32896)
hashlib
Python 的 hashlib
提供了常见的摘要算法,如MD5,SHA1等等
import hashlib
md5 = hashlib.md5()
md5.update('how to use md5 in python hashlib?'.encode('utf-8'))
# 生成结果是固定的128 bit字节,通常用一个32位的16进制字符串表示
print(md5.hexdigest())
# 如果数据量很大,可以分块多次调用update(),最后计算的结果是一样的
md5 = hashlib.md5()
md5.update('how to use md5 in '.encode('utf-8'))
md5.update('python hashlib?'.encode('utf-8'))
print(md5.hexdigest())
import hashlib
sha1 = hashlib.sha1()
sha1.update('how to use sha1 in '.encode('utf-8'))
sha1.update('python hashlib?'.encode('utf-8'))
# SHA1的结果是160 bit字节,通常用一个40位的16进制字符串表示
print(sha1.hexdigest())
itertools
Python 的内建模块 itertools
提供了非常有用的用于操作迭代对象的函数
itertools
模块提供的全部是处理迭代功能的函数,它们的返回值不是 list
,而是 Iterator
itertools
提供的几个“无限”迭代器:
-
count()
会创建一个无限的迭代器
import itertools
natuals = itertools.count(1)
for n in natuals:
print(n) ==> 1,2,3,...
-
cycle()
会把传入的一个序列无限重复下去
import itertools
cs = itertools.cycle('ABC')
for c in cs:
print(c) ==> 'A','b','c','A','b','c',...
-
repeat()
负责把一个元素无限重复下去,不过如果提供第二个参数就可以限定重复次数
import itertools
ns = itertools.repeat('A', 3)
for n in ns:
print(n) ==> 'A','A','A'
无限序列虽然可以无限迭代下去,但是通常我们会通过takewhile()等函数根据条件判断来截取出一个有限的序列:
import itertools
natuals = itertools.count(1)
ns = itertools.takewhile(lambda x: x <= 10, natuals)
list(ns) ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
itertools
提供的几个迭代器操作函数更加有用:
-
chain()
可以把一组迭代对象串联起来,形成一个更大的迭代器
import itertools
for c in itertools.chain('ABC', 'XYZ'):
print(c) ==> 'A' 'B' 'C' 'X' 'Y' 'Z'
-
groupby()
把迭代器中相邻的重复元素挑出来放在一起
import itertools
for key, group in itertools.groupby('AAABBBCCAAA'):
print(key, list(group))
'''
A ['A', 'A', 'A']
B ['B', 'B', 'B']
C ['C', 'C']
A ['A', 'A', 'A']
'''
# 实际上挑选规则是通过函数完成的,只要作用于函数的两个元素返回的值相等,这两个元素就被认为是在一组的,而函数返回值作为组的key
for key, group in itertools.groupby('AaaBBbcCAAa', lambda c: c.upper()):
print(key, list(group))
'''
A ['A', 'a', 'a']
B ['B', 'B', 'b']
C ['c', 'C']
A ['A', 'A', 'a']
'''
contextlib
Python 的 with
语句允许我们非常方便地使用资源,而不必担心资源没有关闭
with open('/path/to/file', 'r') as f:
f.read()
并不是只有 open()
函数返回的fp对象才能使用 with
语句。实际上,任何对象,只要正确实现了上下文管理,就可以用于 with
语句
实现上下文管理是通过 __enter__
和 __exit__
这两个方法实现的
class Query(object):
def __init__(self, name):
self.name = name
def __enter__(self):
print('Begin')
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
print('Error')
else:
print('End')
def query(self):
print('Query info about %s...' % self.name)
with Query('Bob') as q:
q.query()
@contextmanager
编写 __enter__
和 __exit__
仍然很繁琐,因此 Python 的标准库 contextlib
提供了更简单的写法
@contextmanager
这个 decorator
接受一个 generator
,用 yield
语句把 with ... as var
把变量输出出去,然后,with
语句就可以正常地工作了
from contextlib import contextmanager
class Query(object):
def __init__(self, name):
self.name = name
def query(self):
print('Query info about %s...' % self.name)
@contextmanager
def create_query(name):
print('Begin')
q = Query(name)
yield q
print('End')
with create_query('Bob') as q:
q.query()
很多时候,我们希望在某段代码执行前后自动执行特定代码,也可以用 @contextmanager
实现
@contextmanager
def tag(name):
print("<%s>" % name)
yield
print("</%s>" % name)
with tag("h1"):
print("hello")
print("world")
代码的执行顺序是:
- with语句首先执行yield之前的语句,因此打印出<h1>;
- yield调用会执行with语句内部的所有语句,因此打印出hello和world;
- 最后执行yield之后的语句,打印出</h1>。
因此,@contextmanager 让我们通过编写 generator 来简化上下文管理
@closing
如果一个对象没有实现上下文,我们就不能把它用于 with
语句
可以用 closing()
来把该对象变为上下文对象
用 with
语句使用 urlopen()
:
from contextlib import closing
from contextlib import contextmanager
from urllib.request import urlopen
with closing(urlopen('https://www.python.org')) as page:
for line in page:
print(line)
# closing也是一个经过@contextmanager装饰的generator
# 它的作用就是把任意对象变为上下文对象,并支持with语句
@contextmanager
def closing(thing):
try:
yield thing
finally:
thing.close()
XML
操作XML有两种方法:DOM和SAX
- DOM会把整个XML读入内存,解析为树,因此占用内存大,解析慢,优点是可以任意遍历树的节点
- SAX是流模式,边读边解析,占用内存小,解析快,缺点是我们需要自己处理事件
正常情况下,优先考虑SAX,因为DOM实在太占内存
在 Python 中使用 SAX 解析 XML 非常简洁,通常我们关心的事件是 start_element
,end_element
和 char_data
,准备好这3个函数,然后就可以解析 xml 了
from xml.parsers.expat import ParserCreate
class DefaultSaxHandler(object):
def start_element(self, name, attrs):
print('sax:start_element: %s, attrs: %s' % (name, str(attrs)))
def end_element(self, name):
print('sax:end_element: %s' % name)
def char_data(self, text):
print('sax:char_data: %s' % text)
xml = r'''<?xml version="1.0"?>
<ol>
<li><a href="/python">Python</a></li>
<li><a href="/ruby">Ruby</a></li>
</ol>
'''
handler = DefaultSaxHandler()
parser = ParserCreate()
parser.StartElementHandler = handler.start_element
parser.EndElementHandler = handler.end_element
parser.CharacterDataHandler = handler.char_data
parser.Parse(xml)
生成 XML 可以用字符串拼接。复杂 XML 建议改用 JSON
str.join(list)
返回 str
利用SAX编写程序解析Yahoo的XML格式的天气预报,获取当天和第二天的天气:(未完成)
from xml.parsers.expat import ParserCreate
class WeatherSaxHandler(object):
def __init__(self):
self._city = ''
self._country = ''
self._data = {}
@property
def get_data(self):
return self._data
def start_element(self, name, attrs):
# print('sax:start_element: %s, attrs: %s' % (name, str(attrs)))
if name == 'yweather:location':
self._data['city'] = attrs['city']
self._data['country'] = attrs['country']
elif
def end_element(self, name):
print('sax:end_element: %s' % name)
def char_data(self, text):
print('sax:char_data: %s' % text)
def parse_weather(xml):
handler = WeatherSaxHandler()
parser = ParserCreate()
parser.StartElementHandler = handler.start_element
parser.EndElementHandler = handler.end_element
parser.CharacterDataHandler = handler.char_data
parser.Parse(xml)
return handler.
return {
'city': 'Beijing',
'country': 'China',
'today': {
'text': 'Partly Cloudy',
'low': 20,
'high': 33
},
'tomorrow': {
'text': 'Sunny',
'low': 21,
'high': 34
}
}
测试数据:
data = r'''<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<rss version="2.0" xmlns:yweather="http://xml.weather.yahoo.com/ns/rss/1.0" xmlns:geo="http://www.w3.org/2003/01/geo/wgs84_pos#">
<channel>
<title>Yahoo! Weather - Beijing, CN</title>
<lastBuildDate>Wed, 27 May 2015 11:00 am CST</lastBuildDate>
<yweather:location city="Beijing" region="" country="China"/>
<yweather:units temperature="C" distance="km" pressure="mb" speed="km/h"/>
<yweather:wind chill="28" direction="180" speed="14.48" />
<yweather:atmosphere humidity="53" visibility="2.61" pressure="1006.1" rising="0" />
<yweather:astronomy sunrise="4:51 am" sunset="7:32 pm"/>
<item>
<geo:lat>39.91</geo:lat>
<geo:long>116.39</geo:long>
<pubDate>Wed, 27 May 2015 11:00 am CST</pubDate>
<yweather:condition text="Haze" code="21" temp="28" date="Wed, 27 May 2015 11:00 am CST" />
<yweather:forecast day="Wed" date="27 May 2015" low="20" high="33" text="Partly Cloudy" code="30" />
<yweather:forecast day="Thu" date="28 May 2015" low="21" high="34" text="Sunny" code="32" />
<yweather:forecast day="Fri" date="29 May 2015" low="18" high="25" text="AM Showers" code="39" />
<yweather:forecast day="Sat" date="30 May 2015" low="18" high="32" text="Sunny" code="32" />
<yweather:forecast day="Sun" date="31 May 2015" low="20" high="37" text="Sunny" code="32" />
</item>
</channel>
</rss>
'''
测试:
weather = parse_weather(data)
assert weather['city'] == 'Beijing', weather['city']
assert weather['country'] == 'China', weather['country']
assert weather['today']['text'] == 'Partly Cloudy', weather['today']['text']
assert weather['today']['low'] == 20, weather['today']['low']
assert weather['today']['high'] == 33, weather['today']['high']
assert weather['tomorrow']['text'] == 'Sunny', weather['tomorrow']['text']
assert weather['tomorrow']['low'] == 21, weather['tomorrow']['low']
assert weather['tomorrow']['high'] == 34, weather['tomorrow']['high']
print('Weather:', str(weather))
HTMLParser
from html.parser import HTMLParser
from html.entities import name2codepoint
class MyHTMLParser(HTMLParser):
def handle_starttag(self, tag, attrs):
print('<%s>' % tag)
def handle_endtag(self, tag):
print('</%s>' % tag)
def handle_startendtag(self, tag, attrs):
print('<%s/>' % tag)
def handle_data(self, data):
print(data)
def handle_comment(self, data):
print('<!--', data, '-->')
def handle_entityref(self, name):
print('&%s;' % name)
def handle_charref(self, name):
print('&#%s;' % name)
parser = MyHTMLParser()
parser.feed('''<html>
<head></head>
<body>
<!-- test html parser -->
<p>Some <a href=\"#\">html</a> HTML tutorial...<br>END</p>
</body></html>''')
urllib
urllib
提供了一系列用于操作URL的功能
urllib
的 request
模块可以非常方便地抓取URL内容,也就是发送一个GET请求到指定的页面,然后返回HTTP的响应:
from urllib import request
with request.urlopen('https://api.douban.com/v2/book/2129650') as f:
data = f.read()
print('Status:', f.status, f.reason)
for k, v in f.getheaders():
print('%s: %s' % (k, v))
print('Data:', data.decode('utf-8'))
模拟浏览器发送GET请求,就需要使用 Request
对象,通过往 Request
对象添加 HTTP 头,我们就可以把请求伪装成浏览器
from urllib import request
# 模拟iPhone 6去请求豆瓣首页
req = request.Request('http://www.douban.com/')
req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25')
with request.urlopen(req) as f:
print('Status:', f.status, f.reason)
for k, v in f.getheaders():
print('%s: %s' % (k, v))
print('Data:', f.read().decode('utf-8'))
如果要以 POST 发送一个请求,只需要把参数 data 以 bytes 形式传入
from urllib import request, parse
# 模拟一个微博登录
print('Login to weibo.cn...')
email = input('Email: ')
passwd = input('Password: ')
login_data = parse.urlencode([
('username', email),
('password', passwd),
('entry', 'mweibo'),
('client_id', ''),
('savestate', '1'),
('ec', ''),
('pagerefer', 'https://passport.weibo.cn/signin/welcome?entry=mweibo&r=http%3A%2F%2Fm.weibo.cn%2F')
])
req = request.Request('https://passport.weibo.cn/sso/login')
req.add_header('Origin', 'https://passport.weibo.cn')
req.add_header('User-Agent', 'Mozilla/6.0 (iPhone; CPU iPhone OS 8_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/8.0 Mobile/10A5376e Safari/8536.25')
req.add_header('Referer', 'https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=http%3A%2F%2Fm.weibo.cn%2F')
with request.urlopen(req, data=login_data.encode('utf-8')) as f:
print('Status:', f.status, f.reason)
for k, v in f.getheaders():
print('%s: %s' % (k, v))
print('Data:', f.read().decode('utf-8'))
如果还需要更复杂的控制,比如通过一个 Proxy 去访问网站,我们需要利用 ProxyHandler
来处理
proxy_handler = urllib.request.ProxyHandler({'http': 'http://www.example.com:3128/'})
proxy_auth_handler = urllib.request.ProxyBasicAuthHandler()
proxy_auth_handler.add_password('realm', 'host', 'username', 'password')
opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler)
with opener.open('http://www.example.com/login.html') as f:
pass
作业:利用urllib读取XML,将XML一节的数据由硬编码改为由urllib获取:
import re
from xml.parsers.expat import ParserCreate
from urllib import request
class DefaultSaxHandler(object):
def __init__(self):
self.today = 0
self.__data = {}
@property
def get_data(self):
return self.__data
def satrt_element(self,name,attrs):
if name == "yweather:location":
self.__data['city'] = attrs['city']
self.__data['country'] = attrs['country']
if name == "yweather:condition":
self.today = re.split(r'\s?', attrs['date'])[1]
if name == "yweather:forecast":
if re.split(r'\s?', attrs['date'])[0] == self.today:
self.__data['today'] = {}
self.__data['today']['text'] = attrs['text']
self.__data['today']['low'] = int(attrs['low'])
self.__data['today']['high'] = int(attrs['high'])
if int(re.split(r'\s?', attrs['date'])[0]) == int(self.today) + 1:
self.__data['tomorrow'] = {}
self.__data['tomorrow']['text'] = attrs['text']
self.__data['tomorrow']['low'] = int(attrs['low'])
self.__data['tomorrow']['high'] = int(attrs['high'])
def end_element(self,name):
pass
def char_data(self,text):
pass
class get_weather(object):
def __init__(self,city):
self._city = city
@staticmethod
def get_yahoo(city):
url = "https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20weather.forecast%20where%20woeid%20in%20(select%20woeid%20from%20geo.places(1)%20where%20text%3D%22" + city +"%22)&format=xml&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys"
with request.urlopen(url) as f:
data = f.read()
return data
def parse_weather(self):
handler = DefaultSaxHandler()
parser = ParserCreate()
parser.StartElementHandler = handler.satrt_element
parser.EndElementHandler = handler.end_element
parser.CharacterDataHandler = handler.char_data
parser.Parse(self.get_yahoo(self._city))
return str(handler.get_data)
weather = get_weather("shanghai").parse_weather()
print(weather)