UP | HOME

Python3 CookBook

Table of Contents

数据结构和算法

Python 提供了大量的内置数据结构,包括列表、集合和字典。这里将讨论使用这些数据结构遇到的查询、排序和过滤等问题,并在集合模块 collections 当中操作菏泽写数据结构的方法。

解压序列赋值给多个变量

如何将一个包含 N 个元素的元组或序列解压赋值给 N 个变量?

p = (4, 5)
x, y = p
print(x, y)

data = ['ACEM', 50, 91.1, (2012, 12, 21)]
name, shares, price, date = data
print(name, shares, price, date)

name, shares, price, (year, mon, day) = data
print(name, shares, price, year, mon, day)
4 5
ACEM 50 91.1 (2012, 12, 21)
ACEM 50 91.1 2012 12 21

这种解压赋值可以用在任何可迭代对象上面,而不仅仅是列表或元组。包括字符串,文件对象,迭代器和生成器。

a, b, c, d, e = 'Hello'
print(a, b, c, d, e)
a, b, c, d = range(4)
print(a, b, c, d)
H e l l o
0 1 2 3

如果只需要部分值,可用任意不被用到的变量名去占位。

解压可迭代对象赋值给多个变量

如果一个可迭代对象的元素个数超过变量个数,会抛出一个 ValueError。则如何从这个可迭代对象中解压出 N 个元素呢? python 的星号表达式可以解决这个问题。

a, b, *c = [1, 2, 3, 4, 5]
print(a, b, c)

a, b, *c = range(10)
print(a, b, c)

a, *b, c = range(10)
print(a, b, c)

*a, b, c = range(10)
print(a, b, c, end='')
1 2 [3, 4, 5]
0 1 [2, 3, 4, 5, 6, 7, 8, 9]
0 [1, 2, 3, 4, 5, 6, 7, 8] 9
[0, 1, 2, 3, 4, 5, 6, 7] 8 9

注意:星号对应的变量永远是列表类型。扩展的迭代解压语法是专门为解压不确定个数或任意个数元素的可迭代对象而设 concat 计的。通常这些可迭代对象的元素结构有确定的规则,特别是,星号表达式在迭代元素为可变长元组的序列时是很有用的。

records = [('foo', 1, 2), ('bar', 'hello'), ('foo', 3, 4)]


def do_foo(x, y):
    print('foo', x, y)


def do_bar(s):
    print('bar', s)


for tag, *args in records:
    if tag == 'foo':
        do_foo(*args)
    elif tag == 'bar':
        do_bar(*args)
foo 1 2
bar hello
foo 3 4

星号解压语法在字符串操作的时候也会很有用,比如分割字符串:

line = 'nobody:*:-2:-2:Unprivileged User:/var/empty:/usr/bin/false'
uname, *fields, homedir, sh = line.split(':')
print(uname, fields, homedir, sh)
nobody ['*', '-2', '-2', 'Unprivileged User'] /var/empty /usr/bin/false

正克隆到 '/tmp/yaourt-tmp-qiang/aur-redi+RESULTS:

nobody ['*', '-2', '-2', 'Unprivileged User'] /var/empty /usr/bin/false

保留组后 N 个元素

在迭代操作或者其他操作时,怎样只保留最后有限几个元素的历史记录?保留有限历史记录正是 collections.deque 大显身手的时候。

from collections import deque


file_lines = '''Python is powerful... and fast;
plays well with others;
runs everywhere;
is friendly & easy to learn;
is Open.
These are some of the reasons people who use Python would rather not use anything else.

Python can be easy to pick up whether you're a first time programmer
or you're experienced with other languages.
The following pages are a useful first step to get on your way
writing programs with Python!

The community hosts conferences and meetups, collaborates on code,
and much more. Python's documentation will help you along the way,
and the mailing lists will keep you in touch.

Conferences and Workshops
Python Documentation
Mailing Lists and IRC channels'''


def search(lines, pattern, history=5):
    previous_lines = deque(maxlen=history)
    for line in lines:
        if pattern in line:
            yield line, previous_lines
        previous_lines.append(line)


for line, prevlines in search(file_lines.split('\n'), 'Python', 5):
    for pline in prevlines:
        print(pline)
    print(line)
    print('-' * 15)
Python is powerful... and fast;
---------------
Python is powerful... and fast;
plays well with others;
runs everywhere;
is friendly & easy to learn;
is Open.
These are some of the reasons people who use Python would rather not use anything else.
---------------
runs everywhere;
is friendly & easy to learn;
is Open.
These are some of the reasons people who use Python would rather not use anything else.

Python can be easy to pick up whether you're a first time programmer
---------------
These are some of the reasons people who use Python would rather not use anything else.

Python can be easy to pick up whether you're a first time programmer
or you're experienced with other languages.
The following pages are a useful first step to get on your way
writing programs with Python!
---------------
or you're experienced with other languages.
The following pages are a useful first step to get on your way
writing programs with Python!

The community hosts conferences and meetups, collaborates on code,
and much more. Python's documentation will help you along the way,
---------------
The community hosts conferences and meetups, collaborates on code,
and much more. Python's documentation will help you along the way,
and the mailing lists will keep you in touch.

Conferences and Workshops
Python Documentation
---------------

其中 collections.deque 是一个双端队列. 我们在写查询元素的代码时,通常会使用包含 yield 表达式的生成器函数,也就是我们上面代码示例。这样可以将搜索过程和使用搜索代码解耦。使用 deque(maxlen=N)构造函数会新建一个固定大小的队列,当新的元素加入并且这个队列已满的时候,最老的元素会自动被移除。

from collections import deque
q = deque(maxlen=3)
q.append(1)
q.append(2)
q.append(3)
print(q)
q.append(4)
print(q)
q.append(5)
print(q, end='')
deque([1, 2, 3], maxlen=3)
deque([2, 3, 4], maxlen=3)
deque([3, 4, 5], maxlen=3)

使用 deque 队列方案会更优雅且运行更快。一般 deque 类可以被用在任何你只需要一个简单队列数据结构的场合。如果你不设置最大队列大小,那么就会得到一个无线大小队列,你可以在队列的两端执行添加和弹出元素的操作,其具体包含那么方法如下:

from collections import deque

q = deque(maxlen=20)
# 尾部添加数据
q.append(1)
q.append(2)
print(q)
# 头部添加数据
q.appendleft(3)
print(q)
# 尾部扩展可迭代对象
q.extend([4, 5])
print(q)
q.extend(range(3))
print(q)
# 头部扩展可迭代对象
q.extendleft([6, 7])
print(q)
# 尾部移除
q.pop()
print(q)
# 头部移除
q.popleft()
print(q)
# 统计元素出现的个数
print(q.count(1))
# index 检索
print(q.index(2))
# 指定位置插入
q.insert(13, 56)
print(q, end='')
# clear 清空队列
# copy 复制队列
# remove 移除队列中指定值
# reverse 翻转队列
# rotate 旋转队列
deque([1, 2], maxlen=20)
deque([3, 1, 2], maxlen=20)
deque([3, 1, 2, 4, 5], maxlen=20)
deque([3, 1, 2, 4, 5, 0, 1, 2], maxlen=20)
deque([7, 6, 3, 1, 2, 4, 5, 0, 1, 2], maxlen=20)
deque([7, 6, 3, 1, 2, 4, 5, 0, 1], maxlen=20)
deque([6, 3, 1, 2, 4, 5, 0, 1], maxlen=20)
2
3
deque([6, 3, 1, 2, 4, 5, 0, 1, 56], maxlen=20)

查找最大或最小的 N 个元素

如何从一个集合中获得最大或最小 N 个元素列表? heapq 模块有两个函数:nlargest()和 nsmallest()可以完美解决这两个问题。

import heapq

nums = [1, 8, 2, 23, 7, -4, 18, 23, 42, 37, 2]
print(heapq.nlargest(3, nums))
print(heapq.nsmallest(3, nums))
[42, 37, 23]
[-4, 1, 2]

两个函数都能接受一个关键字参数,用于复杂的数据结构中:

import heapq
from pprint import pprint

portfolio = [{
    'name': 'IBM',
    'shares': 100,
    'price': 91.1
}, {
    'name': 'AAPL',
    'shares': 50,
    'price': 543.22
}, {
    'name': 'FB',
    'shares': 200,
    'price': 21.09
}, {
    'name': 'HPQ',
    'shares': 35,
    'price': 31.75
}, {
    'name': 'YHOO',
    'shares': 45,
    'price': 16.35
}, {
    'name': 'ACME',
    'shares': 75,
    'price': 115.65
}]

cheap = heapq.nsmallest(3, portfolio, key=lambda x: x['price'])
expensive = heapq.nlargest(3, portfolio, key=lambda x: x['price'])
pprint(cheap)
pprint(expensive)
[{'name': 'YHOO', 'price': 16.35, 'shares': 45},
 {'name': 'FB', 'price': 21.09, 'shares': 200},
 {'name': 'HPQ', 'price': 31.75, 'shares': 35}]
[{'name': 'AAPL', 'price': 543.22, 'shares': 50},
 {'name': 'ACME', 'price': 115.65, 'shares': 75},
 {'name': 'IBM', 'price': 91.1, 'shares': 100}]

key 的排序可以参考 sorted 函数的 key,都是可以采用匿名函数进行复杂排序的。

如果你想在一个集合中查找最小或最大 N 个元素,并且 N 小于集合元素数量,那么这些函数提供了很好的性能。因为在底层实现里面,首先会先将集合数据进行堆排序后放入一个列表中:

import heapq

nums = [1, 8, 2, 23, 7, -4, 18, 23, 42, 37, 2]
heapq.heapify(nums)
print(nums)
print(heapq.heappop(nums))
print(heapq.heappop(nums))
uprint(heapq.heappop(nums))

堆数据结构最重要的特征是 heap[0]永远是最小的元素。并且剩余的元素都可以很容易通过调用 heapq.heappop()方法得到,该方法会先将地一个元素弹出,然后用下一个最小元素来取代被弹出的元素(这种操作时间复杂度仅仅是 O(logN),N 是堆大小)。堆数据结构的实现是一个很有趣并且值得深入学习的东西,基本上只要是数据结构和算法书籍里都会提到。heapq 模块官方提供了详细的堆数据结构底层实现细节,可以参考研究一下。对于提高 python 是很有帮助的。

实现一个优先级队列

怎样实现一个按优先级排序的队列?并且在这个队列上每次 pop 操作总是返回优先级最高的那个元素?一下是利用 heapq 模块实现了一个简单的优先级队列:

import heapq


class PriorityQueue(object):
    def __init__(self):
        self._queue = []
        self._index = 0

    def push(self, item, priority):
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1

    def pop(self):
        return heapq.heappop(self._queue)[-1]


class Item(object):
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return 'Item({!r})'.format(self.name)


q = PriorityQueue()
q.push(Item('foo'), 1)
q.push(Item('bar'), 5)
q.push(Item('spam'), 4)
q.push(Item('grok'), 1)
for _ in range(4):
    print(q.pop())
Item('bar')
Item('spam')
Item('foo')
Item('grok')

第一个 pop()操作返回优先级最高的元素。另外注意到如果两个有着相同优先级的元素(foo 和 grok),pop 操作按照它们传入到队列的顺序返回。这一节主要关注 heapq 模块。函数 heapq.heappush()和 heapq.heappop()分别在队列_queue 上插入和删除第一个元素,并且队列_queue 保证第一个元素拥有最高优先级。heappop()总是返回最小元素,这就保证队列 pop 操作返回正确元素的关键。由于 push 和 pop 操作时间复杂度为 O(logN),其中 N 是堆的大小,因此就算是 N 很大的时候它们运行速度也依旧很快。在上面代码中,队列包含了一个(-priority, index, item)的元组。优先级为负数的目的是使得元素按照优先级从高到底排序。 index 变量的作用是保证同等优先级元素的正确顺序。而且,index 变量也在相同优先级元素比较的时候起到重要作用。如果想在多线程中使用同一个队列,那么需要增加适当的锁和信号量机制。

字典中的键映射多个值

怎样实现一个键对应多个值的字典(multidict)?一个字典就是一个键对应一个单值的映射。如果想要一个键映射多个值,就需要将多个值放入容器中。可以使用 collection 模块的 defaultdict 来构建这样的字典。defaultdict 的一个特征是会自动初始化每个 key 刚开始对应的值。

from collections import defaultdict
from pprint import pprint

d = defaultdict(list)
d['a'].append(1)
d['a'].append(2)
d['b'].append(4)
pprint(dict(d))

d = defaultdict(set)
d['a'].add(1)
d['a'].add(2)
d['b'].add(4)
pprint(dict(d))

d = defaultdict(dict)
d['a1']['a2'] = [1, 2]
d['a1']['a3'] = [2, 3]
d['b'] = (1, 2)
pprint(dict(d))
{'a': [1, 2], 'b': [4]}
{'a': {1, 2}, 'b': {4}}
{'a1': {'a2': [1, 2], 'a3': [2, 3]}, 'b': (1, 2)}

想对于 dict 自带的 setdefault()方便很多。

字典排序

创建一个字典并且在迭代或序列化这个字典的时候能够控制元素的顺序。为了能控制一个字典中元素的顺序,可以使用 collections 模块中 OrderedDict 类。在迭代操作的时候它会保持元素被插入时的顺序。

from collections import OrderedDict

d = OrderedDict()
d['foo'] = 1
d['bar'] = 2
d['spam'] = 3
d['grok'] = 4

for k, v in d.items():
    print(k, v)
foo 1
bar 2
spam 3
grok 4

OrderedDict 内部维护着一个根据键插入顺序的双向链表。每次当一个新的元素插入进来的时候,它会被放到链表的尾部。对于一个已经存在的键的重复赋值不会改变键的顺序。需要注意的是,一个 OrderedDict 的大小是一个普通字典的两倍,因为它内部维护着另外一个链表。所以如果需要构建大量 OrderedDict 实例的数据结构时就得考虑一下内存消耗的影响。

字典的运算

怎样在数据字典中执行一些计算操作(比如求最小值、最大值、排序等)?为了对字典值执行计算,通常需要使用 zip()函数先将键和值反转过来。

import operator

prices = {
    'ACME': 45.23,
    'AAPL': 612.78,
    'IBM': 205.55,
    'HPQ': 37.20,
    'FB': 10.75
}

max_price = max(zip(prices.values(), prices.keys()))
min_price = min(zip(prices.values(), prices.keys()))
print(max_price, min_price)

prices_sorted = sorted(zip(prices.values(), prices.keys()))
print(prices_sorted)

prices_sorted_v2 = sorted(prices.items(), key=lambda x: x[1])
print(prices_sorted_v2)

prices_sorted_v3 = sorted(prices.items(), key=operator.itemgetter(1))
print(prices_sorted_v3)
(612.78, 'AAPL') (10.75, 'FB')
[(10.75, 'FB'), (37.2, 'HPQ'), (45.23, 'ACME'), (205.55, 'IBM'), (612.78, 'AAPL')]
[('FB', 10.75), ('HPQ', 37.2), ('ACME', 45.23), ('IBM', 205.55), ('AAPL', 612.78)]
[('FB', 10.75), ('HPQ', 37.2), ('ACME', 45.23), ('IBM', 205.55), ('AAPL', 612.78)]

上述共有四种方法实现了对字典的排序,推荐使用最后两种。

查找两个字典的相同点

怎样在两个字典中寻找相同点,比如相同的键,相同的值等等?

a = {'x': 1, 'y': 2, 'z': 3}
b = {'w': 10, 'x': 11, 'y': 2}

# find keys in common
print(a.keys() & b.keys())
# find keys in a that are not in b
print(a.keys() - b.keys())
# find (key, value) pairs in common
print(a.items() & b.items())

# create new dict filter from a or b
print({key:a[key] for key in a.keys() - {'z', 'w'}})
{'x', 'y'}
{'z'}
{('y', 2)}
{'x': 1, 'y': 2}

删除序列相同元素并保持顺序

怎样在一个序列上面保持元素顺序的同时消除重复的值?如果序列上的值都是 hashable 类型,那么可以很简单的利用集合或者生成器来解决这个问题。

def dedupe(items):
    seen = set()
    for item in items:
        if item not in seen:
            yield item
            seen.add(item)

a=  [1, 5, 2, 1, 9, 5, 10]
print(list(dedupe(a)))
[1, 5, 2, 9, 10]

这个方法仅仅在序列中元素为 hashable 的时候才管用。如果你想消除元素不可哈希(比如 dict 类型)的序列中重复元素,需要将尚需代码修改:

def dedupe(items, key=None):
    seen = set()
    for item in items:
        val = item if key is None else key(item)
        if val not in seen:
            yield item
            seen.add(val)


a = [{'x': 1, 'y': 2}, {'x': 1, 'y': 3}, {'x': 1, 'y': 2}, {'x': 2, 'y': 4}]
print(list(dedupe(a, key=lambda d: (d['x'], d['y']))))

print(list(dedupe(a, key=lambda d: d['x'])))
[{'x': 1, 'y': 2}, {'x': 1, 'y': 3}, {'x': 2, 'y': 4}]
[{'x': 1, 'y': 2}, {'x': 2, 'y': 4}]

使用 set 也可以去重,但是会打乱顺序,上述的 key 参数模仿了 sorted,min,max 等函数。

命名切片

程序中出现一大堆已经无法直视的硬编码切片下表,该如何清理代码?假定有一段代码要从一个记录字符串中几个固定位置提取特定的数据字段:

record = '....................100 .......513.25 ..........'
cost = int(record[20:23]) * float(record[31:37])
print(cost)
SHARES = slice(20, 23)
PRICE = slice(31, 37)
cost = int(record[SHARES]) * float(record[PRICE])
print(cost)
51325.0
51325.0

第二个版本避免了无法理解的赢编码下标,使得代码清晰可读。

序列中出现次数最多的元素

怎样找出一个序列中出现次数最多的元素? collections.Counter 类专门为这类问题而设计的,它甚至有一个有用的 most_common()方法直接给了答案。

from collections import Counter

words = [
    'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes', 'the', 'eyes',
    'the', 'eyes', 'the', 'eyes', 'not', 'around', 'the', 'eyes', "don't",
    'look', 'around', 'the', 'eyes', 'look', 'into', 'my', 'eyes', "you're",
    'under'
]

word_counts = Counter(words)
top_three = word_counts.most_common(3)
print(top_three)
[('eyes', 8), ('the', 5), ('look', 4)]

Counter 类的注释写着:Dict subclass for counting hashable items. Elements are stored as dictionary keys and their counts are stored as dictionary values.

from collections import Counter

# count elements from a string, create Counter instance from string.
c = Counter('abcdeabcdabcaba')
print(c)
# three most common elements
print(c.most_common(3))
# list all unique elements
print(sorted(c))
# list elements with repetitions
print(''.join(sorted(c.elements())))
# total of all counts
print(sum(c.values()))


print(c['a'])
# update counts from an iterable
for elem in 'shazam':
    c[elem] += 1
print(c['a'])

# remove all 'b'
del c['b']
print(c['b'])
Counter({'a': 5, 'b': 4, 'c': 3, 'd': 2, 'e': 1})
[('a', 5), ('b', 4), ('c', 3)]
['a', 'b', 'c', 'd', 'e']
aaaaabbbbcccdde
15
5
7
0

Counter instance 可以很容易跟数学运算操作相结合。

from collections  import Counter
words = [
    'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes',
    'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around', 'the',
    'eyes', "don't", 'look', 'around', 'the', 'eyes', 'look', 'into',
    'my', 'eyes', "you're", 'under'
]
morewords = ['why','are','you','not','looking','in','my','eyes']

a = Counter(words)
b = Counter(morewords)
print(a)
print(b)
print(a + b)
print(a - b)
Counter({'eyes': 8, 'the': 5, 'look': 4, 'into': 3, 'my': 3, 'around': 2, 'not': 1, "don't": 1, "you're": 1, 'under': 1})
Counter({'why': 1, 'are': 1, 'you': 1, 'not': 1, 'looking': 1, 'in': 1, 'my': 1, 'eyes': 1})
Counter({'eyes': 9, 'the': 5, 'look': 4, 'my': 4, 'into': 3, 'not': 2, 'around': 2, "don't": 1, "you're": 1, 'under': 1, 'why': 1, 'are': 1, 'you': 1, 'looking': 1, 'in': 1})
Counter({'eyes': 7, 'the': 5, 'look': 4, 'into': 3, 'my': 2, 'around': 2, "don't": 1, "you're": 1, 'under': 1})

Counter 对象在几乎所有需要制表或者计数数据的场合是非常有用的工具。

通过某个关键字排序一个字典列表

根据某个或几个字典字段来排序这个列表?通过使用 operator 模块的 itemgetter 函数,可以非常容易的排序这样的数据结构。

from operator import itemgetter
from pprint import pprint

rows = [
    {'fname': 'Brian', 'lname': 'Jones', 'uid': 1003},
    {'fname': 'David', 'lname': 'Beazley', 'uid': 1002},
    {'fname': 'John', 'lname': 'Cleese', 'uid': 1001},
    {'fname': 'Big', 'lname': 'Jones', 'uid': 1004}
]

rows_by_fname = sorted(rows, key=itemgetter('fname'))
rows_by_uid = sorted(rows, key=itemgetter('uid'))
pprint(rows_by_fname)
pprint(rows_by_uid)

# itemgetter support multiple keys
rows_by_lfname = sorted(rows, key=itemgetter('lname', 'fname'))
pprint(rows_by_lfname)
[{'fname': 'Big', 'lname': 'Jones', 'uid': 1004},
 {'fname': 'Brian', 'lname': 'Jones', 'uid': 1003},
 {'fname': 'David', 'lname': 'Beazley', 'uid': 1002},
 {'fname': 'John', 'lname': 'Cleese', 'uid': 1001}]
[{'fname': 'John', 'lname': 'Cleese', 'uid': 1001},
 {'fname': 'David', 'lname': 'Beazley', 'uid': 1002},
 {'fname': 'Brian', 'lname': 'Jones', 'uid': 1003},
 {'fname': 'Big', 'lname': 'Jones', 'uid': 1004}]
[{'fname': 'David', 'lname': 'Beazley', 'uid': 1002},
 {'fname': 'John', 'lname': 'Cleese', 'uid': 1001},
 {'fname': 'Big', 'lname': 'Jones', 'uid': 1004},
 {'fname': 'Brian', 'lname': 'Jones', 'uid': 1003}]

上例中,rows 被传递给接受一个关键字参数(key)的 sorted 内置函数,key 参数是一个 callable 类型,并且从 rows 中接受一个单一元素,然后返回被用来排序的值。 itemgetter()函数就是负责创建这个 callable 对象的。 operator.itemgetter 函数有一个被 rows 中的记录用来查找值的索引参数。可以是一个字典键名称,一个整型值或者任何能够传入一个对象的__getitem__()方法的值。如果你传入多个索引参数给 itemgetter(),它生成的 callable 对象会返回一个包含所有元素值的元组,并且,sorted()函数会根据这个元组中元素顺序去排序。但你想要同时在几个字段上面进行排序这种方法就很有用。当然 itemgetter()有时也可以用 lambda 表达式代替。

排序不支持原生比较的对象

如何排序类型相同的对象,但它们不支持原生的比较操作? 内置函数 sorted()有一个关键字参数 key,可以传入一个 callable 对象给它,这个 callable 对象对每个传入的对象返回一个值,这个值会被 sorted 用来排序这些对象

from operator import attrgetter


class User:
    def __init__(self, user_id):
        self.user_id = user_id

    def __repr__(self):
        return 'User({})'.format(self.user_id)


users = [User(23), User(3), User(99)]
print(users)
print(sorted(users, key=lambda u: u.user_id))

# 通过 operator.attrgetter()来代替 lambda
print(sorted(users, key=attrgetter('user_id')))
[User(23), User(3), User(99)]
[User(3), User(23), User(99)]
[User(3), User(23), User(99)]

选择使用 lambda 还是 attrgetter()都可以,但是 attrgetter()通常会快点,并且还能同时允许多个字段进行比较。这个和 itemgetter()函数功能类似。而且这个 attrgetter()不仅适用于 sorted 排序,对 min、max 之类的函数都适用。

通过某个字段将记录分组

你有一个字典或实例的序列,然后想根据某个特定的字段比如 date 来分组迭代访问。 itertools.groupby()函数对于这样的数据分组操作非常实用。

from itertools import groupby
from operator import itemgetter

rows = [
    {'address': '5412 N CLARK', 'date': '07/01/2012'},
    {'address': '5148 N CLARK', 'date': '07/04/2012'},
    {'address': '5800 E 58TH', 'date': '07/02/2012'},
    {'address': '2122 N CLARK', 'date': '07/03/2012'},
    {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'},
    {'address': '1060 W ADDISON', 'date': '07/02/2012'},
    {'address': '4801 N BROADWAY', 'date': '07/01/2012'},
    {'address': '1039 W GRANVILLE', 'date': '07/04/2012'},
]

rows.sort(key=itemgetter('date'))
for date, items in groupby(rows, key=itemgetter('date')):
    print(date)
    for i in items:
        print(' ', i)
07/01/2012
  {'address': '5412 N CLARK', 'date': '07/01/2012'}
  {'address': '4801 N BROADWAY', 'date': '07/01/2012'}
07/02/2012
  {'address': '5800 E 58TH', 'date': '07/02/2012'}
  {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'}
  {'address': '1060 W ADDISON', 'date': '07/02/2012'}
07/03/2012
  {'address': '2122 N CLARK', 'date': '07/03/2012'}
07/04/2012
  {'address': '5148 N CLARK', 'date': '07/04/2012'}
  {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}

groupby()函数扫描整个序列并且连续相同值(或者根据指定 key 函数返回值相同)的元素序列。在每次迭代的时候,它会返回一个值和一个迭代对象,这个迭代对象可以生成元素值全部等于上面那个值中所有对象。一个非常重要的步骤是要根据指定的字段将数据排序。因为 groupby()仅仅检查连续的元素,如果事先并没有排序完成的话,分组函数将得不到想要的数据。如果仅仅只想根据 date 字段将数据分组到一个大的数据结构中,并且允许随机访问,最好用 collections.defaultdict() 来构建一个多值字典。

from collections import defaultdict
from pprint import pprint

rows = [
    {'address': '5412 N CLARK', 'date': '07/01/2012'},
    {'address': '5148 N CLARK', 'date': '07/04/2012'},
    {'address': '5800 E 58TH', 'date': '07/02/2012'},
    {'address': '2122 N CLARK', 'date': '07/03/2012'},
    {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'},
    {'address': '1060 W ADDISON', 'date': '07/02/2012'},
    {'address': '4801 N BROADWAY', 'date': '07/01/2012'},
    {'address': '1039 W GRANVILLE', 'date': '07/04/2012'},
]

rows_by_date = defaultdict(list)
for row in rows:
    rows_by_date[row['date']].append(row)

pprint(rows_by_date)
defaultdict(<class 'list'>,
            {'07/01/2012': [{'address': '5412 N CLARK', 'date': '07/01/2012'},
                            {'address': '4801 N BROADWAY',
                             'date': '07/01/2012'}],
             '07/02/2012': [{'address': '5800 E 58TH', 'date': '07/02/2012'},
                            {'address': '5645 N RAVENSWOOD',
                             'date': '07/02/2012'},
                            {'address': '1060 W ADDISON',
                             'date': '07/02/2012'}],
             '07/03/2012': [{'address': '2122 N CLARK', 'date': '07/03/2012'}],
             '07/04/2012': [{'address': '5148 N CLARK', 'date': '07/04/2012'},
                            {'address': '1039 W GRANVILLE',
                             'date': '07/04/2012'}]})

如果不考虑内存占用情况,这用方法比 groupby()函数迭代运行更快。

过滤序列元素

有一个数据序列,想利用一些规则从中提取出需要的值或者是缩短序列?最简单的过滤列表元素的方法就是适用列表推导式。

mylist = [1, 4, -5, 10, -7, 2, 3, -1]
print([n for n in mylist if n > 0])

pos = (n for n in mylist if n > 0)
print(pos)
[1, 4, 10, 2, 3]
<generator object <genexpr> at 0x7fd8fcbcfc50>

适用列表推导式的一个潜在缺陷是如果输入非常大的时候会产生一个非常大的结果集,占用大量内存。如果对内存有要求,可以适用生成器表达式来过滤元素。就如上例中第二中方法。有时候过滤规则比较复杂,不能简单的在列表推导或者生成器表达式中表达出来。比如,在过滤时需要处理一些异常或者其复杂情况,这时需要将过滤代码放入一个函数,然后调用内置函数 filter()。

values = ['1', '2', '-3', '-', '4', 'N/A', '5']


def is_int(val):
    try:
        int(val)
        return True
    except ValueError:
        return False


ivals = list(filter(is_int, values))
print(ivals)
['1', '2', '-3', '4', '5']

列表推导和生成器表达式通常情况下是过滤数据最简单的方式,当然在过滤数据时也可以转换数据。过滤操作的一个变种就是将不符合条件的值用新的值来代替,而不是丢弃它们。

mylist = [1, 4, -5, 10, -7, 2, 3, -1]
clip_neg = [n if n > 0 else 0 for n in mylist]
print(clip_neg)
clip_pos = [n if n < 0 else 0 for n in mylist]
print(clip_pos)
[1, 4, 0, 10, 0, 2, 3, 0]
[0, 0, -5, 0, -7, 0, 0, -1]

从字典中提取子集

你想构造一个字典,它是另外一个字典的子集?最简单的方式是适用字典推导式。

prices = {
    'ACME': 45.23,
    'AAPL': 612.78,
    'IBM': 205.55,
    'HPQ': 37.20,
    'FB': 10.75
}
p1 = {key: value for key, value in prices.items() if value > 200}
print(p1)
tech_names = {'AAPL', 'IBM', 'HPQ', 'MSFT'}
p2 = {key: value for key, value in prices.items() if key in tech_names}
print(p2)
{'AAPL': 612.78, 'IBM': 205.55}
{'AAPL': 612.78, 'IBM': 205.55, 'HPQ': 37.2}

大多数情况下字典推导式能做到的,通过创建一个元组序列然后把它传给 dict()函数也能实现,但是,字典推导式更加清晰,且实际运行数度更快。

映射名称到序列元素

你有一段通过下表访问列表或者元组中元素代码,但是这样有时候会使得代码难以阅读,于是想通过名称来访问元素。 collections.namedtuple()函数通过使用一个普通的元组对象来帮你解决这个问题。这个函数实际上是一个返回 python 中标准元组类型子类的一个工厂方法。你需要传递一个类型名称和你需要的字段给它,然后它就会返回一个类,你可以初始化这个类,为你定义的字段传递值等。

from collections import namedtuple

subscriber = namedtuple('Subscriber', ['addr', 'joined'])
sub = subscriber('joesy@example.com', '2012-10-19')
print(sub)
print(sub.addr)
print(sub.joined)
Subscriber(addr='joesy@example.com', joined='2012-10-19')
joesy@example.com
2012-10-19

尽管 namedtuple 的实例看起来像一个普通的类实例,但是它跟元素类型是可交换的,支持所有的普通元组操作,比如索引和解压。命名元组的一个主要用途是将你的代码从下标操作中解脱出来。因此,如果你从数据库调用中返回了一个很大的元组列表,通过下表区操作其中的元素,当你添加了新的列的时候你的代码可能就会出错,但是如果你使用了命名元组,那么就不会有这样的顾虑。下标操作通常会让代码不清晰,并且非常依赖记录的结构。

from collections import namedtuple

data = [('apple', 12, 3), ('iphone', 10, 4), ('mac', 11, 2), ('imac', 9, 3)]
Stock = namedtuple('Stock', ['name', 'shares', 'price'])


def compute_cost(records):
    total = 0.0
    for rec in records:
        s = Stock(*rec)
        total += s.shares * s.price
    return total

print(compute_cost(data))
125.0

命名元组的另一个用途是作为字典的替代,因为字典存储需要更多的内容空间。如果你需要构建一个非常大的包含字典的数据结构,那么使用命名元组会更加高效。但需要注意,不像字典那样,一个命名元组是不可更改的。如果需要改变属性的值,那么可以使用命名元组实例的_replace()方法,它会创建一个全新的命名元组并将对应的字段用新的值取代。 _replace()方法还有一个很有用的特性就是当你的命名元组拥有可选或者缺失字段时候,它是一个非常方便的填充数据的方法。

from collections import namedtuple

Stock = namedtuple('Stock', ['name', 'shares', 'price', 'date', 'time'])

stock_prototype = Stock('', 0, 0.0, None, None)


def dict_to_stock(s):
    return stock_prototype._replace(**s)


a = {'name': 'ACME', 'shares': 100, 'price': 123.45}
print(dict_to_stock(a))
b = {'name': 'ACME', 'shares': 100, 'price': 123.45, 'date': '12/17/2012'}
print(dict_to_stock(b))
Stock(name='ACME', shares=100, price=123.45, date=None, time=None)
Stock(name='ACME', shares=100, price=123.45, date='12/17/2012', time=None)

如果你的目标是定义一个需要更多实例属性的高效数据结构,那么命名元组并不是你的最佳选择,这时候你应该考虑定一个包含__slots__方法的类。

转换并同时计算数据

你需要在数据序列上执行聚集函数(比如 sum(),min(),max()),但是首先你需要先转换或者过滤数据。一个非常优雅的方式去结合数据计算与转换就是使用一个生成器表达式。

import os

nums = range(1, 6)
s = sum(x * x for x in nums)
print(s)

files = os.listdir('..')
if any(name.endswith('.org') for name in files):
    print('There be org file!')
else:
    print('Sorry, no org.')

s = ('ACME', 50, 123.45)
print(','.join(str(x) for x in s))

portfolio = [
    {'name': 'GOOG', 'shares': 50},
    {'name': 'YHOO', 'shares': 75},
    {'name': 'AOL', 'shares': 20},
    {'name': 'SCOX', 'shares': 65}
]
min_shares = min(s['shares'] for s in portfolio)
print(min_shares)
55
Sorry, no org.
ACME,50,123.45
20

上面的示例演示了当生成器表达式作为一个单独的参数传递给函数时的巧妙语法。使用一个生成器表达式作为参数会比先创建一个零时列表更加高效和优雅。

合并多个字典或映射

现在有多个字典或者映射,你想将它们从逻辑上合并为一个单一的映射后执行某些操作,比如查找值或者检查键是否存在。

from collections import ChainMap

a = {'x': 1, 'z': 3}
b = {'y': 2, 'z': 4}

c = ChainMap(a, b)
print(list(c.items()))
[('z', 3), ('y', 2), ('x', 1)]

一个 ChainMap 接受多个字典并将它们在逻辑上变为一个字典。然后,这些字典并不是真的合并在一起,ChainMap 类只是在内部创建了一个容纳这些字典的列表并重新定义了一些常见的字典操作类遍历这个列表。如果出现重复出现的键,那么第一次出现的映射值会被返回。因此,例子程序中的 c['z']总是会返回字典 a 中的值。对于字典的更新或者删除操作总是影响列表中第一个字典。 ChainMap 对于编程语言中的作用范围变量(比如 globals,locals 等)是非常有用的。

from collections import ChainMap
values = ChainMap()
values['x'] = 1

values = values.new_child()
values['x'] = 2

values = values.new_child()
values['x'] = 3
print(values['x'])
print(values)

values = values.parents
print(values['x'])
print(values)

values = values.parents
print(values['x'])
print(values)

3
ChainMap({'x': 3}, {'x': 2}, {'x': 1})
2
ChainMap({'x': 2}, {'x': 1})
1
ChainMap({'x': 1})

文本与 IO

使用多个界定符号分割字符

需要将一个字符串分割为多个字段,但是分隔符还有周围的空格并不是固定的。 string 对象的 split()方法只适应非常简单的分割情形,它并不允许有多个分割符号或者是分隔符周围不确定的空格。当你需要更加灵活的切割字符串的时候,最好使用 re.split()方法。

import re
line = 'asdf fjdk; afed, fjek,asdf, foo'
print(re.split('[;,\s]\s*', line))

fields = re.split('(;|,|\s)\s*', line)
print(fields)
['asdf', 'fjdk', 'afed', 'fjek', 'asdf', 'foo']
['asdf', ' ', 'fjdk', ';', 'afed', ',', 'fjek', ',', 'asdf', ',', 'foo']

字符串开头或结尾匹配

你需要通过指定的文本模式去检查字符串的开头或者结尾,比如文件名后缀,URL Scheme 等。检查字符串开头或结尾的一个简单方式是使用 str.startswith()或者 str.endswith()方法。如果检查多个匹配可能,可以使用列表推导式传入 any。

import os
from urllib.request import urlopen

filenames = os.listdir('.')
t = any(name.endswith('.org') for name in filenames)
print(t)


def read_data(name):
    if name.startswith(('http:', 'https:', 'ftp:')):
        return urlopen(name).read()
    else:
        with open(name) as f:
            return f.read()
True

数据编码和处理

函数

类与对象

元编程

在函数上添加包装器

如何在函数上添加一个包装器,增加额外的操作处理,比如日志,计时等?适用装饰器即可完成

import time
from functools import wraps


def timethis(func):
    """
    Decorator that reports the execution time.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(func.__name__, end - start)
        return result

    return wrapper


@timethis
def countdown(n):
    '''
    Counts down
    '''
    while n > 0:
        n -= 1


countdown(100000)
countdown 0.00429224967956543

一个装饰器就是一个函数,它接受一个函数作为参数并返回一个新的函数。内置装饰器@staticmethod,@classmethod,@property,@wraps 原理都是一样。在上面 wrapper()函数中,装饰器内部定义了一个使用*args 和**kwargs 来接受任意参数的函数。在这个函数里调用原始函数并将其结果返回,不过还可以添加其他额外的代码,这个新的函数包装器被作为结果返回来替代原始函数。需要强调的是装饰器并不会修改原始函数的参数签名以及返回值。使用*args 和**kwargs 目的就是确保任何参数都能适用,而返回结果值节本都是调用原始函数 func(*args, **kwargs)的返回结果,其中 func 就是原始函数。 @wraps(func)注释很重要,它能保留原始函数的元数据。

创建装饰器时保留函数元信息

写一个装饰器作用在函数上,但是这个函数的重要的元信息比如名字,文档字符串,注释和参数签名都丢失了。任何时候定义装饰器时都应该使用 functools 中的@wraps 装饰器来注解底层包装函数。

import time
from functools import wraps


def timethis(func):
    '''
    Decorator that reports the execution time.
    '''

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(func.__name__, end - start)
        return result

    return wrapper


@timethis
def countdown(n:int):
    '''
    Counts down
    '''
    while n > 0:
        n -= 1


countdown(100000)
print(countdown.__name__)
print(countdown.__doc__)
print(countdown.__annotations__)


countdown.__wrapped__(100000)

from inspect import signature
print(signature(countdown))
countdown 0.004256486892700195
countdown

    Counts down
    
{'n': <class 'int'>}
(n:int)

在编写装饰器的时候复制元信息时一个非常重要的部分。@wraps 的一个重要特征时通过属性__wrapped__直接访问被包装函数。 __wrapped__属性还能让被装饰函数正确暴露底层的参数签名信息。一个很普通的问题是怎样让装饰器去直接复制函数的参数签名信息,如果想自己手动实现需要大量工作,直接适用@wraps,通过底层的__wrapped__属性访问到函数签名信息。

解除一个装饰器

一个装饰器已经作用在一个函数上,如何撤销它,直接访问原始的未包装的那个函数。假设装饰器时通过@wraps 来实现的,那么可以通过访问__wrapped__属性来访问原始函数直接访问未包装的原始函数在调试、内省和其他函数操作时时很有用的。但是这里的方案仅仅适用与在包装器中正确适用了@wrapper 或者直接设置了__wrapped__属性的情况。如果有多个包装器,那么访问__wrapped__属性的行为时不可预知的,应该避免这样做。

from functools import wraps


def decorator1(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print('Decorator 1')
        return func(*args, **kwargs)

    return wrapper


def decorator2(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print('Decorator 2')
        return func(*args, **kwargs)

    return wrapper


@decorator1
@decorator2
def add(x, y):
    return x + y


print(add(2, 3))
print(add.__wrapped__(2, 3))
Decorator 1
Decorator 2
5
Decorator 2
5

可以看出在 python3.4 以上,适用__wrapped__直接调用函数时无效的,而且并不是所有的装饰器都适用了@wraps,特别的,内置的装饰器@staticmethod 和@classmethod 就没有遵循这个约定(它们把原始函数存储在属性__func__中)。

定义一个带参数的装饰器

如何定一个带参数的装饰器? 用一个例子来详细阐述接受参数的处理过程。假设想写一个装饰器给函数添加日志功能,同时允许用户指定日志级别和其他的选项。

import logging
from functools import wraps


def logged(level, name=None, message=None):
    """
    Add logging to a function, level is the logging
    level, name is the logger name, and message is the log
    message. If name and message aren't specified they default
    to the function's module and name.
    """

    def decorate(func):
        logname = name if name else func.__module__
        log = logging.getLogger(logname)
        logmsg = message if message else func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except TypeError:
                log.log(level, logmsg)

        return wrapper

    return decorate


@logged(logging.DEBUG)
def add(x, y):
    return x + y


@logged(logging.CRITICAL, 'example')
def spam():
    print('Spam!')

可自定义属性的装饰器

写一个装饰器来包装一个函数,并且允许用户提供参数在运行时控制装饰器行为?引入一个访问函数,适用 nonlocal 来修改内部变量,然后这个访问函数被作为一个属性赋值给包装函数。

import logging

from functools import partial, wraps


def attach_wrapper(obj, func=None):
    if func is None:
        print('---------')
        return partial(attach_wrapper, obj)
    setattr(obj, func.__name__, func)
    print("+++++++++++")
    return func


def logged(level, name=None, message=None):
    '''
    Add logging to a function. level is the logging
    level, name is the logger name, and message is the
    log messagel If name and message aren't specified,
    they default to the function's module and name.
    '''

    def decorate(func):
        logname = name if name else func.__module__
        log = logging.getLogger(logname)
        logmsg = message if message else func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            log.log(level, logmsg)
            return func(*args, **kwargs)

        @attach_wrapper(wrapper)
        def set_level(newlevel):
            nonlocal level
            level = newlevel

        @attach_wrapper(wrapper)
        def set_message(newmsg):
            nonlocal logmsg
            logmsg = newmsg

        return wrapper

    return decorate


@logged(logging.DEBUG)
def add(x, y):
    return x + y


@logged(logging.CRITICAL, 'example')
def spam():
    print('Spam')


logging.basicConfig(level=logging.DEBUG)
print(add(2, 3))

add.set_message('Add called')
print(add(2, 3))

add.set_level(logging.WARNING)
print(add(2, 3))
---------
+++++++++++
---------
+++++++++++
---------
+++++++++++
---------
+++++++++++
5
5
5

模块与包

网络与 web 编程

并发编程

脚本编程与系统管理

测试、调试和异常

C 语言扩展

Created: 2018-01-19 五 14:03

Validate