SharedMemory
共享内存块
Python 3.8 新增了 shared_memory 模块,用于多个进程同时对 共享内存 进行访问。
中文文档:https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html
shared_memory 模块定义了一个类,提供了对 共享内存 对象的引用:
from multiprocessing.shared_memory import SharedMemory
这里提到的 共享内存 指的是 System V 类型的共享内存块,而不是 分布式共享内存。
System V 共享内存
在 Linux 中,每个 进程 都有属于自己的 进程控制块 和 地址空间。地址空间 对应了一个 页表,负责将进程的 虚拟地址 映射到 物理地址。当两个不同的虚拟地址映射到同一个物理地址时,这个物理地址指向的区域就是一块 共享内存。
但是,共享内存 没有提供 同步机制,即一个进程的写入操作尚未完成时保护共享内存区域的数据不被其它进程读取或者改写。共享内存的安全机制需要通过其它方式实现,例如 信号量。
使用
ipcs -m命令可以查看系统中的共享存储段。参考博客 https://blog.csdn.net/ypt523/article/details/79958188。
分布式共享内存
应用于分布式系统上,提供了一个逻辑上统一的地址空间。
进程直接从内存读取数据,相对于在进程间发送消息、直接从磁盘读取数据或者通过其它序列化、反序列化方式复制数据,直接读取共享内存更加高效。
class SharedMemory(name=None, create=False, size=0)
name:readonly, 共享内存块的唯一标识size:readonly, 共享内存块的 字节 大小buf:内容,传递给 numpy 可以从中提取出数据close():关闭实例对于共享内存的访问连接unlink():请求销毁底层的共享内存块;只能调用一次。
通过 name 参数和 create 参数组合,可以选择 新建共享内存块 还是 引用存在的共享内存块。
共享内存块的寿命可能会超过创建它的原始进程。当子进程不再使用内存块时,应该调用 close() 关闭当前进程对它的引用;当共享内存块不被任何进行使用时,应该调用 unlink() 方法清理掉它。
共享字节数组
新建共享内存块
原生数组可以通过将其 字节数组 赋值给 SharedMemory 实例的 .buf 属性来实现内存共享。
from multiprocessing.shared_memory import SharedMemory
shm = SharedMemory(create=True, size=10)
buffer = shm.buf
buffer[:4] = bytearray([22, 33, 44, 55])
buffer[4] = 100
重引共享内存块
使用 array 模块来获取共享内存块中的数据。
shm = SharedMemory(name=other_shm.name)
import array
## 从共享内存块中引用出数据
arr = array.array('b', shm.buf[:5])
## 引用的数据可以直接修改,赋值字节类型的数据
arr[:5] = b'howdy'
## 直接访问共享内存块中的字节数据
data = bytes(shm.buf[:5]) #=> b'howdy'
## 不要忘了关闭和注销
shm.close()
shm.unlink()
共享 numpy 数组
新建共享内存块
可以从一个已知的 numpy 数组新建一个共享内存块。
# np.ndarray 实例的 nbytes 属性返回该实例占用的字节大小
arr = np.zeros((1000,), dtype='int8')
# 不指定 name 时会分配一个默认的名字
# create 为 True 表示新建共享内存块
# size 指定共享内存块的字节大小
shm = SharedMemory(create=True, size=arr.nbytes)
## 获取 SharedMemory 实例的 Numpy 数组引用
# buffer=shm.buf 指定了数据来源
ref = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
## 给 ref 赋值相当于给共享内存块赋值
## 需要复制数组的元素,而不是数组的引用
ref[:] = arr[:]
# ref = arr # 这是复制引用,ref的含义变了
## 派发任务到每个进程
...
## 等待所有子进程任务结束
...
## 销毁共享内存块
shm.close().
shm.unlink()
重引共享内存块
一般情况下,我们可以在 主进程 中新建一个共享内存块(上面例子中的 shm),并将 shm.name 传递给 子进程。在子进程中,可以通过指定共享内存块的唯一标识符实例化 SharedMemory 获取对共享内存块的引用。
def worker(shm_name: str, *args, **kwargs):
shm = SharedMemory(name=shm_name)
ref = np.ndarray(shm.size, dtype='int8', buffer=shm.buf)
...
## 关闭引用
shm.close()
return
ShareableList
from multiprocessing.shared_memory import ShareableList
class ShareableList(sequence=None, *, name=None):
## value 出现的次数
def count(self, value): pass
## 返回 value 首次出现的位置
# 如果 value 不存在, 则抛出 ValueError 异常
def index(self, value): pass
## 包含由所有当前存储值所使用的 struct 打包格式的只读属性
format: readonly struct
## 存储了值的 SharedMemory 实例
shm: SharedMemory
一种跟 list 相似的数据结构,只是其数据存储在 共享内存块 中:
sequence不为None时创建一个ShareableList实例sequence为None时通过指定name来关联已经存在的ShareableList实例
然而 ShareableList 与内置的 list 相比存在诸多限制:
- 只能存储基本的数据类型,例如
int,float,bool,str,bytes,None - 长度不能修改,不能使用
append、insert等方法动态插入数据,共享的数据在实例化时指定 - 不能通过对
ShareableList使用 切片 来创建新的ShareableList实例
例如:
smm.ShareableList(range(10))
#> ShareableList([0, 1, 2, 3], name='psm_6572_7512')
smm.ShareableList('abcd')
#> ShareableList(['a', 'b', 'c', 'd'], name='psm_6572_12221')
SharedMemoryManager
SharedMemory 提供了管理共享内存块的 Python API,但是它仍需要用户手动跟踪和释放共享内存资源。此外,还可以通过 multiprocessing.managers.SharedMemoryManager 类来协助管理共享内存的生命周期。SharedMemoryManager 提供了在多进程情景下对共享内存块的自动管理。
SharedMemoryManager 实例主要有两个方法:
.start():启动一个新进程。通过此种方式启动的进程可以使得当前的SharedMemoryManager实例能够管理由子进程创建的共享内存块的生存周期.shutdown():在停止进程前触发实例管理的所有共享内存块的.unlink()方法
from multiprocessing.managers import SharedMemoryManager
with SharedMemoryManager() as smm:
smm.start()
...
smm.shutdown()
上面例子中的 smm 是 SharedMemoryManager 的一个实例。除了 start 和 shutdown 外,它还提供了用于共享数据的两个方法(ShareableList 和 SharedMemory):
.SharedMemory(size)创建一个指定字节大小的SharedMemory对象并返回.ShareableList(sequence)指定输入序列创建并返回一个新的ShareableList对象

评论区