跳转至

console

Ariadne 控制台

注意, 本实现并不 robust, 但是可以使用

Console 🔗

Ariadne 的控制台, 可以脱离 Ariadne 实例运行

警告: 本实现无法确保稳定性

Source code in src/graia/ariadne/console/__init__.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
class Console:
    """Ariadne 的控制台, 可以脱离 Ariadne 实例运行

    警告: 本实现无法确保稳定性
    """

    def __init__(
        self,
        broadcast: Broadcast,
        prompt: Union[Callable[[], str], AnyFormattedText] = "{library_name} {graia_ariadne_version}>",
        *,
        r_prompt: Union[Callable[[], str], AnyFormattedText] = "",
        style: Optional[Style] = None,
        extra_data_getter: Iterable[Callable[[], Dict[str, Any]]] = (),
        replace_logger: bool = True,
        listen_launch: bool = True,
        listen_shutdown: bool = True,
    ) -> None:
        """初始化控制台.

        Args:
            broadcast (Broadcast): 事件系统.
            prompt (AnyFormattedText, optional): 输入提示, 可使用 f-string 形式的格式化字符串. \
            默认为 "{library_name} {graia_ariadne_version}>".
            r_prompt (AnyFormattedText, optional): 右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.
            style (Style, optional): 输入提示的格式, 详见 prompt_toolkit 的介绍.
            extra_data_getter (Iterable[() -> Dict[str, Any], optional): \
                额外的 Callable, 用于生成 prompt 的格式化数据.
            replace_logger (bool, optional): \
                是否尝试替换 loguru 的 0 号 handler (sys.stderr) 为 StdoutProxy. 默认为 True.
            listen_launch (bool, optional): \
                是否监听 Ariadne 的 ApplicationLaunched 事件并启动自身, 默认为 True.
            listen_shutdown (bool, optional): \
                是否监听 Ariadne 的 ApplicationShutdowned 事件并停止自身, 默认为 True.
        """
        self.broadcast = broadcast

        # Handle Ariadne Event
        if listen_launch:
            broadcast.receiver(ApplicationLaunch)(self.start)

        if listen_shutdown:
            broadcast.receiver(ApplicationShutdown)(self.stop)

        self.session: PromptSession[str] = PromptSession()

        self.style = style or Style([])

        self.l_prompt: AnyFormattedText = prompt
        self.r_prompt: AnyFormattedText = r_prompt

        self.registry: List[Tuple[Callable, List[BaseDispatcher], List[Decorator]]] = []
        self.extra_data_getter = extra_data_getter

        self.running: bool = False
        self.task: Optional[Task] = None

        self.handler_id: int = 0
        self.replace_logger: bool = replace_logger

        logger.warning("Please note that console is NOT STABLE.")
        logger.warning("Use it at your own risk.")

    def data_getter(self) -> Dict[str, Any]:
        """返回用于 prompt 的数据

        Returns:
            Dict[str, Any]: 可用于 format_map 的数据字典
        """
        data = {
            "library_name": "Ariadne",
        }

        for dist in importlib.metadata.distributions():
            name: str = dist.metadata["Name"]
            version: str = dist.version
            if name.startswith("graia"):
                if name == "graia-ariadne-dev":
                    name = "graia-ariadne"
                data[f"{'_'.join(name.split('-') + ['version'])}"] = version

        for func in self.extra_data_getter:
            data.update(func())

        return data

    async def prompt(
        self,
        l_prompt: Optional[AnyFormattedText] = None,
        r_prompt: Optional[AnyFormattedText] = None,
        style: Optional[Style] = None,
    ) -> str:
        """向控制台发送一个输入请求, 异步

        Args:
            l_prompt (AnyFormattedText, optional): 左输入提示, 可使用 f-string 形式的格式化字符串. \
            默认为 "{library_name} {graia_ariadne_version}>". 注意为 l_prompt .
            r_prompt (AnyFormattedText, optional): 右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.
            style (Style, optional): 输入提示的格式, 详见 prompt_toolkit 的介绍.

        Returns:
            str: 输入结果
        """
        l_prompt = l_prompt or self.l_prompt
        r_prompt = r_prompt or self.r_prompt
        style = style or self.style
        if isinstance(l_prompt, str):
            l_prompt = l_prompt.format_map(self.data_getter())

        if isinstance(r_prompt, str):
            r_prompt = r_prompt.format_map(self.data_getter())

        try:
            return await self.session.prompt_async(
                message=l_prompt,
                rprompt=r_prompt,
                style=style,
                set_exception_handler=False,
            )
        except KeyboardInterrupt:
            self.stop()
            raise

    async def loop(self) -> None:
        """Console 的输入循环"""
        from graia.amnesia.message import MessageChain as BaseMessageChain

        from ..message.chain import MessageChain
        from ..message.element import Plain

        class _Dispatcher(BaseDispatcher):
            def __init__(self, command: str, console: Console) -> None:
                self.command = command
                self.console = console

            async def catch(self, interface: DispatcherInterface):
                if interface.annotation is str and interface.name == "command":
                    return self.command
                if interface.annotation in (MessageChain, BaseMessageChain):
                    return MessageChain([Plain(self.command)], inline=True)
                if interface.annotation is Console:
                    return self.console
                if interface.annotation is Broadcast:
                    return self.console.broadcast
                if interface.annotation is AbstractEventLoop:
                    return asyncio.get_running_loop()

        while self.running:
            try:
                command = await self.prompt()
            except KeyboardInterrupt:
                self.stop()
                raise
            for func, dispatchers, decorators in self.registry:
                try:
                    result = await self.broadcast.Executor(
                        ExecTarget(
                            func,
                            resolve_dispatchers_mixin(
                                [_Dispatcher(command, self), ContextDispatcher(), *dispatchers]
                            ),
                            decorators,
                        ),
                    )
                except DisabledNamespace as e:
                    logger.exception(e)
                except PropagationCancelled:
                    break
                except Exception:
                    pass
                else:
                    if isinstance(result, str):
                        logger.info(result)
                    elif isinstance(result, MessageChain):
                        logger.info(result.display)

    def start(self):
        """启动 Console, 幂等"""
        if not self.running:
            self.running = True

            if self.replace_logger:
                with contextlib.suppress(ValueError):
                    logger.remove(0)
                self.handler_id = logger.add(StdoutProxy(raw=True))  # type: ignore

            self.task = it(AbstractEventLoop).create_task(self.loop())

    def stop(self):
        """提示 Console 停止, 非异步, 幂等"""

        if self.running:
            logger.info("Stopping console...")

            self.running = False

            if self.replace_logger:
                logger.remove(self.handler_id)
                self.handler_id = logger.add(sys.stderr)

    async def join(self):
        """等待 Console 结束, 异步, 幂等"""
        if self.task:
            await self.task
            self.task = None

    def register(
        self,
        dispatchers: Optional[List[BaseDispatcher]] = None,
        decorators: Optional[List[Decorator]] = None,
    ):
        """注册命令处理函数

        Args:
            dispatchers (List[BaseDispatcher], optional): 使用的 Dispatcher 列表.
            decorators (List[Decorator], optional): 使用的 Decorator 列表.
        """

        def wrapper(func: Callable):
            self.registry.append((func, dispatchers or [], decorators or []))
            return func

        return wrapper

__init__(broadcast, prompt='{library_name} {graia_ariadne_version}>', *, r_prompt='', style=None, extra_data_getter=(), replace_logger=True, listen_launch=True, listen_shutdown=True) 🔗

初始化控制台.

Parameters:

Name Type Description Default
broadcast Broadcast

事件系统.

required
prompt AnyFormattedText

输入提示, 可使用 f-string 形式的格式化字符串. 默认为 "{library_name} {graia_ariadne_version}>".

'{library_name} {graia_ariadne_version}>'
r_prompt AnyFormattedText

右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.

''
style Style

输入提示的格式, 详见 prompt_toolkit 的介绍.

None
extra_data_getter Iterable[() -> Dict[str, Any]

额外的 Callable, 用于生成 prompt 的格式化数据.

()
replace_logger bool

是否尝试替换 loguru 的 0 号 handler (sys.stderr) 为 StdoutProxy. 默认为 True.

True
listen_launch bool

是否监听 Ariadne 的 ApplicationLaunched 事件并启动自身, 默认为 True.

True
listen_shutdown bool

是否监听 Ariadne 的 ApplicationShutdowned 事件并停止自身, 默认为 True.

True
Source code in src/graia/ariadne/console/__init__.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def __init__(
    self,
    broadcast: Broadcast,
    prompt: Union[Callable[[], str], AnyFormattedText] = "{library_name} {graia_ariadne_version}>",
    *,
    r_prompt: Union[Callable[[], str], AnyFormattedText] = "",
    style: Optional[Style] = None,
    extra_data_getter: Iterable[Callable[[], Dict[str, Any]]] = (),
    replace_logger: bool = True,
    listen_launch: bool = True,
    listen_shutdown: bool = True,
) -> None:
    """初始化控制台.

    Args:
        broadcast (Broadcast): 事件系统.
        prompt (AnyFormattedText, optional): 输入提示, 可使用 f-string 形式的格式化字符串. \
        默认为 "{library_name} {graia_ariadne_version}>".
        r_prompt (AnyFormattedText, optional): 右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.
        style (Style, optional): 输入提示的格式, 详见 prompt_toolkit 的介绍.
        extra_data_getter (Iterable[() -> Dict[str, Any], optional): \
            额外的 Callable, 用于生成 prompt 的格式化数据.
        replace_logger (bool, optional): \
            是否尝试替换 loguru 的 0 号 handler (sys.stderr) 为 StdoutProxy. 默认为 True.
        listen_launch (bool, optional): \
            是否监听 Ariadne 的 ApplicationLaunched 事件并启动自身, 默认为 True.
        listen_shutdown (bool, optional): \
            是否监听 Ariadne 的 ApplicationShutdowned 事件并停止自身, 默认为 True.
    """
    self.broadcast = broadcast

    # Handle Ariadne Event
    if listen_launch:
        broadcast.receiver(ApplicationLaunch)(self.start)

    if listen_shutdown:
        broadcast.receiver(ApplicationShutdown)(self.stop)

    self.session: PromptSession[str] = PromptSession()

    self.style = style or Style([])

    self.l_prompt: AnyFormattedText = prompt
    self.r_prompt: AnyFormattedText = r_prompt

    self.registry: List[Tuple[Callable, List[BaseDispatcher], List[Decorator]]] = []
    self.extra_data_getter = extra_data_getter

    self.running: bool = False
    self.task: Optional[Task] = None

    self.handler_id: int = 0
    self.replace_logger: bool = replace_logger

    logger.warning("Please note that console is NOT STABLE.")
    logger.warning("Use it at your own risk.")

data_getter() 🔗

返回用于 prompt 的数据

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: 可用于 format_map 的数据字典

Source code in src/graia/ariadne/console/__init__.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def data_getter(self) -> Dict[str, Any]:
    """返回用于 prompt 的数据

    Returns:
        Dict[str, Any]: 可用于 format_map 的数据字典
    """
    data = {
        "library_name": "Ariadne",
    }

    for dist in importlib.metadata.distributions():
        name: str = dist.metadata["Name"]
        version: str = dist.version
        if name.startswith("graia"):
            if name == "graia-ariadne-dev":
                name = "graia-ariadne"
            data[f"{'_'.join(name.split('-') + ['version'])}"] = version

    for func in self.extra_data_getter:
        data.update(func())

    return data

join() async 🔗

等待 Console 结束, 异步, 幂等

Source code in src/graia/ariadne/console/__init__.py
233
234
235
236
237
async def join(self):
    """等待 Console 结束, 异步, 幂等"""
    if self.task:
        await self.task
        self.task = None

loop() async 🔗

Console 的输入循环

Source code in src/graia/ariadne/console/__init__.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def loop(self) -> None:
    """Console 的输入循环"""
    from graia.amnesia.message import MessageChain as BaseMessageChain

    from ..message.chain import MessageChain
    from ..message.element import Plain

    class _Dispatcher(BaseDispatcher):
        def __init__(self, command: str, console: Console) -> None:
            self.command = command
            self.console = console

        async def catch(self, interface: DispatcherInterface):
            if interface.annotation is str and interface.name == "command":
                return self.command
            if interface.annotation in (MessageChain, BaseMessageChain):
                return MessageChain([Plain(self.command)], inline=True)
            if interface.annotation is Console:
                return self.console
            if interface.annotation is Broadcast:
                return self.console.broadcast
            if interface.annotation is AbstractEventLoop:
                return asyncio.get_running_loop()

    while self.running:
        try:
            command = await self.prompt()
        except KeyboardInterrupt:
            self.stop()
            raise
        for func, dispatchers, decorators in self.registry:
            try:
                result = await self.broadcast.Executor(
                    ExecTarget(
                        func,
                        resolve_dispatchers_mixin(
                            [_Dispatcher(command, self), ContextDispatcher(), *dispatchers]
                        ),
                        decorators,
                    ),
                )
            except DisabledNamespace as e:
                logger.exception(e)
            except PropagationCancelled:
                break
            except Exception:
                pass
            else:
                if isinstance(result, str):
                    logger.info(result)
                elif isinstance(result, MessageChain):
                    logger.info(result.display)

prompt(l_prompt=None, r_prompt=None, style=None) async 🔗

向控制台发送一个输入请求, 异步

Parameters:

Name Type Description Default
l_prompt AnyFormattedText

左输入提示, 可使用 f-string 形式的格式化字符串. 默认为 "{library_name} {graia_ariadne_version}>". 注意为 l_prompt .

None
r_prompt AnyFormattedText

右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.

None
style Style

输入提示的格式, 详见 prompt_toolkit 的介绍.

None

Returns:

Name Type Description
str str

输入结果

Source code in src/graia/ariadne/console/__init__.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
async def prompt(
    self,
    l_prompt: Optional[AnyFormattedText] = None,
    r_prompt: Optional[AnyFormattedText] = None,
    style: Optional[Style] = None,
) -> str:
    """向控制台发送一个输入请求, 异步

    Args:
        l_prompt (AnyFormattedText, optional): 左输入提示, 可使用 f-string 形式的格式化字符串. \
        默认为 "{library_name} {graia_ariadne_version}>". 注意为 l_prompt .
        r_prompt (AnyFormattedText, optional): 右侧提示, 可使用 f-string 形式的格式化字符串. 默认为空.
        style (Style, optional): 输入提示的格式, 详见 prompt_toolkit 的介绍.

    Returns:
        str: 输入结果
    """
    l_prompt = l_prompt or self.l_prompt
    r_prompt = r_prompt or self.r_prompt
    style = style or self.style
    if isinstance(l_prompt, str):
        l_prompt = l_prompt.format_map(self.data_getter())

    if isinstance(r_prompt, str):
        r_prompt = r_prompt.format_map(self.data_getter())

    try:
        return await self.session.prompt_async(
            message=l_prompt,
            rprompt=r_prompt,
            style=style,
            set_exception_handler=False,
        )
    except KeyboardInterrupt:
        self.stop()
        raise

register(dispatchers=None, decorators=None) 🔗

注册命令处理函数

Parameters:

Name Type Description Default
dispatchers List[BaseDispatcher]

使用的 Dispatcher 列表.

None
decorators List[Decorator]

使用的 Decorator 列表.

None
Source code in src/graia/ariadne/console/__init__.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def register(
    self,
    dispatchers: Optional[List[BaseDispatcher]] = None,
    decorators: Optional[List[Decorator]] = None,
):
    """注册命令处理函数

    Args:
        dispatchers (List[BaseDispatcher], optional): 使用的 Dispatcher 列表.
        decorators (List[Decorator], optional): 使用的 Decorator 列表.
    """

    def wrapper(func: Callable):
        self.registry.append((func, dispatchers or [], decorators or []))
        return func

    return wrapper

start() 🔗

启动 Console, 幂等

Source code in src/graia/ariadne/console/__init__.py
209
210
211
212
213
214
215
216
217
218
219
def start(self):
    """启动 Console, 幂等"""
    if not self.running:
        self.running = True

        if self.replace_logger:
            with contextlib.suppress(ValueError):
                logger.remove(0)
            self.handler_id = logger.add(StdoutProxy(raw=True))  # type: ignore

        self.task = it(AbstractEventLoop).create_task(self.loop())

stop() 🔗

提示 Console 停止, 非异步, 幂等

Source code in src/graia/ariadne/console/__init__.py
221
222
223
224
225
226
227
228
229
230
231
def stop(self):
    """提示 Console 停止, 非异步, 幂等"""

    if self.running:
        logger.info("Stopping console...")

        self.running = False

        if self.replace_logger:
            logger.remove(self.handler_id)
            self.handler_id = logger.add(sys.stderr)