Post

Python多进程开发

记录一下 multiprocess的多种使用方法

Python多进程开发

Pool的基础使用方法

1. 使用 Pool.map() 并行处理数据

这个方法可以用来对一个列表的每个元素应用同一个函数,并收集结果。

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(5) as p:
        print(p.map(square, [1, 2, 3, 4, 5]))

2. 使用 Pool.apply_async() 异步执行任务

这个方法允许异步执行一个任务,不必等待所有任务完成。

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Pool

def cube(x):
    return x * x * x

if __name__ == "__main__":
    pool = Pool(processes=4)
    results = [pool.apply_async(cube, (i,)) for i in range(1, 7)]
    output = [result.get() for result in results]
    print(output)
    pool.close()
    pool.join()

3. 使用 Pool.starmap() 并行处理多参数函数

这个方法适用于需要传递多个参数给处理函数的场景。

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == "__main__":
    with Pool(4) as p:
        print(p.starmap(add, [(1, 2), (3, 4), (5, 6)]))

4. 使用 Pool.imap()Pool.imap_unordered()

这两个方法用于迭代大型数据或当你想即时开始处理任务而不是等待整个迭代完成时。

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Pool
import time

def slow_increment(x):
    time.sleep(2)  # Simulate a slow function
    return x + 1

if __name__ == "__main__":
    with Pool(4) as p:
        result = list(p.imap(slow_increment, range(10), chunksize=1))
        print(result)

Pool.imap_unordered() 类似于 Pool.imap(),但结果的顺序可能与输入的不一致,适用于结果顺序不重要的场景。

由于imap只支持单参数传入,为了支持多参数,需要传入元组列表,并且在解析时进行分解

类中使用多进程开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import multiprocessing

class Calculator:
    def square(self, x):
        return x * x

class ParallelCalculator:
    def __init__(self, numbers):
        self.numbers = numbers
        self.calculator = Calculator()
    
    def worker(self, x):
        return self.calculator.square(x) # 可以使用类的变量,但是由于多进程,不可进行修改
    
    def run_parallel(self):
        with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
            results = pool.map(self.worker, self.numbers)
        return results

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    parallel_calculator = ParallelCalculator(numbers)
    print(parallel_calculator.run_parallel())

使用tqdm进行多进程进度条显示

1
2
3
4
5
6
7
8
9
10
11
class Calculator:
    def square(self, x):
        time.sleep(0.3)
        return x * x

    def bar(self):
        from tqdm import tqdm

        with Pool(4) as pool:
            results = list(tqdm(pool.imap(self.square, range(10)), total=10))
            print(results)
This post is licensed under CC BY 4.0 by the author.