python学习笔记 [toc]
推导式
推导式是从一个或者多个迭代器快速简洁地创建数据结构的一种方法。它可以将循环和条件判断结合,从而避免语法冗长的代码。
列表推导式 1 2 3 number_list = [number for number in range (1 ,6 ) if number % 2 == 1 ] print (number_list)[1 ,3 ,5 ]
字典推导式 1 2 3 4 word = 'letters' letter_counts = {letter:word.count(letter) for letter in set (word)} print (letter_counts){'t' : 2 , 'l' : 1 , 'e' : 2 , 'r' : 1 , 's' : 1 }
集合推导式 1 2 3 a_set = {number for number in range (1 ,6 ) if number % 3 == 1 } print (a_set){1 , 4 }
生成器函数
生成器函数:函数体含有yield关键字,生成器就是迭代器
装饰器
装饰器的功能是在不修改被装饰对象源代码以及调用方式的前提下,为其添加新功能。
res是一个用户密码认证的装饰器,index要运行的函数,用装饰器扩展功能后,运行index可以增加用户认证的功能。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import timeimport randomimport sysuser_dit={ 'user01' :'python01' , 'user02' :'python02' , 'user03' :'python03' , } login_status = { 'user' :None , 'login' :False , } with open ('auth_info.txt' ,'w' ,encoding='utf-8' ) as f: f.seek(0 ) f.truncate() f.write(str (user_dit)) db_file = 'auth_info.txt' def res (app ): def auth (*args,**kwargs ): if login_status['user' ] and login_status['login' ] : user_name = login_status['user' ] app(user_name, **kwargs) else : user_name = input ('user name: ' ) user_passwd = input ('user passwd: ' ) with open (db_file, 'r' , encoding='utf-8' ) as f: f_out = eval (f.read()) if user_name in f_out and user_passwd == f_out[user_name]: print ('login seccess' ) login_status['user' ] = user_name login_status['login' ] = True app(user_name,**kwargs) else : print ('username or passwd error' ) sys.exit() return auth @res def index (user_name,**kwargs ): time.sleep(random.randrange(1 ,5 )) print ('welecome to %s index page!' %user_name) index() index()
迭代器 1 2 res = [ num for num in range (6 ) ] print (res)
生成器
函数体内包含yield关键字,运行时遇到yield暂停完成一次迭代,生成器就是迭代器。
生成器模拟linux命令
tail -f filename | grep word
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import timedef tail (file_name ): with open (file_name,encoding='utf-8' ) as f: f.seek(0 ,2 ) while True : line = f.readline() if line: yield line else : time.sleep(0.5 ) def grep (lines,word ): for i in lines: if word in i: print (i) g = tail('a.txt' ) grep(g,'error' )
生成器表达式 1 2 3 res = (number for number in range (1 ,6 ) if number % 2 == 1 ) for i in res: print (i)
表达式形式的yield 1 2 3 4 5 6 7 8 9 10 11 12 13 14 def init (func ): def res (*args,**kwargs ): g = func(*args,**kwargs) next (g) return g return res init def foo (): while True : x = yield print (x) g = foo()
三元表达式
可以缩写if判断
1 2 3 4 5 6 7 8 9 def max (x,y ): return x if x > y else y print (max (1 ,2 ))
匿名函数lambda 1 2 res = lambda x,y:x*y print (res(2 ,3 ))
递归调用 1 2 3 4 5 6 def age (n ): if n == 5 : return 18 return age(n+1 )+2 print (age(1 ))
爬虫工具 selenium
pip install selenium
安装,需要下载chrome驱动。
1 2 3 4 5 6 import seleniumfrom selenium import webdriverdriver = webdriver.Chrome() driver.get('https://www.baidu.com' ) print (driver.page_source)
phantomjs
phantomjs下载页 ,不需要打开浏览器,静默爬虫。
1 2 3 4 5 6 import seleniumfrom selenium import webdriverdriver = webdriver.PhantomJS() driver.get('https://www.baidu.com' ) print (driver.page_source)
pyquery
pip install pyquery
1 2 3 4 from pyquery import PyQuery as pqdoc = pq('<html>Hello World!</html>' ) print (doc('html' ).text())
jupyter
可以web运行python命令,写笔记。
命令行传参 1 2 3 import sysprint ('Program agruments: ' ,sys.argv)
正则re 查找findall
1 2 3 import rer = re.compile ("a\d" ) res = r.findall("a1,ab" )
分组( ) 1 2 3 4 import rer = re.compile ("(?P<year>20[0|1]\d)-(?P<month>\d{1,2})" ) res = r.search("2018-03" ) print (res.group("year" ))
爬虫豆瓣电影排名 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import requests,redef getpage (): res = requests.get("https://movie.douban.com/top250?start=25&filter=" ) return res.text def run (): ret = getpage() r = re.compile ('<div class="item">.*?<em.*?>(\d+)</em>?.*?<div class="info">.*?<span class="title">(.*?)</span>?.*?<div class="star">.*?<span>(\d+)人评价</span>' ,re.S) res = r.findall(ret) for item in res: print (item) run()
configparser
模块(ini文件)1 2 3 4 5 6 7 8 9 import configparsercfg = configparser.ConfigParser() cfg["DEFAULT" ] = {"Access" :"ReadWrite" ,"Connect" :"DSN=AdvWorks" } cfg["USER" ] = {"Name" :"User" } with open ("cfg.ini" ,"w" ) as f: cfg.write(f)
1 2 3 4 5 6 7 import configparsercfg = configparser.ConfigParser() cfg.read("cfg.ini" ) print (cfg["DEFAULT" ]["Access" ])print (cfg.items("USER" ))
subprocess
模块(子进程执行linux命令)1 2 3 4 5 import subprocesss = subprocess.Popen("dir" ,shell=True ) s.wait() print ("ending" )
1 2 3 4 import subprocesss = subprocess.Popen("dir" ,shell=True ,stdout=subprocess.PIPE) print (s.stdout.read().decode("gbk" ))
class,__init__
方法 1 2 3 4 5 6 7 8 9 10 11 12 class Chinese : country = "china" def __init__ (self,name,age ): self.name = name self.age = age def work (self ): print ("work" ) p1 = Chinese("user01" ,"22" ) p2 = Chinese("user02" ,"23" )
pyhton代码规范检查 pycodestyle检查代码是否符合pep 8规范
python官方提供了检查pep8代码规范的命令行工具pycodestyle,改工具可以检查python代码是否违反pep 8规范,并对违反的地方给出提示。
1 2 3 4 5 6 7 pip install pycodestyle pycodestyle --first test.py pycodestyle --show-source --show-pep8 test.py
使用autopep8将代码格式化
autopep8是一个开源的命令行工具,他能够将python代码自动格式化为pep8风格。autopep8使用pycodestyle来决定哪部分代码需要格式化。
1 2 3 4 pip install autopep8 autopep8 --in -place test.py
读取标准输出 cat fileinput.py
1 2 3 4 import fileinputfor line in fileinput.input (): print (line, end="" )
1 python fileinput.py /etc/passwd /etc/hosts
类的继承
继承是创建新的类的一种方式,继承可以减少重复代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class people : def __init__ (self,name,age ): self.name = name self.age = age def walk (self ): print ('%s is walking' %self.name) class teacher (people ): school = "abc" def __init__ (self,name,age,sex ): people.__init__(self,name,age) self.sex = sex def teach (self ): print ('%s is teaching' %self.name) class student (people ): def __init__ (self,name,age,level ): people.__init__(self,name,age) self.level = level def study (self ): print ('%s is studying' %self.name) t = teacher('egon' ,'18' ,'man' ) print (t.name,t.age,t.school,t.sex)
super 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class people : def __init__ (self,name,age ): self.name = name self.age = age def walk (self ): print ('%s is walking' %self.name) class teacher (people ): school = "abc" def __init__ (self,name,age,sex ): super ().__init__(name,age) self.sex = sex def teach (self ): print ('%s is teaching' %self.name) t = teacher('egon' ,'18' ,'man' ) print (t.name,t.age,t.school,t.sex)
类的组合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class date (): def __init__ (self,year,mon,day ): self.year = year self.mon = mon self.day = day def tell_date (self ): print ('今天的日期是:%s年-%s月-%s日' %(self.year,self.mon,self.day)) class teacher : def __init__ (self,name,age,year,mon,day ): self.name = name self.age = age self.date = date(year,mon,day) def teach (self ): print ('{} is teaching' .format (self.name)) t = teacher('user01' ,18 ,2018 ,22 ,12 ) print (t.name,t.age,t.date.year,t.date.mon,t.date.day)
归一化接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import abcclass Animal (metaclass=abc.ABCMeta): @abc.abstractmethod def talk (self ): pass class people (Animal ): def talk (self ): print ('say hello' ) class dog (Animal ): def talk (self ): print ('wang wang wang' ) d = dog() d.talk()
绑定方法与非绑定方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import abcimport hashlibimport timeclass Animal (metaclass=abc.ABCMeta): @abc.abstractmethod def talk (self ): pass class people (Animal ): def talk (self ): print ('say hello' ) @classmethod def func (cls ): print ('say func' ) @staticmethod def sum (x,y ): print (x+y) p = people() p.talk() people.func() people.sum (1 ,2 )
反射 hasattr
查看对象是否有某个属性getattr
查看对象某个属性的值setattr
设置对象某个属性的值1 2 3 4 5 6 7 8 9 class foo : say = "hello" def __init__ (self,name,age ): self.name = name self.age = age print (hasattr (foo,"say" ))setattr (foo,"say" ,"hello world" )print (getattr (foo,"say" ))
面向对象高级用法 __str__
可以替换内存地址信息为别的信息1 2 3 4 5 6 7 8 9 10 class foo : def __init__ (self,name,age ): self.name = name self.age = age def __str__ (self ): return "{}:{}" .format (self.name,self.age) f = foo("user" ,18 ) print (f)
__del__
程序执行完后的操作,执行清理操作1 2 3 4 5 6 7 8 9 class foo : def __init__ (self,name,age ): self.name = name self.age = age def __del__ (self ): print ("程序运行完成" ) f = foo("user" ,18 )
类使用[ ]
取值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class foo : def __init__ (self,name,age ): self.name = name self.age = age def __getitem__ (self, item ): return getattr (self,item) def __setitem__ (self, key, value ): setattr (self,key,value) def __delitem__ (self, key ): delattr (self,key) f = foo("user" ,18 ) print (f.__dict__)f["name" ] = "user01" print (f.__dict__)del f["name" ]print (f.__dict__)
socket编程 tcp socket模拟ssh server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import socketimport subprocesssock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.bind(("" ,9999 )) sock.listen(1 ) while True : conn,addr = sock.accept() print ("got a new customer" ,conn,addr) while True : data = conn.recv(1024 ) print ("received" ,data) if not data: print ("conn %s is lost" % str (addr) ) conn.close() break cmd = subprocess.Popen(data.decode("utf-8" ), shell=True , stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = cmd.stdout.read() stderr = cmd.stderr.read() cmd_res = stdout + stderr if not cmd_res: cmd_res = b'cmd has not output' conn.sendall(cmd_res) sock.close()
client
1 2 3 4 5 6 7 8 9 10 11 12 13 import socketsock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.connect(('127.0.0.1' ,9999 )) while True : msg = input (">>>:" ).strip() if not msg : continue sock.send(msg.encode("utf-8" )) response = sock.recv(1024 ) print ("received:" ,response.decode("gbk" )) sock.close()
udp socket server
1 2 3 4 5 6 7 8 9 10 11 import socketip_port = ('127.0.0.1' ,9999 ) udp_server_client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) udp_server_client.bind(ip_port) while True : msg,addr = udp_server_client.recvfrom(1024 ) print (msg,addr) udp_server_client.sendto(msg,addr)
client
1 2 3 4 5 6 7 8 9 10 11 12 13 import socketip_port = ('127.0.0.1' ,9999 ) udp_server_client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) while True : msg = input ('>>: ' ).strip() if not msg:continue udp_server_client.sendto(msg.encode('utf-8' ),ip_port) bank_msg,addr = udp_server_client.recvfrom(1024 ) print (bank_msg.decode('utf-8' ),addr)
threading线程 event
当有几个线程的时候,可能线程A依赖线程B的运行,可以用event对象让线程A等待线程B
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import threadingimport timeevent = threading.Event() def foo (): print ("wait..." ) event.wait(10 ) print ("connect to redis server" ) def bar (): while not event.is_set(): print ("waiting for even..." ) event.wait(1 ) for i in range (5 ): t1 = threading.Thread(target=foo,args=()) t1.start() t2 = threading.Thread(target=bar,args=()) t2.start() print ("attempt to start redis server" )time.sleep(8 ) print ("set..." )event.set ()
线程队列queue
先进先出和先进后出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import queueimport threadingimport timedef foo (): q.put(3.14 ) q.put("hello" ) q.put("yes" ) def bar (): print (">>>start..." ) while not q.empty(): print (q.get()) q.task_done() q = queue.Queue(10 ) t1 = threading.Thread(target=foo,args=()) t2 = threading.Thread(target=bar,args=()) t1.start() t2.start()
优先级模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import queueimport threadingimport timedef foo (): q.put([1 ,3.14 ]) q.put([3 ,"hello" ]) q.put([2 ,"yes" ]) def bar (): print (">>>start..." ) while not q.empty(): print (q.get()【】) q.task_done() q = queue.PriorityQueue(10 ) t1 = threading.Thread(target=foo,args=()) t2 = threading.Thread(target=bar,args=()) t1.start() t2.start()
进程 multiprocessing multiprocessing 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import multiprocessingimport timedef foo (n ): res = 1 for i in range (1 ,n): res += i print (res) def bar (n ): res = 1 for i in range (1 ,n): res *= i print (res) s = time.time() if __name__ == '__main__' : m1 = multiprocessing.Process(target=foo,args=(100000000 ,)) m2 = multiprocessing.Process(target=bar,args=(100000 ,)) m1.start() m2.start() m1.join() m2.join() print (time.time() - s)
进程queue通信 1 2 3 4 5 6 7 8 9 10 import multiprocessingdef foo (q ): q.put([11 ,"hello" ,True ]) if __name__ == '__main__' : q = multiprocessing.Queue() m = multiprocessing.Process(target=foo,args=(q,)) m.start() print (q.get())
管道pipe通信 1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Pipe,Processdef foo (sock ): sock.send("hello world!" ) print (sock.recv()) sock,conn = Pipe() if __name__ == '__main__' : m = Process(target=foo, args=(sock,)) m.start() print (conn.recv()) conn.send("hi boy!" )
manage进程通信(最优方法) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing import Manager,Processdef foo (l,i ): l.append(i*i) if __name__ == '__main__' : manage = Manager() mlist = manage.list ([11 ,22 ,33 ]) for i in range (5 ): p = Process(target=foo, args=(mlist,i)) p.start() p.join() print (mlist)
使用进程池并发pool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Poolimport timedef foo (n ): print (n) time.sleep(2 ) if __name__ == '__main__' : p = Pool(5 ) for i in range (100 ): p.apply_async(func=foo,args=(i,)) p.close() p.join() print ("ending..." )
协程 yield 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import timedef consumer (): r = '' while True : n = yield r if not n: return print ("consumer: " ,n) time.sleep(3 ) r = "200 ok" def produce (c ): next (c) n = 0 while n < 5 : n = n + 1 print ("produce: " ,n) cr = c.send(n) print ("consumer return: " ,cr) c.close() if __name__ == '__main__' : c = consumer() produce(c)
greenlet 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from greenlet import greenletdef foo (): print ("ok1" ) g2.switch() print ("ok3" ) g2.switch() def bar (): print ("ok2" ) g1.switch() print ("ok4" ) g1 = greenlet(foo) g2 = greenlet(bar) g1.switch()
geven 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import requestsimport geventimport timedef foo (url ): res = requests.get(url) res_str = res.text print ("len: " ,len (res_str)) s = time.time() gevent.joinall([ gevent.spawn(foo, "https://www.baidu.com/" ), gevent.spawn(foo, "https://translate.google.cn/" ), gevent.spawn(foo, "https://www.zhihu.com/" ) ]) print (time.time() - s)
io多路复用 select
、poll
、epoll
模型
socketserver server端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import socketserverimport subprocessclass myserver (socketserver.BaseRequestHandler): def handle (self ): print ("from conn: " ,self.request) while True : data = self.request.recv(1024 ) print (data.decode("utf-8" )) if data.decode("utf-8" ) == "q" : break response = subprocess.Popen(data.decode("utf-8" ), shell=True , stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = response.stdout.read() stderr = response.stderr.read() res = stdout + stderr if type (res) is bytes : self.request.send(res) else : self.request.send(res.encode("utf-8" )) s = socketserver.ThreadingTCPServer(("127.0.0.1" ,8800 ),myserver) s.serve_forever()
client端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import socketimport platformsock = socket.socket() sock.connect(("127.0.0.1" ,8800 )) while True : data = input (">>>: " ) sock.send(data.encode("utf-8" )) response = sock.recv(1024 ) if platform.system() == "Windows" : print (response.decode("gbk" )) else : print (response.decode("utf-8" ))
定时任务 Timer 1 2 3 4 5 6 7 8 from threading import Timerdef do_timer (): print ('hello timer' ) timer = Timer(1 , do_timer) timer.start() do_timer()
schedule 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from threading import Threadimport scheduleimport datetimeimport timedef job (): print (datetime.datetime.now()) def do_job (): Thread(target=job).start() def run (): schedule.every(10 ).minutes.do(do_job) while True : schedule.run_pending() time.sleep(1 ) run()
re正则 1 2 3 4 5 import rere_obj = re.compile ('(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3})\d{3}(Z)' ) start_time = re_obj.match (format_start_time).group(1 ) + re_obj.match (format_start_time).group(2 ) end_time = re_obj.match (format_end_time).group(1 ) + re_obj.match (format_end_time).group(2 )
flask_caching增加缓存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask_caching import Cachecache = Cache(config={'CACHE_TYPE' : 'simple' }) cache.init_app(app) ```` ```python import xmltodictwith open ('server.xml' ,'r' ) as f: f_xml = f.read() f_dict = xmltodict.parse(f_xml)
url编码解码 1 2 3 from urllib import parseparse.quote_plus('抱歉' ) parse.unquote_plus('%E6%8A%B1%E6%AD%89' )
高阶函数 map 1 2 list (map (str , [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ]))['1' , '2' , '3' , '4' , '5' , '6' , '7' , '8' , '9' ]
add-apt-repository apt-get install python-software-properties apt-get install software-properties-common