Python-多线程、多进程、协程

描述

基本使用

**基本概念

**

  • 进程
    • 几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每个运行中的程序就是一个进程
    • 进程是处于运行过程中的程序,并且具有一定的独立功能
    • 进程是系统进行资源分配调度的一个独立单位
  • 线程
    • 线程(Thread)也叫 轻量级进程 ,是操作系统能够进行运算调度的最小单位
    • 它被包涵在进程之中,是进程中的实际运作单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源
    • 进程中的多个线程之间可以并发执行

一个进程可以拥有多个线程,一个线程必须有一个父进程。线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源,它与父进程的其他线程共享该进程所拥有的全部资源。

多线程的优点

  • 进程之间不能共享内存,但线程之间共享内存非常容易
  • 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此,使用多线程来实现多任务并发执行比使用多进程的效率高
  • Python 语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了 Python 的多线程编程

示例

**方式一: **使用 threading.Thread(target=方法名) 的方式实现多线程

参数说明:threading.Thread(参数说明)

  • target: 指定该线程要调度的目标方法。只传函数名,不传函数,即不加()
  • args: 指定一个元组,以位置参数的形式为target指定的函数传入参数。元组的第一个参数传给target的第一个,以此类推
  • kwargs: 指定一个字典,以关键字参数的形式为target指定的函数传入参数
  • daemon: 指定所构建的线程是否为后台线程
import time
import threading




def eat(num):
    for i in range(num):
        print("我正在吃饭......")
        time.sleep(1)




def drunk(num=10):
    for i in range(num):
        print("我正在喝水......")
        time.sleep(1)




if __name__ == '__main__':
    # 创建两个线程
    t1 = threading.Thread(target=eat, args=(10,))
    t2 = threading.Thread(target=drunk)
    # 启动两个线程
    t1.start()
    t2.start()


    while True:
        threadList = threading.enumerate()
        print("正在运行的线程是:", threadList)
        print("正在运行的线程数量是:", len(threadList))
        time.sleep(2)

方式二: 继承threading.Thread

import time
import threading




class MyThread(threading.Thread):


    def run(self):
        for i in range(10):
            print("线程启动了,我的名字叫:", self.name)
            time.sleep(1)




if __name__ == '__main__':
    # 创建两个线程
    t1 = MyThread()
    t2 = MyThread()
    # 启动两个线程, start() 方法内部会自动去调用 run方法,所以此处写 start() 就可以了
    t1.start()
    t2.start()


    while True:
        threadList = threading.enumerate()
        print("正在运行的线程是:", threadList)
        print("正在运行的线程数量是:", len(threadList))
        time.sleep(2)

全局变量、互斥锁、死锁

共享全局变量示例

import threading


g_num = 0


def fun_1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("------fun_1的g_num值:%d" % g_num)




def fun_2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("------fun_2的g_num值:%d" % g_num)




if __name__ == '__main__':
    t1 = threading.Thread(target=fun_1, args=(1000000,))
    t2 = threading.Thread(target=fun_2, args=(1000000,))
    t1.start()
    t2.start()

输出结果 :

进程

从以上结果可以看出,直接使用全局变量是有问题的,按理说,预期结果应该是2000000,实际结果相关很大,且每次执行结果都不一样

原因

  • 在g_num=0 时,t1取得g_num=0,此时系统把t1调度为 "sleeping" 状态,把t2转换为 "running" 状态,t2 这时也获得了 g_num=0
  • 然后 t2对得到的值进行加1,并赋给g_num,使得g_num=1
  • 然后系统又把 t2调度为"sleeing",把t1转为"running",线程t1又把它之前得到的0 加1后赋值给 g_num。
  • 这样就导致了 t1和t2都对 g_num加1,但结果仍然是 g_num=1

解决方案:互斥锁

  • 当多个线程几乎同时修改某个共享数据的时候,需要进行同步控制
  • 某个线程要更改共享数据时,先将其锁定,此时资源状态为**” 锁定 ,其他线程不能更改;直到该线程释放资源,将资源的状态改为 ”**非锁定 “, 其他的线程才能再次锁定该资源。互拆锁保证了 每次只有一个线程进行写入操作 ,从而保证了多线程数据的正确性
import threading


g_num = 0


def fun_1(num):
    global g_num
    for i in range(num):
        # 上锁,如果之前没上锁,此时调用就会上锁;如果之前上锁了,此时调用就会阻塞,直接锁被别人释放
        lock.acquire()
        g_num += 1
        # 释放锁
        lock.release()
    print("------fun_1的g_num值:%d" % g_num)




def fun_2(num):
    global g_num
    for i in range(num):
        # 上锁,如果之前没上锁,此时调用就会上锁;如果之前上锁了,此时调用就会阻塞,直接锁被别人释放
        lock.acquire()
        g_num += 1
        # 释放锁
        lock.release()
    print("------fun_2的g_num值:%d" % g_num)




if __name__ == '__main__':
    # 创建一个互斥锁,默认是没上锁的
    lock=threading.Lock()


    t1 = threading.Thread(target=fun_1, args=(1000000,))
    t2 = threading.Thread(target=fun_2, args=(1000000,))
    t1.start()
    t2.start()

死锁

  • 在线程共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁
  • 尽管死锁很少发生,但一旦发生就会造成程序停止响应
import threading
import time




def fun_1():
    # 1号锁,上锁
    lock1.acquire()
    print("fun_1..do some thing.........1")
    time.sleep(1)


    # 2号锁,上锁,如果被别人占用了,则会阻塞
    lock2.acquire()
    print("fun_1..do some thing..........2")
    # 释放2号锁
    lock2.release()


    # 释放1号锁
    lock1.release()




def fun_2():
    # 2号锁,上锁
    lock2.acquire()
    print("fun_2..do some thing.........2")
    time.sleep(1)


    # 1号锁,上锁,如果被别人占用了,则会阻塞
    lock1.acquire()
    print("fun_2..do some thing..........1")
    # 释放1号锁
    lock1.release()


    # 释放2号锁
    lock2.release()


if __name__ == '__main__':
    # 创建两个互斥锁
    lock1=threading.Lock()
    lock2=threading.Lock()


    t1 = threading.Thread(target=fun_1)
    t2 = threading.Thread(target=fun_2)
    t1.start()
    t2.start()

输出结果:会一直停止阻塞

进程

死锁解决方案

  • 程序设计时避免

  • **添加超时等待

    **

进程

进程的状态

  • 新建:操作系统调度启动一个新进程
  • 就绪态:运行的条件都已经满足,等待cpu执行
  • 执行态:cpu 正在执行
  • 等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态

进程

进程的创建

  • 在python中,提供了 multiprocessing 模块,就是跨平台的多进程模块,模块提供了 Process类来代表一人进程对象,这个对象可以理解为一个独立的进程
  • Process 参数说明
    • target:传递函数的引用,子进程将执行这个函数
    • args:给target指定的函数,传递参数,以元组的方式传递,非必填
    • kwargs:给target指定的函数传递命名参数,非必填
    • name:给进程设定一个名称,非必填
    • group:指定进程组,非必填
  • Process 对象的常用方法
    • start():启动子进程
    • is_alive():判断子进程是否还活着
    • join(timeout):是否等待子进程执行结束,或等待多少秒
    • terminate():不管任务是否完成,立即终止子进程
  • Process 对象的常用属性
    • name:当前进程的别名,默认为Process-x, x为从1开始递增的整数
    • pid:当前进程的pid (进程号)

示例

import time
import os
import multiprocessing




def eat(num):
    for i in range(num):
        print("我正在吃饭......,我的进程号是:%d,父进程的进程号是:%d" % (os.getpid(),os.getppid()))
        time.sleep(1)




def drunk(num):
    for i in range(num):
        print("我正在喝水......我的进程号是:%d,父进程的进程号是:%d" % (os.getpid(),os.getppid()))
        time.sleep(1)




if __name__ == '__main__':
    # 创建两个进程
    p1 = multiprocessing.Process(target=eat, args=(10,))
    p2 = multiprocessing.Process(target=drunk, args=(10,))
    # 启动两个进程
    p1.start()
    p2.start()


    print("主进程的进程号是:%d" %os.getpid())

输出结果

进程

进程线程区别

  • 功能区别
    • 进程:能够完成多任务,比如,在一台电脑上运行多个qq
    • 线程:能够完成多任务,比如,在一个qq中开多个聊天窗口
  • 调度
    • 进程是资源分配的基本单位
    • 线程是cpu调度和执行的最小单位
  • 拥有资源
    • 进程拥有资源的一个独立单位
    • 线程不拥有系统资源,但可以访问隶属于进程的资源
  • 稳定性
  • 进程有独立的地址空间,多进程较稳定,因为其中一个出现状况不影 响另外一个

  • 同一个进程的多个线程,共用地址空间,多线程相比于多进程,稳定性要差,因为一个线程出现问题会严重影响其他线程

  • 依赖关系
    • 一个线程只能属性一个进程
    • 一个进程至少有一个线程
  • 针对全局变量的共享
    • 多进程间不共享全局变量(进程间都是独立的)
    • 多线程间共享全局变量

进程间的通信

  • 不同的进程间有时也需要进行数据传递,在Python中,可以使用 muitiprocessiong模块的 Queue ,来实现进程间的数据传递
  • Queue 是一个消息队列,数据是先进先出的原则

示例

import time
import multiprocessing




def set_data(queue):
    numList=[1,2,3,4]
    for i in numList:
        # 给队列中放数据,如果队列已经满了,则会阻塞,直到能放数据为止
        queue.put(i)


    time.sleep(1)




def get_data(queue):
    while True:
        # 判断队列中如果没有数据了,则退出循环
        if queue.empty():
            break


        # 从队列中取数据
        data=queue.get()
        print("从队列中取出的数据是:",data)


if __name__ == '__main__':
    # 创建一个消息列表,容量是3(表示只能装3个数据)
    queue=multiprocessing.Queue(3)


    # 创建两个进程
    p1 = multiprocessing.Process(target=set_data, args=(queue,))
    p2 = multiprocessing.Process(target=get_data, args=(queue,))
    # 启动两个进程
    p1.start()
    p2.start()

**输出结果 **

进程

进程池

  • 需要创建成百上千个进程时,可以使用 muitiprocessing模块提供的 Pool 方法
  • 初始化Pool时 ,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池里面没有满,就会创建一个新的进程来执行该请求,如果已经满了,那么就会等待,直到池中有空闲进程,会调用空闲进程来执行新任务

示例

import time
import os
import multiprocessing


def work(msg):
    print("开始执行工作,当前进程是:",os.getpid())
    time.sleep(2)
    print("接收到的消息数据是:%s"%msg)




if __name__ == '__main__':
    # 创建进程池,容量为3
    pool=multiprocessing.Pool(3)


    for i in range(10):
        # pool.apply_async(要调用的目标,(传递给目标的参数元组,))
        # 每次循环将会用空闲的子进程去执行任务
        pool.apply_async(work,("传递参数:%d"%i,))


    # 关闭进程池,关闭后进程池不再接收新请求
    pool.close()
    # 等待进程池中所有的子进程执行完成 ,必须放在close 语句之后
    pool.join()

输出结果

进程

协程

迭代器

  • 迭代是访问集合元素的一种方式
  • 迭代器是一个可以记住遍历位置的对象。
  • 迭代器对象从集合的第一个元素开始访问,直到所有的元素都访问完成
  • 迭代器只会前进,不会后退
  • 我们把对 list、set、str、tuple 等类型的数据使用 for...in ... 的方式从中获取数据,这样的过程称为 遍历(循环)也叫迭代

判断一个数据类型是否可迭代,使用 isinstance(xxx,Iterable)

from collections.abc import Iterable


list1=[1,2]
str1="123"
tuple1=(1,2)
dict1={1:"a",2:"b"}
num=122


print(isinstance(list1,Iterable))
print(isinstance(str1,Iterable))
print(isinstance(tuple1,Iterable))
print(isinstance(dict1,Iterable))
print(isinstance(num,Iterable))

输出结果

进程

自已实现迭代器示例

class MyIterator:
    """自己实现一个迭代器"""


    def __init__(self):
        self.name = list()
        self.currentIndex = 0


    def add(self, arg):
        self.name.append(arg)


    def __iter__(self):
        # 如果想要一个对象成为可以被迭代的对象,即可以使用 for ... in ... ,那么必须实现 __iter__ 方法
        return self


    def __len__(self):
        return len(self.name)


    def __next__(self):
        # 当使用for...in... 迭代时,会先调用 __iter__ 方法,然后调用其返回对象中的 __next__ 方法(即本方法)
        if self.currentIndex < len(self.name):
            result = self.name[self.currentIndex]
            self.currentIndex += 1
            return result
        else:
            # 抛出一个 停止迭代的异常
            raise StopIteration




myIter = MyIterator()
myIter.add("张三")
myIter.add("李四")
myIter.add("王五")


for i in myIter:
    print(i)


# 获取集合长度
print(len(myIter))

**生成器

**

  • 生成器是一种特殊的迭代器
  • 创建生成器有两种方式
    • 方式一:把一个列表生成式的[] 改成 ()
    • # 原始列表
      l=[x*2 for x in range(10)]
      # 构建生成器
      g=(x*2 for x in range(10))
      
      
      # 迭代创建的生成器
      for i in g:
          print(i)
      
  • 方式二:使用 yield 关键字
  • def create_age(num):
        currentAge=0
        while currentAge# 创建生成器中的 值,并打印 send 过来的参数值
            sendMsg=yield currentAge
            print("%s 的年龄是:%d"%(sendMsg,currentAge))
            currentAge+=1
    # 创建生成器,并初始化10个值
    obj=create_age(10)
    
    
    # 迭代一次 生成器中的值
    n=next(obj)
    ss=obj.send("name"+str(0))
    print("send的结果:",ss)
    
    
    # 迭代后续,生成器中的值
    for i in obj:
        print(i)
    

协程

  • 实现方式一:采用yield 实现
import time


def eat():
    while True:
        print("我在吃饭.....")
        time.sleep(1)
        yield




def drunk():
    while True:
        print("我在喝水.....")
        time.sleep(1)
        yield


if __name__ == '__main__':
    # 创建两个生成器
    eat=eat()
    drunk=drunk()


    while True:
        next(eat)
        next(drunk)
  • 实现方式二:采用 greenlet 实现
    • 先安装 greenlet 模块:pip install greenlet
import time
from greenlet import greenlet




def eat():
    while True:
        print("我在吃饭.....")
        # 切换到g2中运行
        g2.switch()
        time.sleep(1)




def drunk():
    while True:
        print("我在喝水.....")
        # 切换到g1中运行
        g1.switch()
        time.sleep(1)




if __name__ == '__main__':
    # 创建两个生成器
    g1 = greenlet(eat)
    g2 = greenlet(drunk)


    # 切换到g1中运行
    g1.switch()
  • 实现方式三:采用 gevent实现 推荐使用
    • 由于 greenlet 需要人手动切换,比较占用IO资源,并且会出现,一旦中间某个程序处于线程等待的话,会一直等待很长时间的问题。所以gevent就应运而生,gevent 遇到延时阻塞会自动切换
    • 先安装 gevent模块:pip install gevent
import time
import gevent
from gevent import monkey


# 有耗时操作时需要, 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
monkey.patch_all()




def work(num):
    for i in range(num):
        print(gevent.getcurrent(), i)
        # 使用 mokey.patch_all() 之后,程序会自动替换成 gevent里面的 gevent.sleep() 方法
        time.sleep(1)




if __name__ == '__main__':
    # 创建并启动协程
    gevent.joinall({
        gevent.spawn(work, 10),
        gevent.spawn(work, 10)
    })

图片下载器实现

import gevent
from gevent import monkey
from urllib import request


# 有耗时操作时需要, 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
monkey.patch_all()


def down_pic(filename,url):
    resp=request.urlopen(url)
    data=resp.read()


    # 写入文件
    with open(filename,"wb") as f:
        f.write(data)




if __name__ == '__main__':
    # 创建并启动协程
    gevent.joinall({
        gevent.spawn(down_pic, "1.jpg","https://himg3.qunarzz.com/imgs/201812/14/C._M0DCiiigrWCy4LQi1024.jpg"),
        gevent.spawn(down_pic, "2.jpg","https://source.qunarzz.com/site/images/zhuanti/huodong/shangwu.jpg")
    })

总结

  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源最大,效率很低
  • 线程切换需要的资源一般,效率一般(不考虑GIL的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中,所以是并发
打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分