0%

python学习笔记

python学习笔记

[toc]

推导式

推导式是从一个或者多个迭代器快速简洁地创建数据结构的一种方法。它可以将循环和条件判断结合,从而避免语法冗长的代码。

列表推导式
1
2
3
# Odd numbers from 1 through 5, built with a filtering list comprehension.
number_list = [n for n in range(1, 6) if n % 2 == 1]
print(number_list)
[1,3,5]
字典推导式
1
2
3
4
word = 'letters'
# Map every distinct character of ``word`` to its number of occurrences.
letter_counts = {ch: word.count(ch) for ch in set(word)}
print(letter_counts)
{'t': 2, 'l': 1, 'e': 2, 'r': 1, 's': 1}
集合推导式
1
2
3
# Set comprehension: values in 1..5 whose remainder modulo 3 is 1.
a_set = {n for n in range(1, 6) if n % 3 == 1}
print(a_set)
{1, 4}
生成器函数

生成器函数:函数体含有yield关键字,生成器就是迭代器

1

装饰器

装饰器的功能是在不修改被装饰对象源代码以及调用方式的前提下,为其添加新功能。

  • res是一个用户密码认证的装饰器,index是要运行的函数;用装饰器扩展功能后,运行index时会先进行用户认证。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    import ast
    import functools
    import random
    import sys
    import time

    # Demo credential table: user name -> plain-text password (example only).
    user_dit = {
        'user01': 'python01',
        'user02': 'python02',
        'user03': 'python03',
    }

    # Login state shared by every function decorated with ``res``.
    login_status = {
        'user': None,
        'login': False,
    }

    db_file = 'auth_info.txt'

    # Mode 'w' already empties the file, so no explicit seek()/truncate()
    # is required before writing.
    with open(db_file, 'w', encoding='utf-8') as f:
        f.write(str(user_dit))


    def res(app):
        """Decorate ``app`` so that it only runs for an authenticated user.

        The wrapper passes the logged-in user name as the first positional
        argument of ``app``, followed by the caller's own arguments, and
        returns whatever ``app`` returns.  When nobody is logged in yet the
        user is prompted for credentials, which are checked against the dict
        stored in ``db_file``; on failure the process exits via ``sys.exit()``.
        """
        @functools.wraps(app)
        def auth(*args, **kwargs):
            if login_status['user'] and login_status['login']:
                return app(login_status['user'], *args, **kwargs)

            user_name = input('user name: ')
            user_passwd = input('user passwd: ')
            with open(db_file, 'r', encoding='utf-8') as f:
                # literal_eval only accepts Python literals, so the file
                # content cannot execute arbitrary code the way eval() could.
                f_out = ast.literal_eval(f.read())

            if user_name in f_out and user_passwd == f_out[user_name]:
                print('login seccess')
                login_status['user'] = user_name
                login_status['login'] = True
                return app(user_name, *args, **kwargs)

            print('username or passwd error')
            sys.exit()
        return auth


    @res  # index = res(index)
    def index(user_name, **kwargs):
        """Simulate a slow page render, then greet ``user_name``."""
        time.sleep(random.randrange(1, 5))
        print('welecome to %s index page!' % user_name)


    index()

    index()  # after the first successful login no password prompt is shown

迭代器

1
2
# Materialise the integers 0..5 produced by range() into a list.
res = list(range(6))
print(res)

生成器

函数体内包含yield关键字,运行时遇到yield暂停完成一次迭代,生成器就是迭代器。

生成器模拟linux命令

tail -f filename | grep word

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def tail(file_name):
    """Yield lines appended to ``file_name``, like ``tail -f``.

    Reading starts at the current end of the file, so only lines written
    after the call are produced.  The generator never terminates; it polls
    every 0.5 seconds while no new line is available.
    """
    with open(file_name, encoding='utf-8') as f:
        f.seek(0, 2)  # whence=2: position at end of file
        while True:
            line = f.readline()
            if line:
                yield line
            else:
                time.sleep(0.5)


def grep(lines, word):
    """Print every item of ``lines`` that contains ``word``, like ``grep``."""
    for line in lines:
        if word in line:
            # Lines read from a file keep their newline; strip it so print()
            # does not emit an extra blank line after each match.
            print(line.rstrip('\n'))


if __name__ == '__main__':
    g = tail('a.txt')
    grep(g, 'error')
生成器表达式
1
2
3
# Generator expression: the odd numbers are produced lazily, one per
# iteration, instead of being stored in a list first.
res = (number for number in range(1, 6) if number % 2 == 1)
for i in res:
    print(i)
表达式形式的yield
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import functools


def init(func):
    """Decorator that primes a generator so ``send()`` works immediately.

    The wrapped generator function is called and advanced to its first
    ``yield``; the already-started generator object is returned.
    """
    @functools.wraps(func)
    def res(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)  # run up to the first ``yield``
        return g
    return res


@init
def foo():
    """Print every value sent into the generator."""
    while True:
        x = yield
        print(x)


g = foo()

三元表达式

可以缩写if判断

1
2
3
4
5
6
7
8
9
def max(x, y):
    """Return the larger of ``x`` and ``y`` (``y`` when they are equal).

    NOTE: this name shadows the builtin ``max`` for the rest of the module.
    """
    # Equivalent long form of the conditional expression below:
    #     if x > y:
    #         return x
    #     else:
    #         return y
    return x if x > y else y


print(max(1, 2))

匿名函数lambda

1
2
# Anonymous function bound to a name: returns the product of its two arguments.
res = lambda x,y:x*y
print(res(2,3))

递归调用

1
2
3
4
5
6
def age(n):
    """Return the age of person ``n`` in a chain of five people.

    Person 5 is 18 years old and every earlier person is two years older
    than the next one, so ``age(n) == age(n + 1) + 2`` for ``n < 5``.

    Raises:
        ValueError: if ``n`` is greater than 5, because the recursion would
            never reach its base case.
    """
    if n > 5:
        raise ValueError('n must not be greater than 5')
    if n == 5:
        return 18
    return age(n + 1) + 2


print(age(1))

爬虫工具

selenium

pip install selenium安装,需要下载chrome驱动。

1
2
3
4
5
6
import selenium
from selenium import webdriver

# Start a Chrome session, load the page and dump the HTML.
driver = webdriver.Chrome()
try:
    driver.get('https://www.baidu.com')
    print(driver.page_source)
finally:
    # Always end the session so the browser and driver processes are closed,
    # even when loading the page fails.
    driver.quit()
phantomjs

phantomjs下载页,不需要打开浏览器,静默爬虫。

1
2
3
4
5
6
import selenium
from selenium import webdriver

# NOTE(review): recent Selenium releases appear to have dropped the PhantomJS
# driver in favour of headless Chrome/Firefox - confirm against the installed
# Selenium version before relying on this snippet.
driver = webdriver.PhantomJS()
driver.get('https://www.baidu.com')
# Print the HTML of the loaded page.
print(driver.page_source)
pyquery

pip install pyquery

1
2
3
4
from pyquery import PyQuery as pq

# Build a document from an HTML string, select the <html> element and print its text.
doc = pq('<html>Hello World!</html>')
print(doc('html').text())
jupyter

可以web运行python命令,写笔记。

命令行传参

1
2
3
import sys

# sys.argv is the list of command-line arguments; element 0 is the script name.
print('Program agruments: ',sys.argv)

正则re

查找findall
1
2
3
import re

# Raw string literal: ``\d`` reaches the regex engine unchanged instead of
# being treated as an (invalid) string escape.
r = re.compile(r"a\d")
# findall returns every non-overlapping match: "a" followed by one digit.
res = r.findall("a1,ab")
分组( )
1
2
3
4
import re

# Named groups: ``year`` is 2000-2019, ``month`` is one or two digits.
# The class is ``[01]``; writing ``[0|1]`` would also accept a literal "|".
r = re.compile(r"(?P<year>20[01]\d)-(?P<month>\d{1,2})")
res = r.search("2018-03")
print(res.group("year"))
爬虫豆瓣电影排名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import re

import requests


def getpage():
    """Download the second page of the Douban Top 250 list and return its HTML.

    Raises:
        requests.RequestException: on network failure, timeout or an HTTP
            error status.
    """
    res = requests.get(
        "https://movie.douban.com/top250?start=25&filter=",
        timeout=10,  # never hang forever on an unresponsive server
    )
    # NOTE(review): the site may reject the default requests User-Agent;
    # failing loudly here is clearer than silently parsing an error page.
    res.raise_for_status()
    return res.text


def run():
    """Print a (rank, title, rating count) tuple for every movie on the page."""
    ret = getpage()
    # re.S lets "." match newlines, because one movie entry spans many lines.
    r = re.compile(r'<div class="item">.*?<em.*?>(\d+)</em>?.*?<div class="info">.*?<span class="title">(.*?)</span>?.*?<div class="star">.*?<span>(\d+)人评价</span>', re.S)
    res = r.findall(ret)
    for item in res:
        print(item)


if __name__ == '__main__':
    run()

configparser模块(ini文件)

1
2
3
4
5
6
7
8
9
import configparser

cfg = configparser.ConfigParser()
# Options in the DEFAULT section are inherited by every other section.
cfg["DEFAULT"] = {"Access": "ReadWrite", "Connect": "DSN=AdvWorks"}
cfg["USER"] = {"Name": "User"}

# Serialise the configuration to an ini file.
with open("cfg.ini", "w") as f:
    cfg.write(f)
1
2
3
4
5
6
7
import configparser

cfg = configparser.ConfigParser()
# read() skips files it cannot open, so a missing cfg.ini only shows up as a
# KeyError / NoSectionError on the lookups below.
cfg.read("cfg.ini")

# Look up one option of the DEFAULT section.
print(cfg["DEFAULT"]["Access"])
# items() returns (name, value) pairs of the section, including inherited defaults.
print(cfg.items("USER"))

subprocess模块(子进程执行linux命令)

1
2
3
4
5
import subprocess

# Run `dir` through the shell; its output goes straight to this process's stdout.
s = subprocess.Popen("dir",shell=True)
s.wait()  # block until the command has exited
print("ending")
1
2
3
4
import subprocess

# Capture the command's standard output through a pipe.
s = subprocess.Popen("dir", shell=True, stdout=subprocess.PIPE)
# communicate() reads all output, waits for the child to exit and closes the
# pipe, so no zombie process or open file descriptor is left behind.
out, _ = s.communicate()
# The output is decoded as GBK (the console encoding on Chinese Windows).
print(out.decode("gbk"))

class,__init__方法

1
2
3
4
5
6
7
8
9
10
11
12
class Chinese:
    """A person; demonstrates class attributes versus instance attributes."""

    # Class attribute: shared by every instance.
    country = "china"

    def __init__(self, name, age):
        """Store the per-instance ``name`` and ``age``."""
        self.name = name
        self.age = age

    def work(self):
        """Print a fixed message."""
        print("work")


p1 = Chinese("user01", "22")
p2 = Chinese("user02", "23")

python代码规范检查

pycodestyle检查代码是否符合pep 8规范

python官方提供了检查pep 8代码规范的命令行工具pycodestyle,该工具可以检查python代码是否违反pep 8规范,并对违反的地方给出提示。

1
2
3
4
5
6
7
pip install pycodestyle

# 对python代码检查并打印检查报告
pycodestyle --first test.py

# --show-source显示不规范的源码
pycodestyle --show-source --show-pep8 test.py
使用autopep8将代码格式化

autopep8是一个开源的命令行工具,它能够将python代码自动格式化为pep 8风格。autopep8使用pycodestyle来决定哪部分代码需要格式化。

1
2
3
4
pip install autopep8

# 代码自动格式化pep8规范
autopep8 --in-place test.py

读取标准输入(fileinput)

cat fileinput.py

1
2
3
4
import fileinput

# Iterate over the lines of every file named on the command line
# (or of standard input when no file is given).
for line in fileinput.input():
    # Each line keeps its own newline, so suppress the one print() would add.
    print(line, end="")
1
python fileinput.py /etc/passwd /etc/hosts

类的继承

继承是创建新的类的一种方式,继承可以减少重复代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class people:
    """Base class holding the attributes shared by teachers and students."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def walk(self):
        """Print that this person is walking."""
        print('%s is walking' % self.name)


class teacher(people):
    """A person with an additional ``sex`` attribute who can teach."""

    school = "abc"

    def __init__(self, name, age, sex):
        # Reuse the parent initialiser by calling it explicitly on the class.
        people.__init__(self, name, age)
        self.sex = sex

    def teach(self):
        """Print that this teacher is teaching."""
        print('%s is teaching' % self.name)


class student(people):
    """A person with an additional ``level`` attribute who can study."""

    def __init__(self, name, age, level):
        people.__init__(self, name, age)
        self.level = level

    def study(self):
        """Print that this student is studying."""
        print('%s is studying' % self.name)


t = teacher('egon', '18', 'man')  # instantiate
print(t.name, t.age, t.school, t.sex)
super
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class people:
    """Base class holding a person's name and age."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def walk(self):
        """Print that this person is walking."""
        print('%s is walking' % self.name)


class teacher(people):
    """A person with an additional ``sex`` attribute who can teach."""

    school = "abc"

    def __init__(self, name, age, sex):
        # super() finds the parent initialiser without naming the parent class.
        super().__init__(name, age)
        self.sex = sex

    def teach(self):
        """Print that this teacher is teaching."""
        print('%s is teaching' % self.name)


t = teacher('egon', '18', 'man')  # instantiate
print(t.name, t.age, t.school, t.sex)

类的组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class date():
    """A calendar date made of year, month and day."""

    def __init__(self, year, mon, day):
        self.year = year
        self.mon = mon
        self.day = day

    def tell_date(self):
        """Print the stored date."""
        print('今天的日期是:%s年-%s月-%s日' % (self.year, self.mon, self.day))


class teacher:
    """A teacher that owns a ``date`` object (composition, not inheritance)."""

    def __init__(self, name, age, year, mon, day):
        self.name = name
        self.age = age
        # The date is a separate object stored as an attribute.
        self.date = date(year, mon, day)

    def teach(self):
        """Print that this teacher is teaching."""
        print('{} is teaching'.format(self.name))


# Arguments are (name, age, year, mon, day): month 12, day 22.
t = teacher('user01', 18, 2018, 12, 22)
print(t.name, t.age, t.date.year, t.date.mon, t.date.day)

归一化接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import abc


class Animal(metaclass=abc.ABCMeta):
    """Interface: every concrete animal must implement ``talk``."""

    @abc.abstractmethod
    def talk(self):
        pass


class people(Animal):
    def talk(self):
        """Print a human greeting."""
        print('say hello')


class dog(Animal):
    def talk(self):
        """Print a bark."""
        print('wang wang wang')


d = dog()
d.talk()

绑定方法与非绑定方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import abc
import hashlib
import time


class Animal(metaclass=abc.ABCMeta):
    """Interface: every concrete animal must implement ``talk``."""

    @abc.abstractmethod
    def talk(self):
        pass


class people(Animal):
    """Shows an instance method, a class method and a static method."""

    def talk(self):
        """Instance method: receives the instance as ``self``."""
        print('say hello')

    @classmethod
    def func(cls):
        """Class method: receives the class as ``cls``."""
        print('say func')

    @staticmethod
    def sum(x, y):
        """Static method: receives neither instance nor class; prints x + y."""
        print(x + y)


# Method bound to the instance
p = people()
p.talk()
# Method bound to the class
people.func()
# Unbound (static) method
people.sum(1, 2)

反射

hasattr查看对象是否有某个属性
getattr查看对象某个属性的值
setattr 设置对象某个属性的值
1
2
3
4
5
6
7
8
9
class foo:
    """Target class for the hasattr / setattr / getattr demonstration."""

    say = "hello"

    def __init__(self, name, age):
        self.name = name
        self.age = age


print(hasattr(foo, "say"))          # does the attribute exist?
setattr(foo, "say", "hello world")  # assign the attribute by name
print(getattr(foo, "say"))          # read the attribute by name

面向对象高级用法

__str__可以替换内存地址信息为别的信息
1
2
3
4
5
6
7
8
9
10
class foo:
    """Object whose printed form is ``name:age``."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def __str__(self):
        """Return the text used by ``str()`` and ``print()``."""
        return "{}:{}".format(self.name, self.age)


f = foo("user", 18)
print(f)
__del__在对象被回收时(例如程序运行结束时)自动调用,可用于执行清理操作
1
2
3
4
5
6
7
8
9
class foo:
    """Object that prints a message when it is destroyed."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def __del__(self):
        """Called when the object is about to be destroyed."""
        print("程序运行完成")


f = foo("user", 18)
类使用[ ]取值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class foo:
    """Object whose attributes can also be accessed with ``obj[key]``."""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    def __getitem__(self, item):
        """``obj[item]`` reads the attribute named ``item``."""
        return getattr(self, item)

    def __setitem__(self, key, value):
        """``obj[key] = value`` sets the attribute named ``key``."""
        setattr(self, key, value)

    def __delitem__(self, key):
        """``del obj[key]`` removes the attribute named ``key``."""
        delattr(self, key)


f = foo("user", 18)
print(f.__dict__)
f["name"] = "user01"
print(f.__dict__)
del f["name"]
print(f.__dict__)

socket编程

tcp socket模拟ssh

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import socket
import subprocess

# WARNING: this demo runs whatever text a client sends as a shell command.
# Never expose it on an untrusted network.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow an immediate restart without "address already in use" errors.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", 9999))
sock.listen(1)

try:
    while True:
        conn, addr = sock.accept()  # blocks until a client connects
        print("got a new customer", conn, addr)

        with conn:  # the connection is closed when this client is done
            while True:
                data = conn.recv(1024)  # blocks; returns bytes
                print("received", data)
                if not data:  # the client closed the connection
                    print("conn %s is lost" % str(addr))
                    break
                cmd = subprocess.Popen(data.decode("utf-8"),
                                       shell=True,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)

                # communicate() drains both pipes together; reading stdout
                # fully before stderr can deadlock when stderr fills up.
                stdout, stderr = cmd.communicate()
                cmd_res = stdout + stderr
                if not cmd_res:
                    # An empty message would leave the client waiting forever.
                    cmd_res = b'cmd has not output'

                conn.sendall(cmd_res)
finally:
    sock.close()  # release the listening port

client

1
2
3
4
5
6
7
8
9
10
11
12
13
import platform
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # byte stream
sock.connect(('127.0.0.1', 9999))

# Command output is decoded as GBK on Windows and UTF-8 elsewhere.
encoding = "gbk" if platform.system() == "Windows" else "utf-8"

try:
    while True:
        msg = input(">>>:").strip()
        if not msg:
            continue  # nothing to send
        sock.sendall(msg.encode("utf-8"))
        # Only the first 1024 bytes of the reply are read here.
        response = sock.recv(1024)
        if not response:  # the server closed the connection
            break
        # errors="replace": a multi-byte character cut at the 1024-byte
        # boundary must not crash the client.
        print("received:", response.decode(encoding, errors="replace"))
finally:
    sock.close()
udp socket

server

1
2
3
4
5
6
7
8
9
10
11
import socket

ip_port = ('127.0.0.1', 9999)

udp_server_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server_client.bind(ip_port)

# Echo server: send every received datagram back to its sender.
while True:
    msg, addr = udp_server_client.recvfrom(1024)
    print(msg, addr)
    udp_server_client.sendto(msg, addr)

client

1
2
3
4
5
6
7
8
9
10
11
12
13
import socket

ip_port = ('127.0.0.1', 9999)

udp_server_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while True:
    msg = input('>>: ').strip()
    if not msg:
        continue  # skip empty input
    udp_server_client.sendto(msg.encode('utf-8'), ip_port)

    # Wait for the datagram sent back by the server.
    bank_msg, addr = udp_server_client.recvfrom(1024)
    print(bank_msg.decode('utf-8'), addr)

threading线程

event

当有几个线程的时候,可能线程A依赖线程B的运行,可以用event对象让线程A等待线程B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
import time

# Shared flag: workers block on it until the main thread sets it.
event = threading.Event()


def foo():
    """Wait up to 10 seconds for the event, then report connecting."""
    print("wait...")
    event.wait(10)
    print("connect to redis server")


def bar():
    """Print a progress message once per second until the event is set."""
    while not event.is_set():
        print("waiting for even...")
        event.wait(1)


for i in range(5):
    t1 = threading.Thread(target=foo, args=())
    t1.start()

t2 = threading.Thread(target=bar, args=())
t2.start()

print("attempt to start redis server")
time.sleep(8)

print("set...")
event.set()  # releases every thread blocked in event.wait()
线程队列queue

先进先出和先进后出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import queue
import threading
import time


def foo():
    """Producer: put three items on the shared queue."""
    q.put(3.14)
    q.put("hello")
    q.put("yes")


def bar():
    """Consumer: print and acknowledge items until the queue is empty."""
    print(">>>start...")
    while not q.empty():
        print(q.get())
        q.task_done()


q = queue.Queue(10)  # queue.LifoQueue(10)  # queue.PriorityQueue(10)

t1 = threading.Thread(target=foo, args=())
t2 = threading.Thread(target=bar, args=())

t1.start()
# The consumer stops as soon as it sees an empty queue, so the producer must
# finish first; otherwise the consumer may exit before anything is queued.
t1.join()
t2.start()
t2.join()

优先级模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import queue
import threading
import time


def foo():
    """Producer: put [priority, value] pairs on the shared queue."""
    q.put([1, 3.14])
    q.put([3, "hello"])
    q.put([2, "yes"])


def bar():
    """Consumer: print the value of each pair, lowest priority number first."""
    print(">>>start...")
    while not q.empty():
        # Index 1 is the value of the [priority, value] pair.
        # NOTE(review): the index was garbled in the original text; [1] is
        # the presumed intent - confirm.
        print(q.get()[1])
        q.task_done()


q = queue.PriorityQueue(10)

t1 = threading.Thread(target=foo, args=())
t2 = threading.Thread(target=bar, args=())

t1.start()
# The consumer stops as soon as it sees an empty queue, so the producer must
# finish first; otherwise the consumer may exit before anything is queued.
t1.join()
t2.start()
t2.join()

进程 multiprocessing

multiprocessing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import multiprocessing
import time


def foo(n):
    """Print 1 plus the sum of the integers 1 .. n-1."""
    res = 1
    for i in range(1, n):
        res += i
    print(res)


def bar(n):
    """Print the product of the integers 1 .. n-1."""
    res = 1
    for i in range(1, n):
        res *= i
    print(res)


s = time.time()

if __name__ == '__main__':
    # Run both CPU-bound functions in separate processes, in parallel.
    m1 = multiprocessing.Process(target=foo, args=(100000000,))
    m2 = multiprocessing.Process(target=bar, args=(100000,))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    print(time.time() - s)  # elapsed wall-clock seconds
进程queue通信
1
2
3
4
5
6
7
8
9
10
import multiprocessing


def foo(q):
    """Child side: put one list on the shared queue."""
    q.put([11, "hello", True])


if __name__ == '__main__':
    q = multiprocessing.Queue()
    m = multiprocessing.Process(target=foo, args=(q,))
    m.start()
    print(q.get())  # blocks until the child has put its item
    m.join()        # reap the child process
管道pipe通信
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Pipe, Process


def foo(sock):
    """Child side: send a greeting, then print the parent's reply."""
    sock.send("hello world!")
    print(sock.recv())


if __name__ == '__main__':
    # Create the pipe inside the guard so a child that re-imports this module
    # does not build a second, unrelated pipe.
    sock, conn = Pipe()
    m = Process(target=foo, args=(sock,))
    m.start()
    print(conn.recv())
    conn.send("hi boy!")
    m.join()  # reap the child process
Manager进程通信(最优方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Manager, Process


def foo(l, i):
    """Append the square of ``i`` to the shared list ``l``."""
    l.append(i * i)


if __name__ == '__main__':
    manage = Manager()
    # A managed list is a proxy object that several processes can modify.
    mlist = manage.list([11, 22, 33])

    for i in range(5):
        p = Process(target=foo, args=(mlist, i))
        p.start()
        # Joining inside the loop runs the children one after another, which
        # keeps the order of the appended values deterministic.
        p.join()

    print(mlist)
使用进程池并发pool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Pool
import time


def foo(n):
    """Print ``n`` and then sleep for two seconds to simulate work."""
    print(n)
    time.sleep(2)


if __name__ == '__main__':

    p = Pool(5)  # five worker processes
    for i in range(100):
        p.apply_async(func=foo, args=(i,))
    p.close()  # no further tasks will be submitted
    p.join()   # wait until every submitted task has finished

    print("ending...")

协程

yield
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time


def consumer():
    """Coroutine: receive values through ``send()`` and answer each one.

    Yields an empty string when first started and "200 ok" after each
    processed value; returns when a falsy value is received.
    """
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print("consumer: ", n)
        time.sleep(3)
        r = "200 ok"


def produce(c):
    """Drive coroutine ``c``: send it the numbers 1..5, then close it."""
    next(c)  # start the coroutine: run it up to its first ``yield``
    n = 0
    while n < 5:
        n = n + 1
        print("produce: ", n)
        cr = c.send(n)
        print("consumer return: ", cr)
    c.close()


if __name__ == '__main__':
    c = consumer()
    produce(c)
greenlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from greenlet import greenlet


def foo():
    """Print ok1 and ok3, handing control to g2 after each message."""
    print("ok1")
    g2.switch()
    print("ok3")
    g2.switch()


def bar():
    """Print ok2 and ok4, handing control back to g1 in between."""
    print("ok2")
    g1.switch()
    print("ok4")


g1 = greenlet(foo)
g2 = greenlet(bar)

g1.switch()  # start with foo
gevent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from gevent import monkey

# Patch the blocking standard-library modules BEFORE requests is imported.
# Without this, each requests.get() blocks the whole thread and the
# greenlets run one after another instead of overlapping their waits.
monkey.patch_all()

import time

import gevent
import requests


def foo(url):
    """Download ``url`` and print the length of the response text."""
    res = requests.get(url, timeout=10)  # do not hang on a dead host
    res_str = res.text
    print("len: ", len(res_str))


s = time.time()
gevent.joinall([
    gevent.spawn(foo, "https://www.baidu.com/"),
    gevent.spawn(foo, "https://translate.google.cn/"),
    gevent.spawn(foo, "https://www.zhihu.com/"),
])

print(time.time() - s)  # elapsed wall-clock seconds

io多路复用

select、poll、epoll模型

socketserver

server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import socketserver
import subprocess


class myserver(socketserver.BaseRequestHandler):
    """Handler that runs each received message as a shell command.

    WARNING: it executes arbitrary commands sent by the client; use it only
    as a local demonstration.
    """

    def handle(self):
        """Serve one client until it disconnects or sends "q"."""
        print("from conn: ", self.request)

        while True:
            data = self.request.recv(1024)
            if not data:  # the client closed the connection
                break
            command = data.decode("utf-8")
            print(command)

            if command == "q":
                break
            response = subprocess.Popen(command,
                                        shell=True,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
            # communicate() drains both pipes together; reading stdout fully
            # before stderr can deadlock when stderr fills up.
            stdout, stderr = response.communicate()
            res = stdout + stderr
            if not res:
                # Sending nothing would leave the client blocked in recv().
                res = b'cmd has not output'
            self.request.sendall(res)


s = socketserver.ThreadingTCPServer(("127.0.0.1", 8800), myserver)

s.serve_forever()

client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket
import platform

sock = socket.socket()
sock.connect(("127.0.0.1", 8800))

# Command output is decoded as GBK on Windows and UTF-8 elsewhere.
encoding = "gbk" if platform.system() == "Windows" else "utf-8"

with sock:
    while True:
        data = input(">>>: ")
        if not data:
            # An empty message sends zero bytes: the server would never
            # answer and this client would block in recv() forever.
            continue
        sock.sendall(data.encode("utf-8"))
        if data == "q":
            break  # the server ends the session on "q"
        response = sock.recv(1024)
        if not response:  # the server closed the connection
            break
        print(response.decode(encoding))

定时任务

Timer
1
2
3
4
5
6
7
8
from threading import Timer


def do_timer():
    """Print a message and schedule this function to run again in one second."""
    print('hello timer')
    timer = Timer(1, do_timer)
    timer.start()


do_timer()
schedule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from threading import Thread
import schedule
import datetime
import time


def job():
    """Print the current local date and time."""
    print(datetime.datetime.now())


def do_job():
    """Run ``job`` in its own thread so it cannot delay the scheduler loop."""
    Thread(target=job).start()


def run():
    """Register ``do_job`` to run every 10 minutes and poll the scheduler forever."""
    # Other ways to register a job:
    #   schedule.every(10).minutes.do(job)               # every 10 minutes
    #   schedule.every(10).seconds.do(job)               # every 10 seconds
    #   schedule.every().hour.do(job)                    # every() without an argument means 1
    #   schedule.every().day.at("10:30").do(job)         # every day at 10:30
    #   schedule.every().monday.do(job)                  # every Monday
    #   schedule.every().wednesday.at("13:15").do(job)   # every Wednesday at 13:15
    #   schedule.every(10).seconds.do(job, "argument")   # pass arguments to the job
    # Several jobs may be registered; jobs that are due run one after another.

    schedule.every(10).minutes.do(do_job)
    while True:
        schedule.run_pending()
        time.sleep(1)


run()

re正则

1
2
3
4
5
import re

# Capture the timestamp up to millisecond precision, skip the next three
# fractional digits (the microseconds) and capture the trailing "Z".
# The dot before the fraction is escaped so only a literal "." matches.
re_obj = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})\d{3}(Z)')


def _truncate_to_millis(timestamp):
    """Return ``timestamp`` with its fraction cut from 6 to 3 digits.

    Raises:
        ValueError: if ``timestamp`` does not start with the expected
            ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` layout.
    """
    found = re_obj.match(timestamp)
    if found is None:
        raise ValueError('unexpected timestamp format: %r' % (timestamp,))
    return found.group(1) + found.group(2)


# format_start_time / format_end_time are expected to be defined by the
# surrounding code; they are not part of this snippet.
start_time = _truncate_to_millis(format_start_time)
end_time = _truncate_to_millis(format_end_time)

flask_caching增加缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask_caching import Cache

# NOTE(review): newer Flask-Caching releases appear to prefer 'SimpleCache'
# over the legacy 'simple' name - confirm against the installed version.
cache = Cache(config={'CACHE_TYPE': 'simple'})
# `app` is not defined in this snippet; presumably the Flask application object.
cache.init_app(app)

xml转dict

import xmltodict

# Read the whole XML document into a string.
with open('server.xml', 'r') as f:
    f_xml = f.read()

# Convert the XML text into nested dictionaries.
f_dict = xmltodict.parse(f_xml)

url编码解码

1
2
3
from urllib import parse
# Percent-encode text for use in a URL (spaces become "+").
parse.quote_plus('抱歉')
# Decode a percent-encoded string back to text.
parse.unquote_plus('%E6%8A%B1%E6%AD%89')

高阶函数

map

1
2
# map applies str to every element; list() collects the results.
list(map(str, [1, 2, 3, 4, 5, 6, 7, 8, 9]))
['1', '2', '3', '4', '5', '6', '7', '8', '9']

add-apt-repository

apt-get install python-software-properties
apt-get install software-properties-common