python学习笔记 [toc]
# 列表推导式 (list comprehension)
# Keep only the odd numbers from 1..5.
number_list = [number for number in range(1, 6) if number % 2 == 1]
print(number_list)  # [1, 3, 5]
# 字典推导式 (dict comprehension)
# Map every distinct letter of `word` to the number of times it occurs.
word = 'letters'
letter_counts = {letter: word.count(letter) for letter in set(word)}
# Key order follows set iteration order, so it may differ between runs.
print(letter_counts)  # e.g. {'t': 2, 'l': 1, 'e': 2, 'r': 1, 's': 1}
# 集合推导式 (set comprehension)
# Numbers in 1..5 that leave remainder 1 when divided by 3.
a_set = {number for number in range(1, 6) if number % 3 == 1}
print(a_set)  # {1, 4}
res是一个用户密码认证的装饰器,index要运行的函数,用装饰器扩展功能后,运行index可以增加用户认证的功能。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import timeimport randomimport sysuser_dit={ 'user01' :'python01' , 'user02' :'python02' , 'user03' :'python03' , } login_status = { 'user' :None , 'login' :False , } with open ('auth_info.txt' ,'w' ,encoding='utf-8' ) as f: ) f.truncate() f.write(str (user_dit)) db_file = 'auth_info.txt' def res (app ): def auth (*args,**kwargs ): if login_status['user' ] and login_status['login' ] : user_name = login_status['user' ] app(user_name, **kwargs) else : user_name = input ('user name: ' ) user_passwd = input ('user passwd: ' ) with open (db_file, 'r' , encoding='utf-8' ) as f: f_out = eval ( if user_name in f_out and user_passwd == f_out[user_name]: print ('login seccess' ) login_status['user' ] = user_name login_status['login' ] = True app(user_name,**kwargs) else : print ('username or passwd error' ) sys.exit() return auth @res def index (user_name,**kwargs ): time.sleep(random.randrange(1 ,5 )) print ('welecome to %s index page!' %user_name) index() index()
迭代器 1 2 res = [ num for num in range (6 ) ] print (res)
tail -f filename | grep word
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import timedef tail (file_name ): with open (file_name,encoding='utf-8' ) as f: ,2 ) while True : line = f.readline() if line: yield line else : time.sleep(0.5 ) def grep (lines,word ): for i in lines: if word in i: print (i) g = tail('a.txt' ) grep(g,'error' )
生成器表达式 1 2 3 res = (number for number in range (1 ,6 ) if number % 2 == 1 ) for i in res: print (i)
# 表达式形式的yield (yield used as an expression)
import functools


def init(func):
    """Decorator that primes a generator-based coroutine.

    The wrapped generator function is called, advanced once with ``next()``
    so that it is paused at its first ``yield``, and then returned. Callers
    can therefore ``send()`` a value immediately.
    """
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def res(*args, **kwargs):
        g = func(*args, **kwargs)
        next(g)  # run up to the first yield; a fresh generator rejects non-None send()
        return g
    return res


@init
def foo():
    """Coroutine that prints every value sent to it, forever."""
    while True:
        x = yield
        print(x)


g = foo()
def max(x, y):
    """Return the larger of ``x`` and ``y`` using a conditional expression.

    When the two compare equal (or ``x > y`` is false), ``y`` is returned.

    NOTE: this name shadows the builtin ``max`` for the rest of the module.
    """
    return x if x > y else y


print(max(1, 2))  # 2
# 匿名函数lambda (anonymous functions)
# A lambda bound to a name; it multiplies its two arguments.
res = lambda x, y: x * y
print(res(2, 3))  # 6
# 递归调用 (recursive calls)
def age(n):
    """Return the value for position ``n``, defined recursively.

    Position 5 is the base case (18); every earlier position is 2 more than
    the position after it, so ``age(1) == 26``.

    Raises:
        ValueError: if ``n`` is greater than 5. The recursion only counts
            upward, so such a value could never reach the base case.
    """
    if n > 5:
        raise ValueError("n must not be greater than 5, got %r" % (n,))
    if n == 5:
        return 18
    return age(n + 1) + 2


print(age(1))  # 26
爬虫工具 selenium
pip install selenium
1 2 3 4 5 6 import seleniumfrom selenium import webdriverdriver = webdriver.Chrome() driver.get('' ) print (driver.page_source)
phantomjs下载页 ,不需要打开浏览器,静默爬虫。
1 2 3 4 5 6 import seleniumfrom selenium import webdriverdriver = webdriver.PhantomJS() driver.get('' ) print (driver.page_source)
pip install pyquery
1 2 3 4 from pyquery import PyQuery as pqdoc = pq('<html>Hello World!</html>' ) print (doc('html' ).text())
命令行传参 1 2 3 import sysprint ('Program agruments: ' ,sys.argv)
正则re 查找findall
import re

# Raw string: "\d" in a normal string literal is an invalid escape sequence.
r = re.compile(r"a\d")
# findall returns every non-overlapping match: only "a1" is 'a' followed by a digit.
res = r.findall("a1,ab")
分组( ) 1 2 3 4 import rer = re.compile ("(?P<year>20[0|1]\d)-(?P<month>\d{1,2})" ) res ="2018-03" ) print ("year" ))
爬虫豆瓣电影排名 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import requests,redef getpage (): res = requests.get("" ) return res.text def run (): ret = getpage() r = re.compile ('<div class="item">.*?<em.*?>(\d+)</em>?.*?<div class="info">.*?<span class="title">(.*?)</span>?.*?<div class="star">.*?<span>(\d+)人评价</span>' ,re.S) res = r.findall(ret) for item in res: print (item) run()
模块(ini文件)1 2 3 4 5 6 7 8 9 import configparsercfg = configparser.ConfigParser() cfg["DEFAULT" ] = {"Access" :"ReadWrite" ,"Connect" :"DSN=AdvWorks" } cfg["USER" ] = {"Name" :"User" } with open ("cfg.ini" ,"w" ) as f: cfg.write(f)
1 2 3 4 5 6 7 import configparsercfg = configparser.ConfigParser()"cfg.ini" ) print (cfg["DEFAULT" ]["Access" ])print (cfg.items("USER" ))
模块(子进程执行linux命令)1 2 3 4 5 import subprocesss = subprocess.Popen("dir" ,shell=True ) s.wait() print ("ending" )
1 2 3 4 import subprocesss = subprocess.Popen("dir" ,shell=True ,stdout=subprocess.PIPE) print ("gbk" ))
方法 1 2 3 4 5 6 7 8 9 10 11 12 class Chinese : country = "china" def __init__ (self,name,age ): = name self.age = age def work (self ): print ("work" ) p1 = Chinese("user01" ,"22" ) p2 = Chinese("user02" ,"23" )
Python代码规范检查 pycodestyle检查代码是否符合PEP 8规范
Python官方提供了检查PEP 8代码规范的命令行工具pycodestyle,该工具可以检查Python代码是否违反PEP 8规范,并对违反的地方给出提示。
1 2 3 4 5 6 7 pip install pycodestyle pycodestyle --first pycodestyle --show-source --show-pep8
1 2 3 4 pip install autopep8 autopep8 --in-place
读取标准输入 cat
1 2 3 4 import fileinputfor line in fileinput.input (): print (line, end="" )
1 python /etc/passwd /etc/hosts
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class people : def __init__ (self,name,age ): = name self.age = age def walk (self ): print ('%s is walking' class teacher (people ): school = "abc" def __init__ (self,name,age,sex ): people.__init__(self,name,age) = sex def teach (self ): print ('%s is teaching' class student (people ): def __init__ (self,name,age,level ): people.__init__(self,name,age) self.level = level def study (self ): print ('%s is studying' t = teacher('egon' ,'18' ,'man' ) print (,t.age,,
super 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class people : def __init__ (self,name,age ): = name self.age = age def walk (self ): print ('%s is walking' class teacher (people ): school = "abc" def __init__ (self,name,age,sex ): super ().__init__(name,age) = sex def teach (self ): print ('%s is teaching' t = teacher('egon' ,'18' ,'man' ) print (,t.age,,
类的组合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class date (): def __init__ (self,year,mon,day ): self.year = year self.mon = mon = day def tell_date (self ): print ('今天的日期是:%s年-%s月-%s日' %(self.year,self.mon, class teacher : def __init__ (self,name,age,year,mon,day ): = name self.age = age = date(year,mon,day) def teach (self ): print ('{} is teaching' .format ( t = teacher('user01' ,18 ,2018 ,22 ,12 ) print (,t.age,,,
# 归一化接口 (a uniform interface enforced with an abstract base class)
import abc


class Animal(metaclass=abc.ABCMeta):
    """Abstract base class: every concrete subclass must implement ``talk``."""

    @abc.abstractmethod
    def talk(self):
        pass


class people(Animal):
    """Concrete Animal whose ``talk`` prints a greeting."""

    def talk(self):
        print('say hello')


class dog(Animal):
    """Concrete Animal whose ``talk`` prints a bark."""

    def talk(self):
        print('wang wang wang')


d = dog()
# 绑定方法与非绑定方法 (bound methods vs. non-bound methods)
import abc
import hashlib
import time


class Animal(metaclass=abc.ABCMeta):
    """Abstract base class: every concrete subclass must implement ``talk``."""

    @abc.abstractmethod
    def talk(self):
        pass


class people(Animal):
    """Shows the three method kinds: instance, class and static."""

    def talk(self):
        # Ordinary method: bound to the instance.
        print('say hello')

    @classmethod
    def func(cls):
        # Bound to the class; callable without creating an instance.
        print('say func')

    @staticmethod
    def sum(x, y):
        # Not bound to anything; receives neither the instance nor the class.
        print(x + y)


p = people()
people.func()
people.sum(1, 2)
反射 hasattr
设置对象某个属性的值1 2 3 4 5 6 7 8 9 class foo : say = "hello" def __init__ (self,name,age ): = name self.age = age print (hasattr (foo,"say" ))setattr (foo,"say" ,"hello world" )print (getattr (foo,"say" ))
面向对象高级用法 __str__
可以替换内存地址信息为别的信息1 2 3 4 5 6 7 8 9 10 class foo : def __init__ (self,name,age ): = name self.age = age def __str__ (self ): return "{}:{}" .format (,self.age) f = foo("user" ,18 ) print (f)
程序执行完后的操作,执行清理操作1 2 3 4 5 6 7 8 9 class foo : def __init__ (self,name,age ): = name self.age = age def __del__ (self ): print ("程序运行完成" ) f = foo("user" ,18 )
类使用[ ]
取值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class foo : def __init__ (self,name,age ): = name self.age = age def __getitem__ (self, item ): return getattr (self,item) def __setitem__ (self, key, value ): setattr (self,key,value) def __delitem__ (self, key ): delattr (self,key) f = foo("user" ,18 ) print (f.__dict__)f["name" ] = "user01" print (f.__dict__)del f["name" ]print (f.__dict__)
socket编程 tcp socket模拟ssh server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import socketimport subprocesssock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.bind(("" ,9999 )) sock.listen(1 ) while True : conn,addr = sock.accept() print ("got a new customer" ,conn,addr) while True : data = conn.recv(1024 ) print ("received" ,data) if not data: print ("conn %s is lost" % str (addr) ) conn.close() break cmd = subprocess.Popen(data.decode("utf-8" ), shell=True , stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = stderr = cmd_res = stdout + stderr if not cmd_res: cmd_res = b'cmd has not output' conn.sendall(cmd_res) sock.close()
1 2 3 4 5 6 7 8 9 10 11 12 13 import socketsock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.connect(('' ,9999 )) while True : msg = input (">>>:" ).strip() if not msg : continue sock.send(msg.encode("utf-8" )) response = sock.recv(1024 ) print ("received:" ,response.decode("gbk" )) sock.close()
udp socket server
1 2 3 4 5 6 7 8 9 10 11 import socketip_port = ('' ,9999 ) udp_server_client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) udp_server_client.bind(ip_port) while True : msg,addr = udp_server_client.recvfrom(1024 ) print (msg,addr) udp_server_client.sendto(msg,addr)
1 2 3 4 5 6 7 8 9 10 11 12 13 import socketip_port = ('' ,9999 ) udp_server_client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) while True : msg = input ('>>: ' ).strip() if not msg:continue udp_server_client.sendto(msg.encode('utf-8' ),ip_port) bank_msg,addr = udp_server_client.recvfrom(1024 ) print (bank_msg.decode('utf-8' ),addr)
threading线程 event
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import threadingimport timeevent = threading.Event() def foo (): print ("wait..." ) event.wait(10 ) print ("connect to redis server" ) def bar (): while not event.is_set(): print ("waiting for even..." ) event.wait(1 ) for i in range (5 ): t1 = threading.Thread(target=foo,args=()) t1.start() t2 = threading.Thread(target=bar,args=()) t2.start() print ("attempt to start redis server" )time.sleep(8 ) print ("set..." )event.set ()
import queue
import threading
import time

# Marker the producer enqueues last so the consumer knows when to stop.
_DONE = object()


def foo():
    """Producer: put three items on the shared queue, then the end marker."""
    q.put(3.14)
    q.put("hello")
    q.put("yes")
    q.put(_DONE)


def bar():
    """Consumer: print items from the shared queue until the end marker.

    Uses a blocking ``get()`` rather than polling ``q.empty()``, so it does
    not exit early when it happens to start before the producer has put
    anything on the queue.
    """
    print(">>>start...")
    while True:
        item = q.get()
        try:
            if item is _DONE:
                break
            print(item)
        finally:
            # Pair every get() with task_done(), including for the marker.
            q.task_done()


q = queue.Queue(10)
t1 = threading.Thread(target=foo, args=())
t2 = threading.Thread(target=bar, args=())
t1.start()
t2.start()
# Wait for both threads so the script finishes with the queue drained.
t1.join()
t2.join()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import queueimport threadingimport timedef foo (): q.put([1 ,3.14 ]) q.put([3 ,"hello" ]) q.put([2 ,"yes" ]) def bar (): print (">>>start..." ) while not q.empty(): print (q.get()【】) q.task_done() q = queue.PriorityQueue(10 ) t1 = threading.Thread(target=foo,args=()) t2 = threading.Thread(target=bar,args=()) t1.start() t2.start()
# 进程 multiprocessing (processes)
import multiprocessing
import time


def foo(n):
    """Print 1 plus the sum of the integers in ``range(1, n)``."""
    res = 1
    for i in range(1, n):
        res += i
    print(res)


def bar(n):
    """Print the product of the integers in ``range(1, n)`` (1 if empty)."""
    res = 1
    for i in range(1, n):
        res *= i
    print(res)


s = time.time()
if __name__ == '__main__':
    # Run both CPU-bound jobs in separate processes and time the pair.
    m1 = multiprocessing.Process(target=foo, args=(100000000,))
    m2 = multiprocessing.Process(target=bar, args=(100000,))
    m1.start()
    m2.start()
    m1.join()
    m2.join()
    # Kept inside the guard so only the parent process reports the elapsed time.
    print(time.time() - s)
# 进程queue通信 (inter-process communication through a queue)
import multiprocessing


def foo(q):
    """Put one list payload on the queue ``q``."""
    q.put([11, "hello", True])


if __name__ == '__main__':
    q = multiprocessing.Queue()
    m = multiprocessing.Process(target=foo, args=(q,))
    m.start()
    print(q.get())
    # Read first, then join, so the child has flushed its item before we wait.
    m.join()
管道pipe通信 1 2 3 4 5 6 7 8 9 10 11 12 13 from multiprocessing import Pipe,Processdef foo (sock ): sock.send("hello world!" ) print (sock.recv()) sock,conn = Pipe() if __name__ == '__main__' : m = Process(target=foo, args=(sock,)) m.start() print (conn.recv()) conn.send("hi boy!" )
# manage进程通信(最优方法) (sharing data between processes through a Manager)
from multiprocessing import Manager, Process


def foo(l, i):
    """Append the square of ``i`` to the list-like object ``l``."""
    l.append(i * i)


if __name__ == '__main__':
    manage = Manager()
    mlist = manage.list([11, 22, 33])
    workers = [Process(target=foo, args=(mlist, i)) for i in range(5)]
    # Start every worker first so they run concurrently ...
    for p in workers:
        p.start()
    # ... then wait for all of them, so the list is complete before printing.
    for p in workers:
        p.join()
    print(mlist)
使用进程池并发pool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Poolimport timedef foo (n ): print (n) time.sleep(2 ) if __name__ == '__main__' : p = Pool(5 ) for i in range (100 ): p.apply_async(func=foo,args=(i,)) p.close() p.join() print ("ending..." )
# 协程 yield (coroutines built on yield)
import time


def consumer(delay=3):
    """Generator-based coroutine that handles values sent by a producer.

    Yields ``''`` when first advanced. For each truthy value sent in, it
    prints the value, sleeps ``delay`` seconds, and yields ``"200 ok"`` as
    the reply. A falsy value makes the generator return, which the sender
    sees as ``StopIteration``.

    Args:
        delay: seconds to sleep per item (default 3).
    """
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print("consumer: ", n)
        time.sleep(delay)
        r = "200 ok"


def produce(c):
    """Drive coroutine ``c``: prime it, send 1..5, print each reply, close it."""
    next(c)  # advance to the first yield so send() is accepted
    for n in range(1, 6):
        print("produce: ", n)
        cr = c.send(n)
        print("consumer return: ", cr)
    c.close()


if __name__ == '__main__':
    c = consumer()
    produce(c)
greenlet 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from greenlet import greenletdef foo (): print ("ok1" ) g2.switch() print ("ok3" ) g2.switch() def bar (): print ("ok2" ) g1.switch() print ("ok4" ) g1 = greenlet(foo) g2 = greenlet(bar) g1.switch()
gevent 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import requestsimport geventimport timedef foo (url ): res = requests.get(url) res_str = res.text print ("len: " ,len (res_str)) s = time.time() gevent.joinall([ gevent.spawn(foo, "" ), gevent.spawn(foo, "" ), gevent.spawn(foo, "" ) ]) print (time.time() - s)
io多路复用 select
socketserver server端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import socketserverimport subprocessclass myserver (socketserver.BaseRequestHandler): def handle (self ): print ("from conn: " ,self.request) while True : data = self.request.recv(1024 ) print (data.decode("utf-8" )) if data.decode("utf-8" ) == "q" : break response = subprocess.Popen(data.decode("utf-8" ), shell=True , stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout = stderr = res = stdout + stderr if type (res) is bytes : self.request.send(res) else : self.request.send(res.encode("utf-8" )) s = socketserver.ThreadingTCPServer(("" ,8800 ),myserver) s.serve_forever()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import socketimport platformsock = socket.socket() sock.connect(("" ,8800 )) while True : data = input (">>>: " ) sock.send(data.encode("utf-8" )) response = sock.recv(1024 ) if platform.system() == "Windows" : print (response.decode("gbk" )) else : print (response.decode("utf-8" ))
定时任务 Timer 1 2 3 4 5 6 7 8 from threading import Timerdef do_timer (): print ('hello timer' ) timer = Timer(1 , do_timer) timer.start() do_timer()
schedule 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from threading import Threadimport scheduleimport datetimeimport timedef job (): print ( def do_job (): Thread(target=job).start() def run (): schedule.every(10 ) while True : schedule.run_pending() time.sleep(1 ) run()
# re正则 (regular expressions)
import re

# Group 1: the timestamp through the first three fractional digits.
# The next three digits are matched but not captured.
# Group 2: the trailing "Z".
# Raw string avoids invalid "\d" escapes; "\." matches a literal dot only.
re_obj = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})\d{3}(Z)')

# NOTE(review): format_start_time / format_end_time are not defined in this
# snippet; presumably strings with six fractional digits and a "Z" suffix - confirm.
_start_match = re_obj.match(format_start_time)  # match once, reuse for both groups
start_time = _start_match.group(1) + _start_match.group(2)
_end_match = re_obj.match(format_end_time)
end_time = _end_match.group(1) + _end_match.group(2)
flask_caching增加缓存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from flask_caching import Cachecache = Cache(config={'CACHE_TYPE' : 'simple' }) cache.init_app(app) ```` ```python import xmltodictwith open ('server.xml' ,'r' ) as f: f_xml = f_dict = xmltodict.parse(f_xml)
url编码解码 1 2 3 from urllib import parseparse.quote_plus('抱歉' ) parse.unquote_plus('%E6%8A%B1%E6%AD%89' )
高阶函数 map 1 2 list (map (str , [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ]))['1' , '2' , '3' , '4' , '5' , '6' , '7' , '8' , '9' ]
add-apt-repository apt-get install python-software-properties apt-get install software-properties-common