Code前端首页关于Code前端联系我们

多进程编程与进程间通信及数据传输Python代码实现

terry 2年前 (2023-09-25) 阅读数 46 #后端开发

多进程编程与进程间通信

  • 含义:充分利用计算机资源,提高程序的计算速度♺❀:使用计算机通过应用程序多核达到同时执行多个任务的目标,从而提高计算机运行速度
  • 执行计划:多进程、多线程
  • 并行:计算机同时处理多个任务
  • 并发:同时处理多个任务,内核不断地在任务之间切换,达到它们仍在处理和运行的效果,但实际上内核一次只能处理其中一个任务

优点和多进程的缺点

  • 优点
    • 可以使用计算机有多个核心同时执行任务并提高执行效率,空间独立性♶提高安全性运行不受其他进程影响,易于创建
  • 缺点
    • 删除和创建进程会消耗大量系统资源

进程(进程)

查看进程树:pstree 显示主要流程: ps -ajx
logo
  • ❓❓❓❓❓❓❓❓❓S等待
  • 可中断等待 D 等待 不间断等待 站立状态RR❙
    释放状态 包括就绪
    Z僵尸进程
    动态。 顶部,使用到页面
    • 值范围:-20 -- 19 -20为最高优先级❓♶使用正在运行的程序❓♺ 不错:设置运行优先级,例如:好-9./while.py --->>以优先级-9运行

    进程特性

    1. 进程之间互不影响,独立工作
    2. 进程是最小的操作系统资源单元每个进程都有独立的空间,各自占用一定的虚拟内存

    孤儿进程

    1. 父进程先于子进程退出。目前,子进程称为孤儿进程
    2. 孤儿进程是操作系统指定的进程 接受后,系统进程成为孤儿进程的主进程

    僵尸进程

    1. 进程在父进程退出前退出,但父进程不处理子进程的退出状态。这时子进程就变成了僵尸进程
    2. 僵尸进程在内存中保留了少量的PCB信息。大量僵尸进程消耗系统资源。应避免生成僵尸进程
    • 如何避免生成僵尸进程
      • 处理子进程退出状态
        pid, status = os.wait()

      • 功能:子进程退出及等待时间

        返回值:

    需要理解

    1. 什么是流程? ?
    2. 子进程从下一个fork语句开始执行,不干扰父进程
    3. 父子进程的执行顺序可能不固定。父子进程共享一个终端屏幕
    4. 父子进程通常会根据fork返回值的差异来选择执行不同的进程。 if代码的结构几乎是fork的标准
    5. 父子进程空间是独立的,操作都是这个空间的内容,互不影响
    6. 子进程也有自己的属性,比如PID号、PCB、指令集等
    7. 进程相关函数

      功能方法 参数 进程说明
      当前进程的编号 Pren。
      os.getppid() 返回进程父进程当前 PID 号
      os._exit( status ) 程序退出状态 进程退出
      它)❙] Number:显示退出状态,不写 默认为 进程退出

      import os
      pid = os.fork()
      if pid < 0:
          print("创建进程失败")
      elif pid == 0:
          print("子进程我的真实PID为:",os.getpid(),"我的父进程PID为:",os.getppid())
      else:
          print("我是父进程执行的代码,当前的变量pid为:",pid,"我的真实PID为:",os.getpid())
          
      '''打印内容''''''
      我是父进程执行的代码,当前的变量pid为: 10992 我的真实PID为: 10991
      子进程我的真实PID为: 10992 我的父进程PID为: 10991
      ''''''打印内容'''
          
      # 如果pid 10992和子进程真实PID不同,那么这个子进程就变成了孤儿进程

      from multiprocessing import Process

      Process () 根据位置

    8. kwargs 的函数目标函数:字典,成对传递参数给目标函数键值
    9. 名称:字符串,新进程名称
    10. 例如: p = Process(target = fun,args=(a,b))
    11. 函数方法说明
      p.start() 启动进程,目标函数自动执行,此时进程真正创建完成 on([p.等待子进程回收的块,超时为过期时间
      p.is_alive() 设置进程的生命周期状态,这个是生命周期中的,返回一个布尔值 p.name()获取进程名称
      p.pid() 获取进程pid
      p.daemon() 默认状态为False。退出父进程不会影响子进程。正确:子进程随主进程结束
    • 使用多处理创建子进程。同样,子进程复制父进程的所有代码。父子进程独立执行,互不影响。父子进程有自己的工作空间
    • 如果不使用 join 引用子进程的话,子进程退出后就会变成僵尸进程
    from multiprocessing import Process 
    from time import sleep 
    ​
    #带参数的进程函数
    def worker(sec,name):
        for i in range(3):
            sleep(sec)
            print("I'm %s"%name)
            print("I'm working...")
    ​
    p = Process(target = worker,args = (2,),\
        kwargs = {'name':'Daivl'},name = "Worker")
    p.start()
    ​
    print("Process name:",p.name) #进程名称
    print("Process PID:",p.pid) #获取进程PID
    ​
    #进程alive情况
    print("Process is alive:",p.is_alive())
    ​
    p.join(3)
    print("==================")

    自定义继承进程类
    1. 继承进程
    2. 自己写__init__,同时加载这个_init方法
    3. 并覆盖 Start 方法,可自动运行通过在创建的对象上调用 start
    from  import Process
    import time
    ​
    class ClockProcess(Process):
    ​
        def __init__(self,value):
            self.value = value
            super(ClockProcess,self).__init__()
    ​
        def run(self):
            for i in range(5):
                print("现在的时间是",time.ctime())
                time.sleep(self.value)
    ​
    # 创建自定义进的类的对象
    p =ClockProcess(2)
    ​
    # 自动调用run
    p.start()
    p.join()    

    原因是需要完成几个过程❝ 。进程需要频繁的创建和删除,给计算机带来较多的资源消耗

  • 基本思想
    • 创建一个合适的进程,放入进程池中,利用池中的处理时间。当前任务处理完成后,进程不会被销毁。 ,其他时间进程池仍在等待处理。进程复用,减少系统资源消耗
  • 使用方法
    1. 创建进程池,并在池中放入合适的进程
    2. 将事件添加到事件队列中 ❀ 直到所有进程都启动
    3. 关闭进程池并回收进程
  • Pool function

    • Pool(Processes)
      • 功能:创建多个进程池对象 † ive func as tuple 传递参数
      • kwds :将参数传递给函数 func in字典的形式
    • 返回对象值
  • pool.apply( func, args, kwds)
    • 功能:为进程池添加时间? pool.join()
      • 功能:重用进程池
    • pool.map(func,iter)
      • 功能:将要完成的事件添加到进程池中 func:执行函数iter:迭代对象 返回值:多个返回结果列
    from multiprocessing import Process
    import time
    ​
    class ClockProcess(Process):
    ​
        def __init__(self,value):
            self.value = value
            super(ClockProcess,self).__init__()
    ​
        def run(self):
            for i in range(5):
                print("现在的时间是",time.ctime())
                time.sleep(self.value)
    ​
    # 创建自定义进的类的对象
    p =ClockProcess(2)
    ​
    # 自动调用run
    p.start()
    p.join()    

    包含进程间通信(IPC)、消息队列、共享内存、信号、信号量、套接字

    管道 消息队列 共享内存
    可用空间内存内存
    读写方式双端读写[双向/单向]先入先出 覆盖之前的内容
    效率
    通用❙♺更高
    应用多用于父子进程广泛灵活必须注意相互排斥

  • 管道连通

    • 在内存中打开管道空间,生成管道操作对象,多个进程使用“同一个”管道对象进行操作,即。可实现通信

      multiprocessing --> Pipe

      • fd1,fd2 = Pipe (duplex = True)
        • 功能:创建管道❀❀❀❀❀❀是管道的默认参数 Set False 为单向管道
      • 返回值
        • 如果是双向管道,可读写
        • 如果是单向管道,fd1 为只读,fd2 为写入-only
    • fd.recv()
      • 功能:从管道
      • 读取信息返回值:读取内容
      • 如果管道为空则阻塞endfta)s。功能:向管道写入内容
      • 参数:可写内容
      • 可以发送任何Python数据类型

    多进程管道传输数据示例❀消息队列 顺序:先入先出出,按顺序

    通信原理:在内存中创建队列数据结构模型。多个进程可以通过队列存储内容。检索内容的顺序与保存内容的顺序一致。

    • 创建队列。默认表示按照内存分配存储
    • 返回值:队列对象
  • q.put(data, [block, ti​​meout])
    • 功能:将消息存储到队列中 数据:保存内容
    • 阻塞:默认队列满时阻塞。如果设置为False,则不会被阻止。
    • timeout:超时时间
  • data = q.get([block, ti​​meout])
    • 功能:获取队列消息
    • 参数默认阻塞,当为空时。如果设置为False,则不会阻塞。
    • timeout:超时
  • 返回值:返回下载的内容
  • q.full():判断队列是否满❙空❙q。 :判断队列是否为空。

    用于数据传输的多处理消息队列

    from multiprocessing import Queue,Process
    from time import sleep
    ​
    # 创建队列,可以放3条消息
    q = Queue(3)
    ​
    def fun1():
        sleep(1)
        q.put({"a":1,"b":2})
    ​
    def fun2():
        sleep(2)
        print("收到消息",q.get())
    ​
    p1 = Process(target = fun1)
    p2 = Process(target = fun2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    共享内存

    通信原理:在内存中打开对多个进程可见的空间,但每个先前进程都可见能写就写。内容。

    只能发送单个数据

    • obj = Value(ctype, obj)
      • 功能:打开共享内存空间 obj:共享内存初始数据
    • 返回值:共享内存对象
    • obj .value 是共享内存值。改变它可以改变内存
    • from multiprocessing import Value
      from time import sleep
      import os
      # 创建共享内存对象
      money = Value('i',2000)
      ​
      # 操作共享内存
      def deposite():
          while True:
              i = int(input("请输入:"))
              money.value = i
              sleep(0.05)
      ​
      def withdraw():
          data = money.value
          while True:
              if data != money.value :
                  data = money.value
                  print(data)
                  
      pid = os.fork()
      if pid == 0 :
          deposite()
      withdraw() 
  • obj = Array(ctype, obj)
    • 功能:打开共享内存空间
    • 参数:
      • ctype: 存储的数据格式:存储的内容如列表、字符串。如果是数字,则表示开放内存空间的数量
    • 返回值:返回一个可遍历的共享内存对象
    • from multiprocessing import Array,Process
      from time import sleep
      import os
      ​
      # 开辟100字符内存空间,'c'代表字符,'i'代表整形
      shm = Array('c',100)
      # 必须使用字节流
      shm.value = "哈哈哈".encode()
      def fun1():
          print(os.getpid(),"子进程1:",shm.value.decode())
          shm.value = "夜夜夜".encode()
      ​
      def fun2():
          sleep(1)
          print(os.getpid(), "子进程2:",shm.value.decode())
      ​
      p1 = Process(target = fun1)  
      p2 = Process(target = fun2)
      p1.start()
      p2.start()
      p1.join()
      p2.join()
      ​
      ​

  • 信号通信 向另一个进程发送信号传输某些信息,接收者根据发送的信息做相应的事情

    $ Kill -l 参见系统信号描述

    $ Kill -9 PID 号 发送信号给进程

    信号名称 描述
    1) SIGHUP中止2) SIGINTctrl+cctrl+c♶♶❀♶ +\
    20) SIG TSTP ctrl +z
    9) SIGKILL19) SIGSTOPctrl +z
    停止进程
    26) SIGVTALRM1)❙ LD 进程退出时发送给父进程的信号

    Python 导入信号接收信号

    • os.kill(pid, sig )signal♺d s
      • pid : 要发送的P信号ID号
      • sig:信号名称
    import os
    import signal
    os.kill(12345,signal.SIGKILL) #杀死进程

    版权声明

    本文仅代表作者观点,不代表Code前端网立场。
    本文系作者Code前端网发表,如需转载,请注明页面地址。

    发表评论:

    ◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

    热门