API Reference - OpenAI #

src #

intentional_openai #

Init file for intentional_openai.

__about__ #

Package descriptors for intentional-openai.

chatcompletion_api #

Client for OpenAI's Chat Completion API.

ChatCompletionAPIClient #

Bases: LLMClient

A client for interacting with the OpenAI Chat Completion API.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
class ChatCompletionAPIClient(LLMClient):
    """
    A client for interacting with the OpenAI Chat Completion API.
    """

    name: str = "openai"

    def __init__(
        self,
        parent: "BotStructure",
        intent_router: IntentRouter,
        config: Dict[str, Any],
    ):
        """
        A client for interacting with the OpenAI Chat Completion API.

        Args:
            parent: The parent bot structure.
            intent_router: The intent router.
            config: The configuration dictionary.
        """
        log.debug("Loading ChatCompletionAPIClient from config", llm_client_config=config)
        super().__init__(parent, intent_router)

        self.llm_name = config.get("name")
        if not self.llm_name:
            raise ValueError("ChatCompletionAPIClient requires a 'name' configuration key to know which LLM to use.")
        if "realtime" in self.llm_name:
            raise ValueError(
                "ChatCompletionAPIClient doesn't support Realtime API. "
                "To use the Realtime API, use RealtimeAPIClient instead (client: openai_realtime)"
            )

        self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
        if not os.environ.get(self.api_key_name):
            raise ValueError(
                "ChatCompletionAPIClient requires an API key to authenticate with OpenAI. "
                f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
            )
        self.api_key = os.environ.get(self.api_key_name)
        self.client = openai.AsyncOpenAI(api_key=self.api_key)
        self.system_prompt = None
        self.tools = None
        self.setup_initial_prompt()
        self.conversation = [{"role": "system", "content": self.system_prompt}]

    def setup_initial_prompt(self) -> None:
        """
        Set up the initial prompt and tools. Also used after a conversation ends to reset the state.
        """
        self.system_prompt = self.intent_router.get_prompt()
        self.tools = self.intent_router.current_stage.tools
        self.conversation: List[Dict[str, Any]] = [{"role": "system", "content": self.system_prompt}]
        log.debug("Initial system prompt set", system_prompt=self.system_prompt)

    async def run(self) -> None:
        """
        Handle events from the LLM by either processing them internally or by translating them into higher-level
        events that the BotStructure class can understand, then re-emitting them.
        """
        log.debug("ChatCompletionAPIClient.run() is no-op for now")

    async def update_system_prompt(self) -> None:
        """
        Update the system prompt in the LLM.
        """
        self.conversation = [{"role": "system", "content": self.system_prompt}] + self.conversation[1:]
        await self.emit("on_system_prompt_updated", {"system_prompt": self.system_prompt})

    async def handle_interruption(self, length_to_interruption: int) -> None:
        """
        Handle an interruption while rendering the output to the user.

        Args:
            length_to_interruption: The length of the data that was produced to the user before the interruption.
                This value could be a number of characters, number of words, milliseconds, number of audio frames,
                etc., depending on the bot structure that implements it.
        """
        log.warning("TODO! Implement handle_interruption in ChatCompletionAPIClient")

    async def send(self, data: Dict[str, Any]) -> None:
        """
        Send a message to the LLM.
        """
        await self.emit("on_llm_starts_generating_response", {})

        # Generate a response
        message = data["text_message"]
        response = await self._send_message(message)

        # Consume the streamed response, checking whether it contains function calls to handle
        call_id = ""
        function_name = ""
        function_args = ""
        assistant_response = ""
        async for r in response:
            if not call_id:
                call_id = r.to_dict()["id"]
            delta = r.to_dict()["choices"][0]["delta"]

            if "tool_calls" not in delta:
                # If this is not a function call, just stream out
                await self.emit("on_text_message_from_llm", {"delta": delta.get("content")})
                assistant_response += delta.get("content") or ""
            else:
                # TODO handle multiple parallel function calls
                if delta["tool_calls"][0]["index"] > 0 or len(delta["tool_calls"]) > 1:
                    log.error("TODO: Multiple parallel function calls not supported yet. Please open an issue.")
                    log.debug("Multiple parallel function calls", delta=delta)
                # Consume the response to understand which tool to call with which parameters
                for tool_call in delta["tool_calls"]:
                    if not function_name:
                        function_name = tool_call["function"].get("name")
                    function_args += tool_call["function"]["arguments"]

        if not function_name:
            # If there was no function call, update the conversation history and return
            self.conversation.append(message)
            self.conversation.append({"role": "assistant", "content": assistant_response})
        else:
            # Otherwise deal with the function call
            await self._handle_function_call(message, call_id, function_name, function_args)

        await self.emit("on_llm_stops_generating_response", {})

    async def _send_message(self, message: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Generate a response to a message.

        Args:
            message: The message to respond to.
        """
        return await self.client.chat.completions.create(
            model=self.llm_name,
            messages=self.conversation + [message],
            stream=True,
            tools=[{"type": "function", "function": to_openai_tool(t)} for t in self.tools.values()],
            tool_choice="auto",
            n=1,
        )

    async def _handle_function_call(
        self,
        message: Dict[str, Any],
        call_id: str,
        function_name: str,
        function_args: str,
    ):
        """
        Handle a function call from the LLM.
        """
        log.debug(
            "Function call detected",
            function_name=function_name,
            function_args=function_args,
        )
        function_args = json.loads(function_args)

        # Routing function call - this is special because it should not be recorded in the conversation history
        if function_name == self.intent_router.name:
            await self._route(function_args)
            # Send the same message again with the new system prompt and no trace of the routing call.
            # We don't append the user message to the history in order to avoid message duplication.
            await self.send({"text_message": message})

        # Check if the conversation should end
        elif function_name == EndConversationTool.name:
            await self.tools[EndConversationTool.name].run()
            self.setup_initial_prompt()
            await self.emit("on_conversation_ended", {})

        else:
            # Handle a regular function call - this one shows up in the history as normal
            # so we start by appending the user message
            self.conversation.append(message)
            output = await self._call_tool(call_id, function_name, function_args)
            await self.send(
                {
                    "text_message": {
                        "role": "tool",
                        "content": json.dumps(output),
                        "tool_call_id": call_id,
                    }
                }
            )

    async def _route(self, routing_info: Dict[str, Any]) -> None:
        """
        Runs the router to determine the next system prompt and tools to use.
        """
        self.system_prompt, self.tools = await self.intent_router.run(routing_info)
        await self.update_system_prompt()
        log.debug("System prompt updated", system_prompt=self.system_prompt)
        log.debug("Tools updated", tools=self.tools)

    async def _call_tool(self, call_id, function_name, function_args):
        """
        Call a tool with the given arguments.

        Args:
            call_id: The ID of the tool call.
            function_name: The name of the tool function to call.
            function_args: The arguments to pass to the tool
        """
        await self.emit("on_tool_invoked", {"name": function_name, "args": function_args})

        # Record the tool invocation in the conversation
        self.conversation.append(
            {
                "role": "assistant",
                "tool_calls": [
                    {
                        "id": call_id,
                        "type": "function",
                        "function": {
                            "arguments": json.dumps(function_args),
                            "name": function_name,
                        },
                    }
                ],
            }
        )

        # Get the tool output
        if function_name not in self.tools:
            log.debug("The LLM called a non-existing tool.", tool=function_name)
            output = f"Tool '{function_name}' not found."
        else:
            log.debug("Calling tool", function_name=function_name, function_args=function_args)
            output = await self.tools[function_name].run(function_args)
        log.debug("Tool run", tool_output=output)
        return output
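A minimal usage sketch follows. It assumes a concrete BotStructure parent and a configured IntentRouter are available from the core intentional packages (neither is defined in this module), and uses "gpt-4o" only as an example model name; treat it as illustrative rather than a drop-in snippet.

import asyncio
import os

from intentional_openai.chatcompletion_api import ChatCompletionAPIClient

async def main():
    bot_structure = ...  # a BotStructure instance (assumed available)
    intent_router = ...  # an IntentRouter instance (assumed available)

    # The client reads the API key from the environment variable named in the config.
    os.environ.setdefault("OPENAI_API_KEY", "sk-...")

    client = ChatCompletionAPIClient(
        parent=bot_structure,
        intent_router=intent_router,
        config={"name": "gpt-4o", "api_key_name": "OPENAI_API_KEY"},
    )
    # send() expects a Chat Completion-style message wrapped in "text_message"
    await client.send({"text_message": {"role": "user", "content": "Hello!"}})

asyncio.run(main())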
__init__(parent, intent_router, config) #

A client for interacting with the OpenAI Chat Completion API.

Parameters:

Name | Type | Description | Default
parent | BotStructure | The parent bot structure. | required
intent_router | IntentRouter | The intent router. | required
config | Dict[str, Any] | The configuration dictionary. | required
Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
def __init__(
    self,
    parent: "BotStructure",
    intent_router: IntentRouter,
    config: Dict[str, Any],
):
    """
    A client for interacting with the OpenAI Chat Completion API.

    Args:
        parent: The parent bot structure.
        intent_router: The intent router.
        config: The configuration dictionary.
    """
    log.debug("Loading ChatCompletionAPIClient from config", llm_client_config=config)
    super().__init__(parent, intent_router)

    self.llm_name = config.get("name")
    if not self.llm_name:
        raise ValueError("ChatCompletionAPIClient requires a 'name' configuration key to know which LLM to use.")
    if "realtime" in self.llm_name:
        raise ValueError(
            "ChatCompletionAPIClient doesn't support Realtime API. "
            "To use the Realtime API, use RealtimeAPIClient instead (client: openai_realtime)"
        )

    self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
    if not os.environ.get(self.api_key_name):
        raise ValueError(
            "ChatCompletionAPIClient requires an API key to authenticate with OpenAI. "
            f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
        )
    self.api_key = os.environ.get(self.api_key_name)
    self.client = openai.AsyncOpenAI(api_key=self.api_key)
    self.system_prompt = None
    self.tools = None
    self.setup_initial_prompt()
    self.conversation = [{"role": "system", "content": self.system_prompt}]
handle_interruption(length_to_interruption) async #

Handle an interruption while rendering the output to the user.

Parameters:

Name | Type | Description | Default
length_to_interruption | int | The length of the data that was produced to the user before the interruption. This value could be a number of characters, number of words, milliseconds, number of audio frames, etc., depending on the bot structure that implements it. | required
Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def handle_interruption(self, length_to_interruption: int) -> None:
    """
    Handle an interruption while rendering the output to the user.

    Args:
        length_to_interruption: The length of the data that was produced to the user before the interruption.
            This value could be a number of characters, number of words, milliseconds, number of audio frames,
            etc., depending on the bot structure that implements it.
    """
    log.warning("TODO! Implement handle_interruption in ChatCompletionAPIClient")
run() async #

Handle events from the LLM by either processing them internally or by translating them into higher-level events that the BotStructure class can understand, then re-emitting them.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def run(self) -> None:
    """
    Handle events from the LLM by either processing them internally or by translating them into higher-level
    events that the BotStructure class can understand, then re-emitting them.
    """
    log.debug("ChatCompletionAPIClient.run() is no-op for now")
send(data) async #

Send a message to the LLM.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def send(self, data: Dict[str, Any]) -> None:
    """
    Send a message to the LLM.
    """
    await self.emit("on_llm_starts_generating_response", {})

    # Generate a response
    message = data["text_message"]
    response = await self._send_message(message)

    # Consume the streamed response, checking whether it contains function calls to handle
    call_id = ""
    function_name = ""
    function_args = ""
    assistant_response = ""
    async for r in response:
        if not call_id:
            call_id = r.to_dict()["id"]
        delta = r.to_dict()["choices"][0]["delta"]

        if "tool_calls" not in delta:
            # If this is not a function call, just stream out
            await self.emit("on_text_message_from_llm", {"delta": delta.get("content")})
            assistant_response += delta.get("content") or ""
        else:
            # TODO handle multiple parallel function calls
            if delta["tool_calls"][0]["index"] > 0 or len(delta["tool_calls"]) > 1:
                log.error("TODO: Multiple parallel function calls not supported yet. Please open an issue.")
                log.debug("Multiple parallel function calls", delta=delta)
            # Consume the response to understand which tool to call with which parameters
            for tool_call in delta["tool_calls"]:
                if not function_name:
                    function_name = tool_call["function"].get("name")
                function_args += tool_call["function"]["arguments"]

    if not function_name:
        # If there was no function call, update the conversation history and return
        self.conversation.append(message)
        self.conversation.append({"role": "assistant", "content": assistant_response})
    else:
        # Otherwise deal with the function call
        await self._handle_function_call(message, call_id, function_name, function_args)

    await self.emit("on_llm_stops_generating_response", {})
setup_initial_prompt() #

Set up the initial prompt and tools. Also used after a conversation ends to reset the state.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
def setup_initial_prompt(self) -> None:
    """
    Set up the initial prompt and tools. Also used after a conversation ends to reset the state.
    """
    self.system_prompt = self.intent_router.get_prompt()
    self.tools = self.intent_router.current_stage.tools
    self.conversation: List[Dict[str, Any]] = [{"role": "system", "content": self.system_prompt}]
    log.debug("Initial system prompt set", system_prompt=self.system_prompt)
update_system_prompt() async #

Update the system prompt in the LLM.

Source code in plugins/intentional-openai/src/intentional_openai/chatcompletion_api.py
async def update_system_prompt(self) -> None:
    """
    Update the system prompt in the LLM.
    """
    self.conversation = [{"role": "system", "content": self.system_prompt}] + self.conversation[1:]
    await self.emit("on_system_prompt_updated", {"system_prompt": self.system_prompt})

realtime_api #

Client for OpenAI's Realtime API.

RealtimeAPIClient #

Bases: LLMClient

A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and audio data, and handle responses and events.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
class RealtimeAPIClient(LLMClient):
    """
    A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and
    audio data, and handle responses and events.
    """

    name = "openai_realtime"

    events_translation = {
        "error": "on_error",
        "response.text.delta": "on_text_message_from_llm",
        "response.audio.delta": "on_audio_message_from_llm",
        "response.created": "on_llm_starts_generating_response",
        "response.done": "on_llm_stops_generating_response",
        "input_audio_buffer.speech_started": "on_user_speech_started",
        "input_audio_buffer.speech_stopped": "on_user_speech_ended",
        "conversation.item.input_audio_transcription.completed": "on_user_speech_transcribed",
        "response.audio_transcript.done": "on_llm_speech_transcribed",
    }

    def __init__(self, parent: Callable, intent_router: IntentRouter, config: Dict[str, Any]):
        """
        A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text
        and audio data, and handle responses and events.
        """
        log.debug("Loading %s from config", self.__class__.__name__, llm_client_config=config)
        super().__init__(parent, intent_router)

        self.llm_name = config.get("name")
        if not self.llm_name:
            raise ValueError("RealtimeAPIClient requires a 'name' configuration key to know which LLM to use.")
        if "realtime" not in self.llm_name:
            raise ValueError(
                "RealtimeAPIClient requires a 'realtime' LLM to use the Realtime API. "
                "To use any other OpenAI LLM, use the OpenAIClient instead."
            )

        self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
        if not os.environ.get(self.api_key_name):
            raise ValueError(
                "RealtimeAPIClient requires an API key to authenticate with OpenAI. "
                f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
            )
        self.api_key = os.environ.get(self.api_key_name)
        self.voice = config.get("voice", "alloy")

        # WebSocket connection data
        self.ws = None
        self.base_url = "wss://api.openai.com/v1/realtime"

        # Track current response state
        self._connecting = False
        self._updating_system_prompt = False
        self._current_response_id = None
        self._current_item_id = None

        # Intent routing data
        self.intent_router = intent_router
        self.system_prompt = None
        self.tools = None
        self.setup_initial_prompt()

    def setup_initial_prompt(self) -> None:
        """
        Set up the initial prompt and tools. Also used after a conversation ends to reset the state.
        """
        self.system_prompt = self.intent_router.get_prompt()
        self.tools = self.intent_router.current_stage.tools

    async def connect(self) -> None:
        """
        Establish WebSocket connection with the Realtime API.
        """
        log.debug("Initializing websocket connection to OpenAI Realtime API")

        url = f"{self.base_url}?model={self.llm_name}"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v1",
        }
        self.ws = await websockets.connect(url, extra_headers=headers)

        await self._update_session(
            {
                "modalities": ["text", "audio"],
                "instructions": self.system_prompt,
                "voice": self.voice,
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {"model": "whisper-1"},
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 500,
                    "silence_duration_ms": 200,
                },
                "tools": [to_openai_tool(tool) for tool in self.tools.values()],
                "tool_choice": "auto",
                "temperature": 0.8,
            }
        )
        # Flag that we're connecting and look for this event in the run loop
        self._connecting = True

    async def disconnect(self) -> None:
        """
        Close the WebSocket connection.
        """
        if self.ws:
            log.debug("Disconnecting from OpenAI Realtime API")
            await self.ws.close()
        else:
            log.debug("Attempted disconnection of a OpenAIRealtimeAPIClient that was never connected, nothing done.")
        await self.emit("on_llm_disconnection", {})

    async def run(self) -> None:  # pylint: disable=too-many-branches, too-many-statements
        """
        Handles events coming from the WebSocket connection.

        This method is an infinite loop that listens for messages from the WebSocket connection and processes them
        accordingly. It also triggers the event handlers for the corresponding event types.
        """
        try:
            async for message in self.ws:
                event = json.loads(message)
                event_name = event.get("type")
                log.debug("Received event", event_name=event_name)

                # Handle errors
                if event_name == "error":
                    log.error("An error response was returned", event_data=event)
                elif event_name == "conversation.item.input_audio_transcription.failed":
                    log.error("An error happened during transcription", event_data=event)

                elif event_name == "session.updated":
                    log.debug("Session configuration updated", event_data=event)
                    # Check why we updated the session and emit the corresponding event
                    if self._connecting:
                        self._connecting = False
                        await self.emit("on_llm_connection", event)
                    if self._updating_system_prompt:
                        self._updating_system_prompt = False
                        await self.emit(
                            "on_system_prompt_updated",
                            {"system_prompt": event["session"]["instructions"]},
                        )

                # Track agent response state
                elif event_name == "response.created":
                    self._current_response_id = event.get("response", {}).get("id")
                    log.debug(
                        "Agent started responding. Response created.",
                        response_id=self._current_response_id,
                    )

                elif event_name == "response.output_item.added":
                    self._current_item_id = event.get("item", {}).get("id")
                    log.debug(
                        "Agent is responding. Added response item.",
                        response_id=self._current_item_id,
                    )

                elif event_name == "response.done":
                    log.debug(
                        "Agent finished generating a response.",
                        response_id=self._current_item_id,
                    )

                # Tool call
                elif event_name == "response.function_call_arguments.done":
                    await self._call_tool(event)

                # Events from VAD related to the user's input
                elif event_name == "input_audio_buffer.speech_started":
                    log.debug("Speech detected.")

                elif event_name == "input_audio_buffer.speech_stopped":
                    log.debug("Speech ended.")

                # Decode the audio from base64
                elif event_name == "response.audio.delta":
                    audio_b64 = event.get("delta", "")
                    audio_bytes = base64.b64decode(audio_b64)
                    event["delta"] = audio_bytes

                # Relay the event to the parent BotStructure - regardless of whether it was processed above or not
                if event_name in self.events_translation:
                    log.debug(
                        "Translating event",
                        old_event_name=event_name,
                        new_event_name=self.events_translation[event_name],
                    )
                    event["type"] = self.events_translation[event_name]
                    await self.emit(self.events_translation[event_name], event)
                else:
                    log.debug("Sending native event to parent", event_name=event_name)
                    await self.emit(event_name, event)

        except websockets.exceptions.ConnectionClosedOK:
            await asyncio.sleep(1)
            log.warning("Connection closed.")
            return

        except Exception:  # pylint: disable=broad-except
            await asyncio.sleep(1)
            log.exception("Error in message handling")
            return

        log.debug(".run() exited without errors.")

    async def send(self, data: Dict[str, Any]) -> None:
        """
        Stream data to the API.

        Args:
            data:
                The data chunk to stream. It should be in the format {"audio_chunk": bytes}.
        """
        if "audio_chunk" in data:
            await self._send_audio_stream(data["audio_chunk"])
        # if "audio_message" in data:
        #     await self._send_audio(data["audio_message"])
        # if "text_message" in data:
        #     await self._send_text_message(data["text_message"])

    async def update_system_prompt(self) -> None:
        """
        Update the system prompt to use in the conversation.
        """
        log.debug("Setting new system prompt", system_prompt=self.system_prompt)
        log.debug("Setting new tools", tools=list(self.tools.keys()))
        await self._update_session(
            {
                "instructions": self.system_prompt,
                "tools": [to_openai_tool(t) for t in self.tools.values()],
            }
        )
        # Flag that we're updating the system prompt and look for this event in the run loop
        self._updating_system_prompt = True

    async def handle_interruption(self, length_to_interruption: int) -> None:
        """
        Handle user interruption of the current response.

        Args:
            length_to_interruption (int):
                The length in milliseconds of the audio that was played to the user before the interruption.
                May be zero if the interruption happened before any audio was played.
        """
        log.info(
            "[Handling interruption at %s ms]",
            length_to_interruption,
            interruption_time=length_to_interruption,
        )

        # Cancel the current response
        # Cancelling responses is effective when the response is still being generated by the LLM.
        if self._current_response_id:
            log.debug(
                "Cancelling response due to a user's interruption.",
                response_id=self._current_response_id,
            )
            event = {"type": "response.cancel"}
            await self.ws.send(json.dumps(event))
        else:
            log.warning("No response ID found to cancel.")

        # Truncate the conversation item to what was actually played
        # Truncating the response is effective when the response has already been generated by the LLM and is being
        # played out.
        if length_to_interruption:
            log.debug(
                "Truncating the response due to a user's interruption at %s ms",
                length_to_interruption,
                interruption_time=length_to_interruption,
            )
            event = {
                "type": "conversation.item.truncate",
                "item_id": self._current_item_id,
                "content_index": 0,
                "audio_end_ms": math.floor(length_to_interruption),
            }
            await self.ws.send(json.dumps(event))

    async def _update_session(self, config: Dict[str, Any]) -> None:
        """
        Update session configuration.

        Args:
            config (Dict[str, Any]):
                The new session configuration.
        """
        event = {"type": "session.update", "session": config}
        await self.ws.send(json.dumps(event))

    async def _send_text_message(self, text: str) -> None:
        """
        Send text message to the API.

        Args:
            text (str):
                The text message to send.
        """
        event = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}],
            },
        }
        await self.ws.send(json.dumps(event))
        await self._request_response_from_llm()

    async def _send_audio_stream(self, audio_bytes: bytes) -> None:
        audio_b64 = base64.b64encode(audio_bytes).decode()
        append_event = {"type": "input_audio_buffer.append", "audio": audio_b64}
        await self.ws.send(json.dumps(append_event))

    # async def _send_audio_message(self, audio_bytes: bytes) -> None:
    #     """
    #     Send audio data to the API.

    #     Args:
    #         audio_bytes (bytes):
    #             The audio data to send.
    #     """
    #     # Convert audio to required format (24kHz, mono, PCM16)
    #     audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    #     audio = audio.set_frame_rate(24000).set_channels(1).set_sample_width(2)
    #     pcm_data = base64.b64encode(audio.raw_data).decode()

    #     # Append audio to buffer
    #     append_event = {"type": "input_audio_buffer.append", "audio": pcm_data}
    #     await self.ws.send(json.dumps(append_event))

    #     # Commit the buffer
    #     commit_event = {"type": "input_audio_buffer.commit"}
    #     await self.ws.send(json.dumps(commit_event))

    async def _send_function_result(self, call_id: str, result: Any) -> None:
        """
        Send function call result back to the API.

        Args:
            call_id (str):
                The ID of the function call.
            result (Any):
                The result of the function call.
        """
        event = {
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": result,
            },
        }
        await self.ws.send(json.dumps(event))
        await self._request_response_from_llm()

    async def _request_response_from_llm(self) -> None:
        """
        Asks the LLM for a response to the messages it just received.
        You need to call this function right after sending a message that is not streamed, unlike audio (where the
        LLM's VAD decides when to reply instead).
        """
        event = {
            "type": "response.create",
            "response": {"modalities": ["text", "audio"]},
        }
        await self.ws.send(json.dumps(event))

    async def _call_tool(self, event: Dict[str, Any]) -> None:
        """
        Calls the tool requested by the LLM.

        Args:
            event (Dict[str, Any]):
                The event containing the tool call information.
        """
        call_id = event["call_id"]
        tool_name = event["name"]
        tool_arguments = json.loads(event["arguments"])
        log.debug(
            "Calling tool",
            tool_name=tool_name,
            tool_arguments=tool_arguments,
            call_id=call_id,
        )

        # Check if it's the router
        if tool_name == self.intent_router.name:
            self.system_prompt, self.tools = await self.intent_router.run(tool_arguments)
            await self.update_system_prompt()
            await self._send_function_result(event["call_id"], "ok")
            return

        # Check if the conversation should end
        if tool_name == EndConversationTool.name:
            await self.tools[EndConversationTool.name].run()
            # await self.disconnect()
            # self.setup_initial_prompt()
            await self.emit("on_conversation_ended", {})
            # await self.connect()
            return

        # Emit the event
        await self.emit("on_tool_invoked", {"name": tool_name, "args": tool_arguments})

        # Make sure the tool actually exists, and bail out if it doesn't
        if tool_name not in self.tools:
            log.error("Tool '%s' not found in the list of available tools.", tool_name)
            await self._send_function_result(call_id, f"Error: Tool {tool_name} not found")
            return

        # Invoke the tool and send back the output
        result = await self.tools.get(tool_name).run(tool_arguments)
        log.debug("Tool run", tool_name=tool_name, tool_output=result)
        await self._send_function_result(call_id, str(result))
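A minimal sketch of driving this client end to end, assuming a parent bot structure, a configured IntentRouter, and a source of raw PCM16 audio chunks (the pcm16_chunks() generator below is hypothetical, as is the example model name):

import asyncio

from intentional_openai.realtime_api import RealtimeAPIClient

async def main():
    bot_structure = ...  # a BotStructure-like parent (assumed available)
    intent_router = ...  # an IntentRouter instance (assumed available)

    client = RealtimeAPIClient(
        parent=bot_structure,
        intent_router=intent_router,
        config={"name": "gpt-4o-realtime-preview", "voice": "alloy"},
    )
    await client.connect()
    listener = asyncio.create_task(client.run())  # consume WebSocket events in the background

    # Stream audio to the API; the server-side VAD decides when the model replies.
    async for chunk in pcm16_chunks():  # hypothetical async generator of PCM16 bytes
        await client.send({"audio_chunk": chunk})

    await client.disconnect()
    await listener

asyncio.run(main())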
__init__(parent, intent_router, config) #

A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text and audio data, and handle responses and events.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
def __init__(self, parent: Callable, intent_router: IntentRouter, config: Dict[str, Any]):
    """
    A client for interacting with the OpenAI Realtime API that lets you manage the WebSocket connection, send text
    and audio data, and handle responses and events.
    """
    log.debug("Loading %s from config", self.__class__.__name__, llm_client_config=config)
    super().__init__(parent, intent_router)

    self.llm_name = config.get("name")
    if not self.llm_name:
        raise ValueError("RealtimeAPIClient requires a 'name' configuration key to know which LLM to use.")
    if "realtime" not in self.llm_name:
        raise ValueError(
            "RealtimeAPIClient requires a 'realtime' LLM to use the Realtime API. "
            "To use any other OpenAI LLM, use the OpenAIClient instead."
        )

    self.api_key_name = config.get("api_key_name", "OPENAI_API_KEY")
    if not os.environ.get(self.api_key_name):
        raise ValueError(
            "RealtimeAPIClient requires an API key to authenticate with OpenAI. "
            f"The provided environment variable name ({self.api_key_name}) is not set or is empty."
        )
    self.api_key = os.environ.get(self.api_key_name)
    self.voice = config.get("voice", "alloy")

    # WebSocket connection data
    self.ws = None
    self.base_url = "wss://api.openai.com/v1/realtime"

    # Track current response state
    self._connecting = False
    self._updating_system_prompt = False
    self._current_response_id = None
    self._current_item_id = None

    # Intent routing data
    self.intent_router = intent_router
    self.system_prompt = None
    self.tools = None
    self.setup_initial_prompt()
connect() async #

Establish WebSocket connection with the Realtime API.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def connect(self) -> None:
    """
    Establish WebSocket connection with the Realtime API.
    """
    log.debug("Initializing websocket connection to OpenAI Realtime API")

    url = f"{self.base_url}?model={self.llm_name}"
    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "OpenAI-Beta": "realtime=v1",
    }
    self.ws = await websockets.connect(url, extra_headers=headers)

    await self._update_session(
        {
            "modalities": ["text", "audio"],
            "instructions": self.system_prompt,
            "voice": self.voice,
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "input_audio_transcription": {"model": "whisper-1"},
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 500,
                "silence_duration_ms": 200,
            },
            "tools": [to_openai_tool(tool) for tool in self.tools.values()],
            "tool_choice": "auto",
            "temperature": 0.8,
        }
    )
    # Flag that we're connecting and look for this event in the run loop
    self._connecting = True
disconnect() async #

Close the WebSocket connection.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def disconnect(self) -> None:
    """
    Close the WebSocket connection.
    """
    if self.ws:
        log.debug("Disconnecting from OpenAI Realtime API")
        await self.ws.close()
    else:
        log.debug("Attempted disconnection of a OpenAIRealtimeAPIClient that was never connected, nothing done.")
    await self.emit("on_llm_disconnection", {})
handle_interruption(length_to_interruption) async #

Handle user interruption of the current response.

Parameters:

Name | Type | Description | Default
length_to_interruption | int | The length in milliseconds of the audio that was played to the user before the interruption. May be zero if the interruption happened before any audio was played. | required
Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def handle_interruption(self, length_to_interruption: int) -> None:
    """
    Handle user interruption of the current response.

    Args:
        length_to_interruption (int):
            The length in milliseconds of the audio that was played to the user before the interruption.
            May be zero if the interruption happened before any audio was played.
    """
    log.info(
        "[Handling interruption at %s ms]",
        length_to_interruption,
        interruption_time=length_to_interruption,
    )

    # Cancel the current response
    # Cancelling responses is effective when the response is still being generated by the LLM.
    if self._current_response_id:
        log.debug(
            "Cancelling response due to a user's interruption.",
            response_id=self._current_response_id,
        )
        event = {"type": "response.cancel"}
        await self.ws.send(json.dumps(event))
    else:
        log.warning("No response ID found to cancel.")

    # Truncate the conversation item to what was actually played
    # Truncating the response is effective when the response has already been generated by the LLM and is being
    # played out.
    if length_to_interruption:
        log.debug(
            "Truncating the response due to a user's interruption at %s ms",
            length_to_interruption,
            interruption_time=length_to_interruption,
        )
        event = {
            "type": "conversation.item.truncate",
            "item_id": self._current_item_id,
            "content_index": 0,
            "audio_end_ms": math.floor(length_to_interruption),
        }
        await self.ws.send(json.dumps(event))
run() async #

Handles events coming from the WebSocket connection.

This method is an infinite loop that listens for messages from the WebSocket connection and processes them accordingly. It also triggers the event handlers for the corresponding event types.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def run(self) -> None:  # pylint: disable=too-many-branches, too-many-statements
    """
    Handles events coming from the WebSocket connection.

    This method is an infinite loop that listens for messages from the WebSocket connection and processes them
    accordingly. It also triggers the event handlers for the corresponding event types.
    """
    try:
        async for message in self.ws:
            event = json.loads(message)
            event_name = event.get("type")
            log.debug("Received event", event_name=event_name)

            # Handle errors
            if event_name == "error":
                log.error("An error response was returned", event_data=event)
            elif event_name == "conversation.item.input_audio_transcription.failed":
                log.error("An error happened during transcription", event_data=event)

            elif event_name == "session.updated":
                log.debug("Session configuration updated", event_data=event)
                # Check why we updated the session and emit the corresponding event
                if self._connecting:
                    self._connecting = False
                    await self.emit("on_llm_connection", event)
                if self._updating_system_prompt:
                    self._updating_system_prompt = False
                    await self.emit(
                        "on_system_prompt_updated",
                        {"system_prompt": event["session"]["instructions"]},
                    )

            # Track agent response state
            elif event_name == "response.created":
                self._current_response_id = event.get("response", {}).get("id")
                log.debug(
                    "Agent started responding. Response created.",
                    response_id=self._current_response_id,
                )

            elif event_name == "response.output_item.added":
                self._current_item_id = event.get("item", {}).get("id")
                log.debug(
                    "Agent is responding. Added response item.",
                    response_id=self._current_item_id,
                )

            elif event_name == "response.done":
                log.debug(
                    "Agent finished generating a response.",
                    response_id=self._current_item_id,
                )

            # Tool call
            elif event_name == "response.function_call_arguments.done":
                await self._call_tool(event)

            # Events from VAD related to the user's input
            elif event_name == "input_audio_buffer.speech_started":
                log.debug("Speech detected.")

            elif event_name == "input_audio_buffer.speech_stopped":
                log.debug("Speech ended.")

            # Decode the audio from base64
            elif event_name == "response.audio.delta":
                audio_b64 = event.get("delta", "")
                audio_bytes = base64.b64decode(audio_b64)
                event["delta"] = audio_bytes

            # Relay the event to the parent BotStructure - regardless of whether it was processed above or not
            if event_name in self.events_translation:
                log.debug(
                    "Translating event",
                    old_event_name=event_name,
                    new_event_name=self.events_translation[event_name],
                )
                event["type"] = self.events_translation[event_name]
                await self.emit(self.events_translation[event_name], event)
            else:
                log.debug("Sending native event to parent", event_name=event_name)
                await self.emit(event_name, event)

    except websockets.exceptions.ConnectionClosedOK:
        await asyncio.sleep(1)
        log.warning("Connection closed.")
        return

    except Exception:  # pylint: disable=broad-except
        await asyncio.sleep(1)
        log.exception("Error in message handling")
        return

    log.debug(".run() exited without errors.")
send(data) async #

Stream data to the API.

Parameters:

Name | Type | Description | Default
data | Dict[str, Any] | The data chunk to stream. It should be in the format {"audio_chunk": bytes}. | required
Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def send(self, data: Dict[str, Any]) -> None:
    """
    Stream data to the API.

    Args:
        data:
            The data chunk to stream. It should be in the format {"audio_chunk": bytes}.
    """
    if "audio_chunk" in data:
        await self._send_audio_stream(data["audio_chunk"])
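For example, a pre-recorded raw PCM16 file could be streamed in fixed-size chunks like this (the file name is hypothetical; 4800 bytes is roughly 100 ms of 24 kHz mono PCM16, matching the session's audio format):

CHUNK_SIZE = 4800  # ~100 ms of 24 kHz mono PCM16

with open("speech.pcm", "rb") as f:  # hypothetical raw PCM16 recording
    while chunk := f.read(CHUNK_SIZE):
        await client.send({"audio_chunk": chunk})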
setup_initial_prompt() #

Set up the initial prompt and tools. Also used after a conversation ends to reset the state.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
def setup_initial_prompt(self) -> None:
    """
    Set up the initial prompt and tools. Also used after a conversation ends to reset the state.
    """
    self.system_prompt = self.intent_router.get_prompt()
    self.tools = self.intent_router.current_stage.tools
update_system_prompt() async #

Update the system prompt to use in the conversation.

Source code in plugins/intentional-openai/src/intentional_openai/realtime_api.py
async def update_system_prompt(self) -> None:
    """
    Update the system prompt to use in the conversation.
    """
    log.debug("Setting new system prompt", system_prompt=self.system_prompt)
    log.debug("Setting new tools", tools=list(self.tools.keys()))
    await self._update_session(
        {
            "instructions": self.system_prompt,
            "tools": [to_openai_tool(t) for t in self.tools.values()],
        }
    )
    # Flag that we're updating the system prompt and look for this event in the run loop
    self._updating_system_prompt = True

tools #

Tool utilities to interact with tools in OpenAI.

to_openai_tool(tool) #

Convert a Tool into the tool definition format required by OpenAI.

Source code in plugins/intentional-openai/src/intentional_openai/tools.py
def to_openai_tool(tool: Tool):
    """
    Convert a Tool into the tool definition format required by OpenAI.
    """
    return {
        "type": "function",
        "name": tool.name,
        "description": tool.description,
        "parameters": {
            "type": "object",
            "properties": {
                param.name: {
                    "description": param.description,
                    "type": param.type,
                    "default": param.default,
                }
                for param in tool.parameters
            },
            "required": [param.name for param in tool.parameters if param.required],
        },
    }
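To illustrate, assuming a Tool whose attributes match the fields read above (the concrete Tool and parameter classes live in the core intentional package), a hypothetical tool with one required parameter would convert as follows:

# Hypothetical input:
#   tool.name = "get_weather"
#   tool.description = "Fetch the current weather."
#   tool.parameters = [<param with name="city", description="City name",
#                       type="string", default=None, required=True>]
# to_openai_tool(tool) then returns:
{
    "type": "function",
    "name": "get_weather",
    "description": "Fetch the current weather.",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"description": "City name", "type": "string", "default": None},
        },
        "required": ["city"],
    },
}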