跳转至

async_exec

本模块提供并行执行器, 及方便函数 io_bound, cpu_bound.

提示: 若需要代替, 建议使用 unsync 库.

ParallelExecutor 🔗

并行执行器.

Source code in src/graia/ariadne/util/async_exec.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class ParallelExecutor:
    """并行执行器."""

    thread_exec: ThreadPoolExecutor
    proc_exec: ProcessPoolExecutor
    loop_ref_dict: ClassVar[Dict[AbstractEventLoop, "ParallelExecutor"]] = {}
    func_mapping: ClassVar[Dict[Tuple[str, str], Callable]] = {}

    def __init__(
        self,
        loop: Optional[AbstractEventLoop] = None,
        max_thread: Optional[int] = None,
        max_process: Optional[int] = None,
    ):
        """初始化并行执行器.

        Args:
            loop (AbstractEventLoop, optional): 要绑定的事件循环, 会自动获取当前事件循环. Defaults to None.
            max_thread (int, optional): 最大线程数. Defaults to None.
            max_process (int, optional): 最大进程数. Defaults to None.

        `max_thread` 与 `max_process` 参数默认值请参阅 `concurrent.futures`.
        """
        self.thread_exec = ThreadPoolExecutor(max_workers=max_thread)
        self.proc_exec = ProcessPoolExecutor(
            max_workers=max_process, initializer=_reg_sigint
        )  # see issue #50
        self.bind_loop(loop or asyncio.get_running_loop())

    @classmethod
    def get(cls, loop: Optional[AbstractEventLoop] = None) -> "ParallelExecutor":
        """获取 ParallelExecutor 实例

        Args:
            loop (AbstractEventLoop, optional): 查找的事件循环. Defaults to None.

        Returns:
            ParallelExecutor: 找到的 / 新创建的 ParallelExecutor.
        """
        loop = loop or asyncio.get_running_loop()
        if loop not in cls.loop_ref_dict:
            cls.loop_ref_dict[loop] = ParallelExecutor()
        return cls.loop_ref_dict[loop]

    def bind_loop(self, loop: AbstractEventLoop):
        """绑定本实例到 loop.

        Args:
            loop (AbstractEventLoop): 要绑定到的事件循环.
        """
        self.loop_ref_dict[loop] = self

    @classmethod
    def shutdown(cls):
        """关闭本类的所有底层 Executor."""
        for exec in cls.loop_ref_dict.values():
            exec.close()

    def close(self):
        """关闭实例的所有底层 Executor."""
        self.thread_exec.shutdown()
        self.proc_exec.shutdown()

    @classmethod
    def run_func(cls, name: str, module: str, args: tuple, kwargs: dict) -> Any:
        """运行函数的实现

        Args:
            name (str): 函数名 (__qualname__)
            module (str): 函数所在模块名 (__module__)
            args (tuple): 位置参数
            kwargs (dict): 关键字参数

        Returns:
            Any: 底层函数的返回值
        """
        importlib.import_module(module)
        return cls.func_mapping[module, name](*args, **kwargs)

    @classmethod
    def run_func_static(cls, func: Callable[..., R], args: tuple, kwargs: dict) -> R:
        """调用一个静态函数 (会自动解包装已被 ParallelExecutor 包装过的函数)

        Args:
            func (Callable[..., R]): 要调用的函数
            args (tuple): 位置参数
            kwargs (dict): 关键字参数

        Returns:
            R: 底层函数的返回值
        """
        if (func.__module__, func.__qualname__) in cls.func_mapping:
            func = cls.func_mapping[func.__module__, func.__qualname__]
        return func(*args, **kwargs)  # type: ignore

    def to_thread(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Awaitable[R]:
        """在线程中异步运行 func 函数.

        Args:
            func (Callable[P, R]): 要调用的函数.
            *args (P.args): 附带的位置参数.
            **kwargs (P.kwargs): 附带的关键词参数.

        Returns:
            Future[R]: 返回结果. 需要被异步等待.
        """
        return asyncio.get_running_loop().run_in_executor(  # type: ignore
            self.thread_exec,
            ParallelExecutor.run_func_static,
            func,
            args,
            kwargs,
        )

    def to_process(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Awaitable[R]:
        """在进程中异步运行 func 函数. 需要先注册过才行.

        Args:
            func (Callable[P, R]): 要调用的函数.
            *args (P.args): 附带的位置参数.
            **kwargs (P.kwargs): 附带的关键词参数.

        Returns:
            Future[R]: 返回结果. 需要被异步等待.
        """
        return asyncio.get_running_loop().run_in_executor(  # type: ignore
            self.proc_exec,
            ParallelExecutor.run_func_static,
            func,
            args,
            kwargs,
        )

__init__(loop=None, max_thread=None, max_process=None) 🔗

初始化并行执行器.

Parameters:

Name Type Description Default
loop AbstractEventLoop

要绑定的事件循环, 会自动获取当前事件循环. Defaults to None.

None
max_thread int

最大线程数. Defaults to None.

None
max_process int

最大进程数. Defaults to None.

None

max_threadmax_process 参数默认值请参阅 concurrent.futures.

Source code in src/graia/ariadne/util/async_exec.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    loop: Optional[AbstractEventLoop] = None,
    max_thread: Optional[int] = None,
    max_process: Optional[int] = None,
):
    """初始化并行执行器.

    Args:
        loop (AbstractEventLoop, optional): 要绑定的事件循环, 会自动获取当前事件循环. Defaults to None.
        max_thread (int, optional): 最大线程数. Defaults to None.
        max_process (int, optional): 最大进程数. Defaults to None.

    `max_thread` 与 `max_process` 参数默认值请参阅 `concurrent.futures`.
    """
    self.thread_exec = ThreadPoolExecutor(max_workers=max_thread)
    self.proc_exec = ProcessPoolExecutor(
        max_workers=max_process, initializer=_reg_sigint
    )  # see issue #50
    self.bind_loop(loop or asyncio.get_running_loop())

bind_loop(loop) 🔗

绑定本实例到 loop.

Parameters:

Name Type Description Default
loop AbstractEventLoop

要绑定到的事件循环.

required
Source code in src/graia/ariadne/util/async_exec.py
77
78
79
80
81
82
83
def bind_loop(self, loop: AbstractEventLoop):
    """绑定本实例到 loop.

    Args:
        loop (AbstractEventLoop): 要绑定到的事件循环.
    """
    self.loop_ref_dict[loop] = self

close() 🔗

关闭实例的所有底层 Executor.

Source code in src/graia/ariadne/util/async_exec.py
91
92
93
94
def close(self):
    """关闭实例的所有底层 Executor."""
    self.thread_exec.shutdown()
    self.proc_exec.shutdown()

get(loop=None) classmethod 🔗

获取 ParallelExecutor 实例

Parameters:

Name Type Description Default
loop AbstractEventLoop

查找的事件循环. Defaults to None.

None

Returns:

Name Type Description
ParallelExecutor ParallelExecutor

找到的 / 新创建的 ParallelExecutor.

Source code in src/graia/ariadne/util/async_exec.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@classmethod
def get(cls, loop: Optional[AbstractEventLoop] = None) -> "ParallelExecutor":
    """获取 ParallelExecutor 实例

    Args:
        loop (AbstractEventLoop, optional): 查找的事件循环. Defaults to None.

    Returns:
        ParallelExecutor: 找到的 / 新创建的 ParallelExecutor.
    """
    loop = loop or asyncio.get_running_loop()
    if loop not in cls.loop_ref_dict:
        cls.loop_ref_dict[loop] = ParallelExecutor()
    return cls.loop_ref_dict[loop]

run_func(name, module, args, kwargs) classmethod 🔗

运行函数的实现

Parameters:

Name Type Description Default
name str

函数名 (qualname)

required
module str

函数所在模块名 (module)

required
args tuple

位置参数

required
kwargs dict

关键字参数

required

Returns:

Name Type Description
Any Any

底层函数的返回值

Source code in src/graia/ariadne/util/async_exec.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@classmethod
def run_func(cls, name: str, module: str, args: tuple, kwargs: dict) -> Any:
    """运行函数的实现

    Args:
        name (str): 函数名 (__qualname__)
        module (str): 函数所在模块名 (__module__)
        args (tuple): 位置参数
        kwargs (dict): 关键字参数

    Returns:
        Any: 底层函数的返回值
    """
    importlib.import_module(module)
    return cls.func_mapping[module, name](*args, **kwargs)

run_func_static(func, args, kwargs) classmethod 🔗

调用一个静态函数 (会自动解包装已被 ParallelExecutor 包装过的函数)

Parameters:

Name Type Description Default
func Callable[..., R]

要调用的函数

required
args tuple

位置参数

required
kwargs dict

关键字参数

required

Returns:

Name Type Description
R R

底层函数的返回值

Source code in src/graia/ariadne/util/async_exec.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def run_func_static(cls, func: Callable[..., R], args: tuple, kwargs: dict) -> R:
    """调用一个静态函数 (会自动解包装已被 ParallelExecutor 包装过的函数)

    Args:
        func (Callable[..., R]): 要调用的函数
        args (tuple): 位置参数
        kwargs (dict): 关键字参数

    Returns:
        R: 底层函数的返回值
    """
    if (func.__module__, func.__qualname__) in cls.func_mapping:
        func = cls.func_mapping[func.__module__, func.__qualname__]
    return func(*args, **kwargs)  # type: ignore

shutdown() classmethod 🔗

关闭本类的所有底层 Executor.

Source code in src/graia/ariadne/util/async_exec.py
85
86
87
88
89
@classmethod
def shutdown(cls):
    """关闭本类的所有底层 Executor."""
    for exec in cls.loop_ref_dict.values():
        exec.close()

to_process(func, *args, **kwargs) 🔗

在进程中异步运行 func 函数. 需要先注册过才行.

Parameters:

Name Type Description Default
func Callable[P, R]

要调用的函数.

required
*args args

附带的位置参数.

()
**kwargs kwargs

附带的关键词参数.

{}

Returns:

Type Description
Awaitable[R]

Future[R]: 返回结果. 需要被异步等待.

Source code in src/graia/ariadne/util/async_exec.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def to_process(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Awaitable[R]:
    """在进程中异步运行 func 函数. 需要先注册过才行.

    Args:
        func (Callable[P, R]): 要调用的函数.
        *args (P.args): 附带的位置参数.
        **kwargs (P.kwargs): 附带的关键词参数.

    Returns:
        Future[R]: 返回结果. 需要被异步等待.
    """
    return asyncio.get_running_loop().run_in_executor(  # type: ignore
        self.proc_exec,
        ParallelExecutor.run_func_static,
        func,
        args,
        kwargs,
    )

to_thread(func, *args, **kwargs) 🔗

在线程中异步运行 func 函数.

Parameters:

Name Type Description Default
func Callable[P, R]

要调用的函数.

required
*args args

附带的位置参数.

()
**kwargs kwargs

附带的关键词参数.

{}

Returns:

Type Description
Awaitable[R]

Future[R]: 返回结果. 需要被异步等待.

Source code in src/graia/ariadne/util/async_exec.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def to_thread(self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Awaitable[R]:
    """在线程中异步运行 func 函数.

    Args:
        func (Callable[P, R]): 要调用的函数.
        *args (P.args): 附带的位置参数.
        **kwargs (P.kwargs): 附带的关键词参数.

    Returns:
        Future[R]: 返回结果. 需要被异步等待.
    """
    return asyncio.get_running_loop().run_in_executor(  # type: ignore
        self.thread_exec,
        ParallelExecutor.run_func_static,
        func,
        args,
        kwargs,
    )

IS_MAIN_PROCESS() 🔗

返回是否为主进程

Returns:

Name Type Description
bool bool

是否为主进程

Source code in src/graia/ariadne/util/async_exec.py
16
17
18
19
20
21
22
def IS_MAIN_PROCESS() -> bool:
    """返回是否为主进程

    Returns:
        bool: 是否为主进程
    """
    return multiprocessing.parent_process() is None

cpu_bound(func) 🔗

包装一个函数在进程中异步运行.

Parameters:

Name Type Description Default
func Callable[P, R]

要包装的函数

required

Returns:

Type Description
Callable[P, Awaitable[R]]

Callable[P, Awaitable[R]]: 包装后的函数

Source code in src/graia/ariadne/util/async_exec.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def cpu_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
    """包装一个函数在进程中异步运行.

    Args:
        func (Callable[P, R]): 要包装的函数

    Returns:
        Callable[P, Awaitable[R]]: 包装后的函数
    """
    mod = func.__module__
    ParallelExecutor.func_mapping["__main__" if mod == "__mp_main__" else mod, func.__qualname__] = func

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        mod = func.__module__
        loop = asyncio.get_running_loop()
        executor = ParallelExecutor.get(loop)
        return await loop.run_in_executor(
            executor.proc_exec,
            ParallelExecutor.run_func,
            func.__qualname__,
            "__main__" if mod == "__mp_main__" else mod,
            args,
            kwargs,
        )

    return wrapper

io_bound(func) 🔗

包装一个函数在线程中异步运行.

Parameters:

Name Type Description Default
func Callable[P, R]

要包装的函数

required

Returns:

Type Description
Callable[P, Awaitable[R]]

Callable[P, Awaitable[R]]: 包装后的函数

Source code in src/graia/ariadne/util/async_exec.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def io_bound(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
    """包装一个函数在线程中异步运行.

    Args:
        func (Callable[P, R]): 要包装的函数

    Returns:
        Callable[P, Awaitable[R]]: 包装后的函数
    """
    ParallelExecutor.func_mapping[func.__module__, func.__qualname__] = func

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        loop = asyncio.get_running_loop()
        executor = ParallelExecutor.get(loop)
        return await loop.run_in_executor(
            executor.thread_exec,
            ParallelExecutor.run_func,
            func.__qualname__,
            func.__module__,
            args,
            kwargs,
        )

    return wrapper