Skip to content

API Reference - FastAPI REST API#

src #

intentional_fastapi #

FastAPI REST API interface for Intentional.

__about__ #

Package descriptors for intentional-fastapi.

bot_interface #

FastAPI REST API interface for Intentional.

FastAPIBotInterface #

Bases: BotInterface

Bot that lets you use a FastAPI REST API to interact with the user.

Source code in plugins/intentional-fastapi/src/intentional_fastapi/bot_interface.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class FastAPIBotInterface(BotInterface):
    """
    Bot that lets you use a FastAPI REST API to interact with the user.
    """

    name = "fastapi"

    def __init__(self, config: Dict[str, Any], intent_router: IntentRouter):
        # Init the structure
        bot_structure_config = config.pop("bot", None)
        if not bot_structure_config:
            raise ValueError(
                f"{self.__class__.__name__} requires a 'bot' configuration key to know how to structure the bot."
            )
        log.debug("Creating bot structure", bot_structure_config=bot_structure_config)
        self.bot: BotStructure = load_bot_structure_from_dict(intent_router=intent_router, config=bot_structure_config)

        # Check the modality
        self.modality = config.pop("modality")
        log.debug("Modality for %s is set", self.__class__.__name__, modality=self.modality)

    async def run(self) -> None:
        """
        Chooses the specific loop to use for this combination of bot and modality and kicks it off.
        """
        if self.modality == "text_messages":
            await self._run_text_messages(self.bot)
        elif self.modality == "audio_stream":
            await self._run_audio_stream(self.bot)
        else:
            raise ValueError(
                f"Modality '{self.modality}' is not yet supported."
                "These are the supported modalities: 'text_messages', 'audio_stream'."
            )

    async def handle_response_chunks(self, response: ResponseChunksIterator, event: Dict[str, Any]) -> None:
        """
        Stream out text messages from the bot through a TextChunksIterator.
        """
        if event["delta"]:
            await response.asend(event["delta"])

    async def _run_text_messages(self, bot: BotStructure) -> None:
        """
        Runs the interface for the text turns modality.
        """
        app = FastAPI(title="Intentional FastAPI")

        @app.get("/send")
        async def send_message(message: str):
            """
            Send a message to the bot.
            """
            response = ResponseChunksIterator()
            bot.add_event_handler(
                "on_text_message_from_llm",
                lambda event: self.handle_response_chunks(response, event),
            )
            await self.bot.send({"text_message": {"role": "user", "content": message}})
            return StreamingResponse(response)

        await bot.connect()

        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()

    # TODO TEST THIS MODALITY!
    async def _run_audio_stream(self, bot: BotStructure) -> None:
        """
        Runs the interface for the audio stream modality.
        """

        app = FastAPI(title="Intentional FastAPI")

        @app.get("/ws/input")
        async def input_stream(websocket: WebSocket):
            await websocket.accept()
            while True:
                data = await websocket.receive()
                self.bot.send({"audio_chunk": data})

        @app.get("/ws/output")
        async def output_stream(websocket: WebSocket):
            await websocket.accept()

            async def send_audio_chunk(event: Dict[str, Any]) -> None:
                if event["delta"]:
                    await websocket.send_bytes(event["delta"])

            response = ResponseChunksIterator()
            bot.add_event_handler("on_audio_message_from_llm", send_audio_chunk)
            return StreamingResponse(response)

        await bot.connect()

        config = uvicorn.Config(app, host="0.0.0.0", port=8000)
        server = uvicorn.Server(config)
        await server.serve()
handle_response_chunks(response, event) async #

Stream out text messages from the bot through a TextChunksIterator.

Source code in plugins/intentional-fastapi/src/intentional_fastapi/bot_interface.py
83
84
85
86
87
88
async def handle_response_chunks(self, response: ResponseChunksIterator, event: Dict[str, Any]) -> None:
    """
    Stream out text messages from the bot through a TextChunksIterator.
    """
    if event["delta"]:
        await response.asend(event["delta"])
run() async #

Chooses the specific loop to use for this combination of bot and modality and kicks it off.

Source code in plugins/intentional-fastapi/src/intentional_fastapi/bot_interface.py
69
70
71
72
73
74
75
76
77
78
79
80
81
async def run(self) -> None:
    """
    Chooses the specific loop to use for this combination of bot and modality and kicks it off.
    """
    if self.modality == "text_messages":
        await self._run_text_messages(self.bot)
    elif self.modality == "audio_stream":
        await self._run_audio_stream(self.bot)
    else:
        raise ValueError(
            f"Modality '{self.modality}' is not yet supported."
            "These are the supported modalities: 'text_messages', 'audio_stream'."
        )
ResponseChunksIterator #

Async iterator that collects the response chunks from the bot and streams them out.

Source code in plugins/intentional-fastapi/src/intentional_fastapi/bot_interface.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class ResponseChunksIterator:
    """
    Async iterator that collects the response chunks from the bot and streams them out.
    """

    def __init__(self):
        self.buffer = []

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            raise StopAsyncIteration

        next_chunk = self.buffer[0]
        self.buffer = self.buffer[1:]
        return next_chunk

    async def asend(self, value):  # pylint: disable=missing-function-docstring
        self.buffer.append(value)
        return