API Reference - OpenAI#

src #

intentional_openai #

Init file for intentional_openai.

__about__ #

Package descriptors for intentional-openai.

chatcompletion_api #

Client for OpenAI's Chat Completion API.

ChatCompletionAPIClient #

Bases: TurnBasedModelClient

A client for interacting with the OpenAI Chat Completion API.
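
For orientation, here is a minimal usage sketch. The bot and router objects are assumed to be built elsewhere by the Intentional framework, and the model name is only an example; the config keys shown ("name", "api_key_name") are the ones this class actually reads.

from intentional_openai.chatcompletion_api import ChatCompletionAPIClient

async def chat(bot, router):
    # bot: BotStructure, router: IntentRouter -- both assumed to be provided by the framework
    config = {"name": "gpt-4o", "api_key_name": "OPENAI_API_KEY"}  # example values
    client = ChatCompletionAPIClient(parent=bot, intent_router=router, config=config)
    # send() expects a Chat Completions-style message under the "text_message" key
    await client.send({"text_message": {"role": "user", "content": "Hello!"}})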

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
class ChatCompletionAPIClient(TurnBasedModelClient):
    """
    A client for interacting with the OpenAI Chat Completion API.
    """

    name: str = "openai"

    def __init__(self, parent: "BotStructure", intent_router: IntentRouter, config: Dict[str, Any]):
        """
        A client for interacting with the OpenAI Chat Completion API.

        Args:
            parent: The parent bot structure.
            intent_router: The intent router.
            config: The configuration dictionary.
        """
        logger.debug("Loading ChatCompletionAPIClient from config: %s", config)
        super().__init__(parent, intent_router)

        self.model_name = config.get("name")
        if not self.model_name:
            raise ValueError("ChatCompletionAPIClient requires a 'name' configuration key to know which model to use.")
        if "realtime" in self.model_name:
            raise ValueError(
                "ChatCompletionAPIClient doesn't support Realtime API. "
                "To use the Realtime API, use RealtimeAPIClient instead (client: openai_realtime)"
            )

        self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
        if not os.environ.get(self.api_key_name):
            raise ValueError(
                "ChatCompletionAPIClient requires an API key to authenticate with OpenAI. "
                f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
            )
        self.api_key = os.environ.get(self.api_key_name)

        self.client = openai.AsyncOpenAI(api_key=self.api_key)
        self.system_prompt = self.intent_router.get_prompt()
        self.tools = self.intent_router.current_stage.tools
        self.conversation: List[Dict[str, Any]] = [{"role": "system", "content": self.system_prompt}]

    async def run(self) -> None:
        """
        Handle events from the model by either processing them internally or by translating them into higher-level
        events that the BotStructure class can understand, then re-emitting them.
        """
        logger.debug("ChatCompletionAPIClient.run() is no-op for now")

    async def update_system_prompt(self) -> None:
        """
        Update the system prompt in the model.
        """
        self.conversation = [{"role": "system", "content": self.system_prompt}] + self.conversation[1:]
        await self.emit("on_system_prompt_updated", {"prompt": self.system_prompt})

    async def handle_interruption(self, length_to_interruption: int) -> None:
        """
        Handle an interruption while rendering the output to the user.

        Args:
            length_to_interruption: The length of the data that was produced to the user before the interruption.
                This value could be a number of characters, a number of words, milliseconds, a number of audio
                frames, etc., depending on the bot structure that implements it.
        """
        logger.warning("TODO! Implement handle_interruption in ChatCompletionAPIClient")

    async def send(self, data: Dict[str, Any]) -> None:
        """
        Send a message to the model.
        """
        await self.emit("on_model_starts_generating_response", {})

        # Generate a response
        message = data["text_message"]
        response = await self._send_message(message)

        # Unwrap the response to make sure it contains no function calls to handle
        call_id = ""
        function_name = ""
        function_args = ""
        assistant_response = ""
        async for r in response:
            if not call_id:
                call_id = r.to_dict()["id"]
            delta = r.to_dict()["choices"][0]["delta"]

            if "tool_calls" not in delta:
                # If this is not a function call, just stream out
                await self.emit("on_text_message_from_model", {"delta": delta.get("content")})
                assistant_response += delta.get("content") or ""
            else:
                # TODO handle multiple parallel function calls
                if delta["tool_calls"][0]["index"] > 0 or len(delta["tool_calls"]) > 1:
                    logger.error("TODO: Multiple parallel function calls not supported yet. Please open an issue.")
                # Consume the response to understand which tool to call with which parameters
                for tool_call in delta["tool_calls"]:
                    if not function_name:
                        function_name = tool_call["function"].get("name")
                    function_args += tool_call["function"]["arguments"]

        if not function_name:
            # If there was no function call, update the conversation history and return
            self.conversation.append(message)
            self.conversation.append({"role": "assistant", "content": assistant_response})
        else:
            # Otherwise deal with the function call
            await self._handle_function_call(message, call_id, function_name, function_args)

        await self.emit("on_model_stops_generating_response", {})

    async def _send_message(self, message: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Generate a response to a message.

        Args:
            message: The message to respond to.
        """
        return await self.client.chat.completions.create(
            model=self.model_name,
            messages=self.conversation + [message],
            stream=True,
            tools=[{"type": "function", "function": to_openai_tool(t)} for t in self.tools.values()],
            tool_choice="auto",
            n=1,
        )

    async def _handle_function_call(
        self, message: Dict[str, Any], call_id: str, function_name: str, function_args: str
    ):
        """
        Handle a function call from the model.
        """
        logger.debug("Function call detected: %s with args: %s", function_name, function_args)
        function_args = json.loads(function_args)

        # Routing function call - this is special because it should not be recorded in the conversation history
        if function_name == self.intent_router.name:
            await self._route(function_args)
            # Send the same message again with the new system prompt and no trace of the routing call.
            # We don't append the user message to the history in order to avoid message duplication.
            await self.send({"text_message": message})

        else:
            # Handle a regular function call - this one shows up in the history as normal
            # so we start by appending the user message
            self.conversation.append(message)
            output = await self._call_tool(call_id, function_name, function_args)
            await self.send({"text_message": {"role": "tool", "content": json.dumps(output), "tool_call_id": call_id}})

    async def _route(self, routing_info: Dict[str, Any]) -> None:
        """
        Runs the router to determine the next system prompt and tools to use.
        """
        self.system_prompt, self.tools = await self.intent_router.run(routing_info)
        await self.update_system_prompt()
        logger.debug("System prompt updated to: %s", self.system_prompt)
        logger.debug("Tools updated to: %s", self.tools)

    async def _call_tool(self, call_id, function_name, function_args):
        """
        Call a tool with the given arguments.

        Args:
            call_id: The ID of the tool call.
            function_name: The name of the tool function to call.
            function_args: The arguments to pass to the tool
        """
        # Record the tool invocation in the conversation
        self.conversation.append(
            {
                "role": "assistant",
                "tool_calls": [
                    {
                        "id": call_id,
                        "type": "function",
                        "function": {"arguments": json.dumps(function_args), "name": function_name},
                    }
                ],
            }
        )
        # Get the tool output
        logger.debug("Available tools: %s", self.tools)
        if function_name not in self.tools:
            output = f"Tool {function_name} not found."
        else:
            logger.debug("Calling tool %s with args %s.", function_name, function_args)
            output = await self.tools[function_name].run(function_args)
        logger.debug("Tool output: %s", output)
        return output
__init__(parent, intent_router, config) #

A client for interacting with the OpenAI Chat Completion API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| parent | BotStructure | The parent bot structure. | required |
| intent_router | IntentRouter | The intent router. | required |
| config | Dict[str, Any] | The configuration dictionary. | required |
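
For example, a minimal config dict for this constructor (values are illustrative, not defaults):

config = {
    "name": "gpt-4o",                  # required; must not be a "realtime" model for this client
    "api_key_name": "OPENAI_API_KEY",  # optional; name of the environment variable holding the key
}
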
Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
def __init__(self, parent: "BotStructure", intent_router: IntentRouter, config: Dict[str, Any]):
    """
    A client for interacting with the OpenAI Chat Completion API.

    Args:
        parent: The parent bot structure.
        intent_router: The intent router.
        config: The configuration dictionary.
    """
    logger.debug("Loading ChatCompletionAPIClient from config: %s", config)
    super().__init__(parent, intent_router)

    self.model_name = config.get("name")
    if not self.model_name:
        raise ValueError("ChatCompletionAPIClient requires a 'name' configuration key to know which model to use.")
    if "realtime" in self.model_name:
        raise ValueError(
            "ChatCompletionAPIClient doesn't support Realtime API. "
            "To use the Realtime API, use RealtimeAPIClient instead (client: openai_realtime)"
        )

    self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
    if not os.environ.get(self.api_key_name):
        raise ValueError(
            "ChatCompletionAPIClient requires an API key to authenticate with OpenAI. "
            f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
        )
    self.api_key = os.environ.get(self.api_key_name)

    self.client = openai.AsyncOpenAI(api_key=self.api_key)
    self.system_prompt = self.intent_router.get_prompt()
    self.tools = self.intent_router.current_stage.tools
    self.conversation: List[Dict[str, Any]] = [{"role": "system", "content": self.system_prompt}]
handle_interruption(length_to_interruption) async #

Handle an interruption while rendering the output to the user.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| length_to_interruption | int | The length of the data that was produced to the user before the interruption. This value could be a number of characters, a number of words, milliseconds, a number of audio frames, etc., depending on the bot structure that implements it. | required |
Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def handle_interruption(self, length_to_interruption: int) -> None:
    """
    Handle an interruption while rendering the output to the user.

    Args:
        length_to_interruption: The length of the data that was produced to the user before the interruption.
            This value could be a number of characters, a number of words, milliseconds, a number of audio
            frames, etc., depending on the bot structure that implements it.
    """
    logger.warning("TODO! Implement handle_interruption in ChatCompletionAPIClient")
run() async #

Handle events from the model by either processing them internally or by translating them into higher-level events that the BotStructure class can understand, then re-emitting them.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def run(self) -> None:
    """
    Handle events from the model by either processing them internally or by translating them into higher-level
    events that the BotStructure class can understand, then re-emitting them.
    """
    logger.debug("ChatCompletionAPIClient.run() is no-op for now")
send(data) async #

Send a message to the model.
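
The payload is a dict with a "text_message" key holding a Chat Completions-style message dict. A minimal sketch of a call (the role/content shape follows the OpenAI message format; internally, tool results are routed back through this same method with role "tool"):

await client.send({"text_message": {"role": "user", "content": "What is the weather like?"}})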

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def send(self, data: Dict[str, Any]) -> None:
    """
    Send a message to the model.
    """
    await self.emit("on_model_starts_generating_response", {})

    # Generate a response
    message = data["text_message"]
    response = await self._send_message(message)

    # Unwrap the response to make sure it contains no function calls to handle
    call_id = ""
    function_name = ""
    function_args = ""
    assistant_response = ""
    async for r in response:
        if not call_id:
            call_id = r.to_dict()["id"]
        delta = r.to_dict()["choices"][0]["delta"]

        if "tool_calls" not in delta:
            # If this is not a function call, just stream out
            await self.emit("on_text_message_from_model", {"delta": delta.get("content")})
            assistant_response += delta.get("content") or ""
        else:
            # TODO handle multiple parallel function calls
            if delta["tool_calls"][0]["index"] > 0 or len(delta["tool_calls"]) > 1:
                logger.error("TODO: Multiple parallel function calls not supported yet. Please open an issue.")
            # Consume the response to understand which tool to call with which parameters
            for tool_call in delta["tool_calls"]:
                if not function_name:
                    function_name = tool_call["function"].get("name")
                function_args += tool_call["function"]["arguments"]

    if not function_name:
        # If there was no function call, update the conversation history and return
        self.conversation.append(message)
        self.conversation.append({"role": "assistant", "content": assistant_response})
    else:
        # Otherwise deal with the function call
        await self._handle_function_call(message, call_id, function_name, function_args)

    await self.emit("on_model_stops_generating_response", {})
update_system_prompt() async #

Update the system prompt in the model.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def update_system_prompt(self) -> None:
    """
    Update the system prompt in the model.
    """
    self.conversation = [{"role": "system", "content": self.system_prompt}] + self.conversation[1:]
    await self.emit("on_system_prompt_updated", {"prompt": self.system_prompt})

realtime_api #

Client for OpenAI's Realtime API.

RealtimeAPIClient #

Bases: ContinuousStreamModelClient

A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and audio data, and handle responses and events.
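
For orientation, a minimal usage sketch. The bot, router, and pcm16_chunks names are hypothetical (pcm16_chunks stands in for any async iterator of raw PCM16 audio bytes), and the model name is only an example:

import asyncio

from intentional_openai.realtime_api import RealtimeAPIClient

async def talk(bot, router, pcm16_chunks):
    config = {"name": "gpt-4o-realtime-preview", "voice": "alloy"}  # example model name
    client = RealtimeAPIClient(parent=bot, intent_router=router, config=config)
    await client.connect()
    listener = asyncio.create_task(client.run())  # consume server events in the background
    async for chunk in pcm16_chunks:
        await client.send({"audio_stream": chunk})
    await client.disconnect()
    listener.cancel()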

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
class RealtimeAPIClient(ContinuousStreamModelClient):
    """
    A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and
    audio data, and handle responses and events.
    """

    name = "openai_realtime"

    events_translation = {
        "error": "on_error",
        "response.text.delta": "on_text_message_from_model",
        "response.audio.delta": "on_audio_message_from_model",
        "response.created": "on_model_starts_generating_response",
        "response.done": "on_model_stops_generating_response",
        "input_audio_buffer.speech_started": "on_vad_detects_user_speech_started",
        "input_audio_buffer.speech_stopped": "on_vad_detects_user_speech_started_ended",
    }

    def __init__(self, parent: Callable, intent_router: IntentRouter, config: Dict[str, Any]):
        """
        A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text
        and audio data, and handle responses and events.
        """
        logger.debug("Loading RealtimeAPIClient from config: %s", config)
        super().__init__(parent, intent_router)

        self.model_name = config.get("name")
        if not self.model_name:
            raise ValueError("RealtimeAPIClient requires a 'name' configuration key to know which model to use.")
        if "realtime" not in self.model_name:
            raise ValueError(
                "RealtimeAPIClient requires a 'realtime' model to use the Realtime API. "
                "To use any other OpenAI model, use the OpenAIClient instead."
            )

        self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
        if not os.environ.get(self.api_key_name):
            raise ValueError(
                "RealtimeAPIClient requires an API key to authenticate with OpenAI. "
                f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
            )
        self.api_key = os.environ.get(self.api_key_name)
        self.voice = config.get("voice", "alloy")

        # WebSocket connection data
        self.ws = None
        self.base_url = "wss://api.openai.com/v1/realtime"

        # Track current response state
        self._connecting = False
        self._updating_system_prompt = False
        self._current_response_id = None
        self._current_item_id = None

        # Intent routing data
        self.intent_router = intent_router
        self.system_prompt = config.get("system_prompt", self.intent_router.get_prompt())
        self.tools = self.intent_router.current_stage.tools

    async def connect(self) -> None:
        """
        Establish WebSocket connection with the Realtime API.
        """
        logger.debug("Initializing websocket connection to OpenAI Realtime API")

        url = f"{self.base_url}?model={self.model_name}"
        headers = {"Authorization": f"Bearer {self.api_key}", "OpenAI-Beta": "realtime=v1"}
        self.ws = await websockets.connect(url, extra_headers=headers)

        await self._update_session(
            {
                "modalities": ["text", "audio"],
                "instructions": self.system_prompt,
                "voice": self.voice,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {"model": "whisper-1"},
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 500,
                    "silence_duration_ms": 200,
                },
                "tools": [to_openai_tool(tool) for tool in self.tools.values()],
                "tool_choice": "auto",
                "temperature": 0.8,
            }
        )
        # Flag that we're connecting and look for this event in the run loop
        self._connecting = True

    async def disconnect(self) -> None:
        """
        Close the WebSocket connection.
        """
        if self.ws:
            logger.debug("Disconnecting from OpenAI Realtime API")
            await self.ws.close()
        else:
            logger.debug("Attempted disconnection of a OpenAIRealtimeAPIClient that was never connected, nothing done.")
        await self.emit("on_model_disconnection", {})

    async def run(self) -> None:  # pylint: disable=too-many-branches
        """
        Handles events coming from the WebSocket connection.

        This method is an infinite loop that listens for messages from the WebSocket connection and processes them
        accordingly. It also triggers the event handlers for the corresponding event types.
        """
        try:
            async for message in self.ws:
                event = json.loads(message)
                event_type = event.get("type")
                logger.debug("Received event: %s", event_type)

                if event_type == "error":
                    logger.error("An error response was returned: %s", event)

                elif event_type == "session.updated":
                    logger.debug("Session updated to the following configuration: %s", event)
                    # Check why we updated the session and emit the corresponding event
                    if self._connecting:
                        self._connecting = False
                        await self.emit("on_model_connection", event)
                    if self._updating_system_prompt:
                        self._updating_system_prompt = False
                        await self.emit("on_system_prompt_updated", event)

                # Track agent response state
                elif event_type == "response.created":
                    self._current_response_id = event.get("response", {}).get("id")
                    logger.debug("Agent started responding. Response created with ID: %s", self._current_response_id)

                elif event_type == "response.output_item.added":
                    self._current_item_id = event.get("item", {}).get("id")
                    logger.debug("Agent is responding. Added response item with ID: %s", self._current_item_id)

                elif event_type == "response.done":
                    logger.debug("Agent finished generating a response.")

                # Tool call
                elif event_type == "response.function_call_arguments.done":
                    await self._call_tool(event)

                # Events from VAD related to the user's input
                elif event_type == "input_audio_buffer.speech_started":
                    logger.debug("Speech detected, listening...")

                elif event_type == "input_audio_buffer.speech_stopped":
                    logger.debug("Speech ended.")

                # Relay the event to the parent BotStructure - regardless of whether it was processed above or not
                if event_type in self.events_translation:
                    logger.debug(
                        "Translating event type %s to parent's event type %s",
                        event_type,
                        self.events_translation[event_type],
                    )
                    await self.emit(self.events_translation[event_type], event)
                else:
                    logger.debug("Sending native event type %s to parent", event_type)
                    await self.emit(event_type, event)

        except websockets.exceptions.ConnectionClosed:
            logger.info("Connection closed")
        except Exception as e:  # pylint: disable=broad-except
            logger.exception("Error in message handling: %s", str(e))

    async def send(self, data: Dict[str, Any]) -> None:
        """
        Stream raw audio data to the API.

        Args:
            data:
                The data chunk to stream. It should be in the format {"audio_stream": bytes}.
        """
        if "audio_stream" in data:
            await self._send_audio_stream(data["audio_stream"])
        # if "audio_message" in data:
        #     await self._send_audio(data["audio_message"])
        # if "text_message" in data:
        #     await self._send_text_message(data["text_message"])

    async def update_system_prompt(self) -> None:
        """
        Update the system prompt to use in the conversation.
        """
        logger.debug("Setting system prompt to: %s", self.system_prompt)
        logger.debug("Setting tools to: %s", [t.name for t in self.tools])
        await self._update_session(
            {"instructions": self.system_prompt, "tools": [to_openai_tool(t) for t in self.tools.items()]}
        )
        # Flag that we're updating the system prompt and look for this event in the run loop
        self._updating_system_prompt = True

    async def handle_interruption(self, length_to_interruption: int) -> None:
        """
        Handle user interruption of the current response.

        Args:
            length_to_interruption (int):
                The length in milliseconds of the audio that was played to the user before the interruption.
                May be zero if the interruption happened before any audio was played.
        """
        logging.info("[Handling interruption at %s ms]", lenght_to_interruption)

        # Cancel the current response
        # Cancelling responses is effective when the response is still being generated by the model.
        if self._current_response_id:
            logger.debug("Cancelling response %s due to a user's interruption.", self._current_response_id)
            event = {"type": "response.cancel"}
            await self.ws.send(json.dumps(event))
        else:
            logger.warning("No response ID found to cancel.")

        # Truncate the conversation item to what was actually played
        # Truncating the response is effective when the response has already been generated by the model and is being
        # played out.
        if length_to_interruption:
            logger.debug("Truncating the response due to a user's interruption at %s ms", length_to_interruption)
            event = {
                "type": "conversation.item.truncate",
                "item_id": self._current_item_id,
                "content_index": 0,
                "audio_end_ms": math.floor(length_to_interruption),
            }
            await self.ws.send(json.dumps(event))

    async def _update_session(self, config: Dict[str, Any]) -> None:
        """
        Update session configuration.

        Args:
            config (Dict[str, Any]):
                The new session configuration.
        """
        event = {"type": "session.update", "session": config}
        await self.ws.send(json.dumps(event))

    async def _send_text_message(self, text: str) -> None:
        """
        Send text message to the API.

        Args:
            text (str):
                The text message to send.
        """
        event = {
            "type": "conversation.item.create",
            "item": {"type": "message", "role": "user", "content": [{"type": "input_text", "text": text}]},
        }
        await self.ws.send(json.dumps(event))
        await self._request_response_from_model()

    async def _send_audio_stream(self, audio_bytes: bytes) -> None:
        audio_b64 = base64.b64encode(audio_bytes).decode()
        append_event = {"type": "input_audio_buffer.append", "audio": audio_b64}
        await self.ws.send(json.dumps(append_event))

    # async def _send_audio_message(self, audio_bytes: bytes) -> None:
    #     """
    #     Send audio data to the API.

    #     Args:
    #         audio_bytes (bytes):
    #             The audio data to send.
    #     """
    #     # Convert audio to required format (24kHz, mono, PCM16)
    #     audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    #     audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2)
    #     pcm_data = base64.b64encode(audio.raw_data).decode()

    #     # Append audio to buffer
    #     append_event = {"type": "input_audio_buffer.append", "audio": pcm_data}
    #     await self.ws.send(json.dumps(append_event))

    #     # Commit the buffer
    #     commit_event = {"type": "input_audio_buffer.commit"}
    #     await self.ws.send(json.dumps(commit_event))

    async def _send_function_result(self, call_id: str, result: Any) -> None:
        """
        Send function call result back to the API.

        Args:
            call_id (str):
                The ID of the function call.
            result (Any):
                The result of the function call.
        """
        event = {
            "type": "conversation.item.create",
            "item": {"type": "function_call_output", "call_id": call_id, "output": result},
        }
        await self.ws.send(json.dumps(event))
        await self._request_response_from_model()

    async def _request_response_from_model(self) -> None:
        """
        Asks the LLM for a response to the messages it just received.
        You need to call this method right after sending a message that is not streamed, unlike audio (where the
        model's VAD decides when to reply instead).
        """
        event = {"type": "response.create", "response": {"modalities": ["text", "audio"]}}
        await self.ws.send(json.dumps(event))

    async def _call_tool(self, event: Dict[str, Any]) -> None:
        """
        Calls the tool requested by the model.

        Args:
            event (Dict[str, Any]):
                The event containing the tool call information.
        """
        call_id = event["call_id"]
        tool_name = event["name"]
        tool_arguments = json.loads(event["arguments"])
        logger.debug("Calling tool %s with arguments %s (call_id: %s)", tool_name, tool_arguments, call_id)

        # Check if it's the router
        if tool_name == self.intent_router.name:
            self.system_prompt, self.tools = await self.intent_router.run(tool_arguments)
            await self.update_system_prompt()
            await self._send_function_result(event["call_id"], "ok")
            return

        # Make sure the tool actually exists
        if tool_name not in self.tools:
            logger.error("Tool %s not found in the list of available tools.", tool_name)
            await self._send_function_result(call_id, f"Error: Tool {tool_name} not found")
            return

        # Invoke the tool and send back the output
        result = await self.tools.get(tool_name).run(tool_arguments)
        logger.debug("Tool %s returned: %s", tool_name, result)
        await self._send_function_result(call_id, str(result))
__init__(parent, intent_router, config) #

A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and audio data, and handle responses and events.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
def __init__(self, parent: Callable, intent_router: IntentRouter, config: Dict[str, Any]):
    """
    A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text
    and audio data, and handle responses and events.
    """
    logger.debug("Loading RealtimeAPIClient from config: %s", config)
    super().__init__(parent, intent_router)

    self.model_name = config.get("name")
    if not self.model_name:
        raise ValueError("RealtimeAPIClient requires a 'name' configuration key to know which model to use.")
    if "realtime" not in self.model_name:
        raise ValueError(
            "RealtimeAPIClient requires a 'realtime' model to use the Realtime API. "
            "To use any other OpenAI model, use the OpenAIClient instead."
        )

    self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
    if not os.environ.get(self.api_key_name):
        raise ValueError(
            "RealtimeAPIClient requires an API key to authenticate with OpenAI. "
            f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
        )
    self.api_key = os.environ.get(self.api_key_name)
    self.voice = config.get("voice", "alloy")

    # WebSocket connection data
    self.ws = None
    self.base_url = "wss://api.openai.com/v1/realtime"

    # Track current response state
    self._connecting = False
    self._updating_system_prompt = False
    self._current_response_id = None
    self._current_item_id = None

    # Intent routing data
    self.intent_router = intent_router
    self.system_prompt = config.get("system_prompt", self.intent_router.get_prompt())
    self.tools = self.intent_router.current_stage.tools
connect() async #

Establish WebSocket connection with the Realtime API.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def connect(self) -> None:
    """
    Establish WebSocket connection with the Realtime API.
    """
    logger.debug("Initializing websocket connection to OpenAI Realtime API")

    url = f"{self.base_url}?model={self.model_name}"
    headers = {"Authorization": f"Bearer {self.api_key}", "OpenAI-Beta": "realtime=v1"}
    self.ws = await websockets.connect(url, extra_headers=headers)

    await self._update_session(
        {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": self.voice,
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": {"model": "whisper-1"},
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 500,
                "silence_duration_ms": 200,
            },
            "tools": [to_openai_tool(tool) for tool in self.tools.values()],
            "tool_choice": "auto",
            "temperature": 0.8,
        }
    )
    # Flag that we're connecting and look for this event in the run loop
    self._connecting = True
disconnect() async #

Close the WebSocket connection.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def disconnect(self) -> None:
    """
    Close the WebSocket connection.
    """
    if self.ws:
        logger.debug("Disconnecting from OpenAI Realtime API")
        await self.ws.close()
    else:
        logger.debug("Attempted disconnection of a OpenAIRealtimeAPIClient that was never connected, nothing done.")
    await self.emit("on_model_disconnection", {})
handle_interruption(length_to_interruption) async #

Handle user interruption of the current response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| length_to_interruption | int | The length in milliseconds of the audio that was played to the user before the interruption. May be zero if the interruption happened before any audio was played. | required |
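
For example, a bot structure that detects the user speaking again 1250 ms into audio playback could call the method like this (the measurement itself is the caller's responsibility, and the value here is made up):

await client.handle_interruption(1250)  # cancels the in-flight response and truncates the played item at 1250 ms
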
Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def handle_interruption(self, length_to_interruption: int) -> None:
    """
    Handle user interruption of the current response.

    Args:
        length_to_interruption (int):
            The length in milliseconds of the audio that was played to the user before the interruption.
            May be zero if the interruption happened before any audio was played.
    """
    logging.info("[Handling interruption at %s ms]", lenght_to_interruption)

    # Cancel the current response
    # Cancelling responses is effective when the response is still being generated by the model.
    if self._current_response_id:
        logger.debug("Cancelling response %s due to a user's interruption.", self._current_response_id)
        event = {"type": "response.cancel"}
        await self.ws.send(json.dumps(event))
    else:
        logger.warning("No response ID found to cancel.")

    # Truncate the conversation item to what was actually played
    # Truncating the response is effective when the response has already been generated by the model and is being
    # played out.
    if length_to_interruption:
        logger.debug("Truncating the response due to a user's interruption at %s ms", length_to_interruption)
        event = {
            "type": "conversation.item.truncate",
            "item_id": self._current_item_id,
            "content_index": 0,
            "audio_end_ms": math.floor(length_to_interruption),
        }
        await self.ws.send(json.dumps(event))
run() async #

Handles events coming from the WebSocket connection.

This method is an infinite loop that listens for messages from the WebSocket connection and processes them accordingly. It also triggers the event handlers for the corresponding event types.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def run(self) -> None:  # pylint: disable=too-many-branches
    """
    Handles events coming from the WebSocket connection.

    This method is an infinite loop that listens for messages from the WebSocket connection and processes them
    accordingly. It also triggers the event handlers for the corresponding event types.
    """
    try:
        async for message in self.ws:
            event = json.loads(message)
            event_type = event.get("type")
            logger.debug("Received event: %s", event_type)

            if event_type == "error":
                logger.error("An error response was returned: %s", event)

            elif event_type == "session.updated":
                logger.debug("Session updated to the following configuration: %s", event)
                # Check why we updated the session and emit the corresponding event
                if self._connecting:
                    self._connecting = False
                    await self.emit("on_model_connection", event)
                if self._updating_system_prompt:
                    self._updating_system_prompt = False
                    await self.emit("on_system_prompt_updated", event)

            # Track agent response state
            elif event_type == "response.created":
                self._current_response_id = event.get("response", {}).get("id")
                logger.debug("Agent started responding. Response created with ID: %s", self._current_response_id)

            elif event_type == "response.output_item.added":
                self._current_item_id = event.get("item", {}).get("id")
                logger.debug("Agent is responding. Added response item with ID: %s", self._current_item_id)

            elif event_type == "response.done":
                logger.debug("Agent finished generating a response.")

            # Tool call
            elif event_type == "response.function_call_arguments.done":
                await self._call_tool(event)

            # Events from VAD related to the user's input
            elif event_type == "input_audio_buffer.speech_started":
                logger.debug("Speech detected, listening...")

            elif event_type == "input_audio_buffer.speech_stopped":
                logger.debug("Speech ended.")

            # Relay the event to the parent BotStructure - regardless of whether it was processed above or not
            if event_type in self.events_translation:
                logger.debug(
                    "Translating event type %s to parent's event type %s",
                    event_type,
                    self.events_translation[event_type],
                )
                await self.emit(self.events_translation[event_type], event)
            else:
                logger.debug("Sending native event type %s to parent", event_type)
                await self.emit(event_type, event)

    except websockets.exceptions.ConnectionClosed:
        logger.info("Connection closed")
    except Exception as e:  # pylint: disable=broad-except
        logger.exception("Error in message handling: %s", str(e))
send(data) async #

Stream raw audio data to the API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Dict[str, Any] | The data chunk to stream. It should be in the format {"audio_stream": bytes}. | required |
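
For example, a caller reading raw PCM16 frames from a microphone might stream them like this (microphone_frames is a hypothetical async source of bytes):

async for frame in microphone_frames():
    await client.send({"audio_stream": frame})
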
Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def send(self, data: Dict[str, Any]) -> None:
    """
    Stream raw audio data to the API.

    Args:
        data:
            The data chunk to stream. It should be in the format {"audio_stream": bytes}.
    """
    if "audio_stream" in data:
        await self._send_audio_stream(data["audio_stream"])
update_system_prompt() async #

Update the system prompt to use in the conversation.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def update_system_prompt(self) -> None:
    """
    Update the system prompt to use in the conversation.
    """
    logger.debug("Setting system prompt to: %s", self.system_prompt)
    logger.debug("Setting tools to: %s", [t.name for t in self.tools])
    await self._update_session(
        {"instructions": self.system_prompt, "tools": [to_openai_tool(t) for t in self.tools.items()]}
    )
    # Flag that we're updating the system prompt and look for this event in the run loop
    self._updating_system_prompt = True

tools #

Tool utilities to interact with tools in OpenAI.

to_openai_tool(tool) #

Convert a Tool into the tool definition format required by OpenAI.
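
A sketch of the conversion for a hypothetical tool with one required parameter (the tool's name, description, and parameter shapes below are assumptions for the example, not the framework's exact constructors):

definition = to_openai_tool(weather_tool)  # weather_tool: a Tool named "get_weather" with one required "city" param
# Expected result, given the implementation below:
# {
#     "type": "function",
#     "name": "get_weather",
#     "description": "Look up the current weather.",
#     "parameters": {
#         "type": "object",
#         "properties": {"city": {"description": "City to look up", "type": "string", "default": None}},
#         "required": ["city"],
#     },
# }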

Source code in plugins/intentional-openai/src/intentional_openai/tools.py
def to_openai_tool(tool: Tool):
    """
    Convert a Tool into the tool definition format required by OpenAI.
    """
    return {
        "type": "function",
        "name": tool.name,
        "description": tool.description,
        "parameters": {
            "type": "object",
            "properties": {
                param.name: {"description": param.description, "type": param.type, "default": param.default}
                for param in tool.parameters
            },
            "required": [param.name for param in tool.parameters if param.required],
        },
    }