博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python大佬养成计划----线程与多线程
阅读量:6222 次
发布时间:2019-06-21

本文共 8822 字,大约阅读时间需要 29 分钟。

创建线程

一个进程必有一个线程,进程也可由多个线程组成,但有一个线程为主线程。

若一个任务需要花10Mins,当只有一个线程时,花费10Mins,当有十个线程时,可能就花费1Mins,所以多线程可以提升任务执行时间,提高工作效率。
python里与线程有关的模块:

  • _thread 底层
  • threading

查看当前运行的线程个数:threading.current_thread()

查看当前线程信息:threading.active_count()

import _threadimport threadingdef job():    print("当前线程个数:",threading.active_count())    print("当前线程信息",threading.current_thread())if __name__=='__main__':    job()

图片描述

_thread创建多线程

调用thread模块中的start_new_thread()函数来产生新线程。

thread.start_new_thread(function,args = ())

#_thread创建多线程import _threadimport timedef job(name):    print("name:%s,time:%s" %(name,time.ctime()))if __name__=="__main__":    # 创建多个线程, 但是没有开始执行任务    _thread.start_new_thread(job,('thread1',))    _thread.start_new_thread(job,('thread2',))    while True: #盲等待        pass

图片描述

threading通过实例化Thread类创建多线程

_thread模块提供了低级别的、原始的线程以及一个简单的锁。

threading模块是对_thread再封装,对使用者更友好
通过实例化Thread对象创建线程,Thread的方法有:

  • run() #Method representing the thread's activity.
  • start() #Start the thread's activity.
  • join() #Wait until the thread terminates.
  • is_alive() #Return whether the thread is alive.
import threadingdef job(name):    print("当前执行的任务名:",name)    print("当前线程个数:",threading.active_count())    print("当前线程信息:",threading.current_thread())if __name__=="__main__":    t1 = threading.Thread(target=job,name='thread1',args=('job1',))    t2 = threading.Thread(target=job,name='thread2',args=('job2',))    t1.start()  #Start the thread's activity.    t2.start()

图片描述

使用多线程与不使用多线程的对比

不使用多线程执行任务,程序会一直等待sleep时间过去,在执行下一条命令。

#不使用多线程import timedef music(name):    for i in range(2):        print("i am listening :",name)        time.sleep(2)def read(book):    for i in range(2):        print("i am reading :",book)        time.sleep(1)if __name__ == '__main__':    start_time = time.time()    music("空空如也")    read('面纱')    print("花费时间: %s" %(time.time()-start_time))

图片描述

使用多线程执行任务,在遇到某一线程需要等待时,会执行其他线程

Thread.join()会等待当前线程执行结束,再执行主线程。

import threadingimport timedef music(name):    for i in range(2):        print("i am listening :",name)        time.sleep(2)def read(book):    for i in range(2):        print("i am reading :",book)        time.sleep(1)if __name__=="__main__":    start_time = time.time()    t1 = threading.Thread(target=music,args=('空空如也',))    t2 = threading.Thread(target=read,args=('面纱',))    t1.start()    t2.start()    t1.join()   #等待线程执行结束,才执行主程序,防止主线程阻塞子线程    t2.join()    end_time = time.time()    print("任务执行时间:",end_time-start_time)

图片描述

守护线程setDeamon

当申明一个子线程为守护线程时,主线程结束时,子线程也结束。

申明守护线程需要在开启线程之前。

import threadingimport timedef music(name):    for i in range(2):        print("listening music :",name)        time.sleep(4)def code(pro):    for i in range(2):        print('i am coding :',pro)        time.sleep(5)if __name__=='__main__':    st_time = time.time()    t1 = threading.Thread(target=music,args=('hello',))    t2 = threading.Thread(target=code,args=('mydiff',))    #将线程申明为守护线程,如果设置为True,当主线程结束,子线程也结束    #必须在启动线程之前进行设置    t1.setDaemon(True)    t2.setDaemon(True)  #主线程执行结束之后,子线程还没来得及执行结束,整个程序就退出了    t1.start()    t2.start()    end_time = time.time()    print('运行时间:',end_time-st_time)

图片描述

线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。

使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。

import threadingdef add(lock):    #操作变量之前加锁    lock.acquire()    global money    for i in range(1389993):        money+=1    #变量操作完成之后,解锁    lock.release()def reduce(lock):    #操作变量之前加锁    lock.acquire()    global money    for i in range(4728937):        money-=1    #变量操作完成之后,解锁    lock.release()if __name__=="__main__":    money = 0    lock = threading.Lock() #示例化一个锁对象    t1 = threading.Thread(target=add,args=(lock,))    t2 = threading.Thread(target=reduce,args=(lock,))    t1.start()    t2.start()    t1.join()    t2.join()    print('最终金额为:',money)

图片描述

GIL全局解释器锁

Python 代码的执行由 Python 虚拟机(也叫解释器主循环)来控制。Python 在设计之初就考虑到要在主循环中,同时只有一个线程在执行,就像单 CPU 的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在 CPU 中运行。同样地,虽然 Python 解释器中可以“运行”,多个线程,但在任意时刻,只有一个线程在解释器中运行。

对 Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

执行过程:

1). 设置GIL2). 切换到线程去运行对应的任务;3). 运行    - 执行完了    - time.sleep()    - 获取其他信息才能继续执行, eg: 从网络上获取网页信息等;3. 把线程设置为睡眠状态4. 解锁GIL5.再次重复执行上述内容;

生产者消费者模型

在工作中,某些模块生成一些数据,由另一些模块负责处理。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。

这里,我们用生产者消费者模型来实现多线程的网址访问,节省时间。

#多线程实现生产者消费者模型#实现不同的网址或ip访问import threadingfrom urllib.request import urlopendef create_data():    with open('ips.txt','w') as f:        f.write("www.baidu.com\n")        f.write("www.163.com\n")        for i in range(100):            f.write('172.25.254.%s\n' %(i+1))def creat_url(filename='ips.txt'):    ports=[80,443]    with open(filename) as f:        ips = [url_info.strip() for url_info in f.readlines()]    urls = ['http://%s:%s' %(ip,port) for ip in ips for port in ports]    return urlsdef job(url):    try:        urlObj = urlopen(url)    except Exception as e :        print('Warnning!!!    %s不可访问' %(url))    else:        print("%s可以访问" %(url))if __name__=="__main__":    urls = creat_url()    threads = []    for url in urls:        t = threading.Thread(target=job,args=(url,))        threads.append(t)        t.start()    [thread.join() for thread in threads]    print("任务执行结束")

图片描述

再封装threading.Thread类

无参版

对threading.Thread类的再封装,执行时无需传递参数

from threading import Threadclass IpThread(Thread):    def __init__(self):        super(IpThread, self).__init__()# 将多线程需要执行的任务重写到run方法中;    def run(self):        print("this is a JOB")        print(type(self))t = IpThread()t.start()

图片描述

含参版

实现访问Ip地址

import jsonfrom threading import Threadfrom urllib.request import urlopenclass IpThread(Thread):    #重写构造方法,如果执行的任务需要传递参数,那将参数与self绑定    def __init__(self,jobname,ip):        super(IpThread, self).__init__()        self.jobname = jobname        self.ip = ip    #将多线程需要执行的任务重写到run方法中    def run(self):        print('this is a %s job' %(self.jobname))        #需要有一个参数,传递ip        url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip)        try :            # 根据url获取网页的内容, 并且解码为utf-8格式, 识别中文;            result = urlopen(url).read().decode('utf-8')        except Exception as e:            print("访问%s失败" %(self.ip))        else:             # 将获取的字符串类型转换为字典, 方便处理            d = json.loads(result)['data']            country = d['country']            city = d['city']        print("%s位于%s,城市为%s" %(self.ip,country,city))if __name__=="__main__":    ips = ['172.25.254.22','8.8.8.8','89.31.136.0']    threads = []    for ip in ips :        t = IpThread(jobname='Clawer',ip=ip)        threads.append(t)        t.start()    [thread.join() for thread in threads]    print("程序执行结束")

图片描述

线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

from concurrent.futures import ThreadPoolExecutorimport time#需要执行的任务def job():    print("morning sheen")    return 'new day'if __name__=='__main__':    #示例化对象,线程池里最多有10个线程    pool = ThreadPoolExecutor(max_workers=10)    #往线程池里扔需要执行的任务,返回一个对象 _base.Future()示例化出来的    f1 = pool.submit(job)    f2 = pool.submit(job)    #判断任务是否执行结束    print(f1.done())    time.sleep(1)    print(f2.done())    #判断是否释放了线程    #获取执行任务的结果    print(f1.result())    print(f2.result())

图片描述

线程池循环执行任务

线程池执行任务方式

concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:

  • map可以保证输出的顺序, submit输出的顺序是乱的
  • 如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
  • submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。
from urllib.error import HTTPErrorfrom urllib.request import urlopenfrom concurrent.futures import ThreadPoolExecutor,as_completedimport timeURLS = ['http://httpbin.org', 'http://example.com/',        'https://api.github.com/']*3def get_page(url,timeout = 0.3):   #爬取网页信息    try:        content = urlopen(url).read()        return {'url':url, 'len':len(content)}    except HTTPError as e:        return {'url':url, 'len':0}# 方法1: submit提交任务start_time = time.time()pool = ThreadPoolExecutor(max_workers=20)#submit返回的是Future对象,对于Future对象可以简单地理解为一个在未来完成的操作futuresObj = [pool.submit(get_page, url) for url in URLS]# # 注意: 传递的是包含futures对象的序列, as_complete返回已经执行完任务的future对象,# # 直到所有的future对应的任务执行完成, 循环结束;for finish_fs in as_completed(futuresObj):    print(finish_fs.result() )#submit返回值Future的方法result(self, timeout=None)"""Return the result of the call that the future represents.Args:    timeout: The number of seconds to wait for the result if the future        isn't done. If None, then there is no limit on the wait time.Returns:    The result of the call that the future represents."""print("执行时间:%s" %(time.time()-start_time))# 方法2:通过map方式执行start2_time = time.time()pool2 = ThreadPoolExecutor(max_workers=20)for res in pool2.map(get_page, URLS):    print(res)print("执行时间:%s" %(time.time()-start2_time))

图片描述

转载地址:http://ggrja.baihongyu.com/

你可能感兴趣的文章
深入理解 hashcode 和 hash 算法
查看>>
java底层深入开发教程
查看>>
如何写javascript代码隐藏和显示这个div
查看>>
并发系列5-大白话聊聊Java并发面试问题之微服务注册中心的读写锁优化【石杉的架构笔记】...
查看>>
jQuery插件开发
查看>>
计算机图形学(OpenGL版)-第一个OpenGL程序
查看>>
Linux安装JDK和Tomcat
查看>>
NO.2: 尽量以const,enum,inline 替换 #define
查看>>
动态表单 - 加载与关闭
查看>>
gentoo
查看>>
TRI 解题报告
查看>>
ahjesus用forever管理nodejs服务
查看>>
步步为营:Asp.Net 淘宝通用应用接口攻略
查看>>
数组排序问题
查看>>
关于钱的存储数据类型
查看>>
transform函数
查看>>
MySQL服务器安装配置-非安装版、windows版
查看>>
批量往数据库导入数据遇到的问题总结
查看>>
一个小公司的前端笔试HTML CSS JS
查看>>
noip普及组2018T1 标题统计
查看>>