Skip to content

API Reference - Local CLI output#

src #

intentional_local #

Init file for intentional_local.

__about__ #

Package descriptors for intentional-local.

bot_interface #

Local bot interface for Intentional.

LocalBotInterface #

Bases: BotInterface

Bot that uses the local command line interface to interact with the user.

Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class LocalBotInterface(BotInterface):
    """
    Bot that uses the local command line interface to interact with the user.
    """

    name = "local"

    def __init__(self, intent_router: IntentRouter, config: Dict[str, Any]):
        """
        Builds the bot structure and records the configured modality.

        Args:
            intent_router: Router used by the bot structure to dispatch intents.
            config: Interface configuration. Must contain a 'bot' key (the bot
                structure definition) and a 'modality' key ('audio_stream' or
                'text_turns').

        Raises:
            ValueError: If the 'bot' configuration key is missing or empty.
            KeyError: If the 'modality' configuration key is missing.
        """
        # Init the structure
        bot_structure_config = config.pop("bot", None)
        if not bot_structure_config:
            raise ValueError("LocalBotInterface requires a 'bot' configuration key to know how to structure the bot.")
        logger.debug("Creating bot structure of type '%s'", bot_structure_config)
        self.bot: BotStructure = load_bot_structure_from_dict(intent_router, bot_structure_config)

        # Check the modality
        self.modality = config.pop("modality")
        logger.debug("Modality for LocalBotInterface is set to: %s", self.modality)

        # Created lazily by _run_audio_stream(); they stay None in text-turns mode.
        self.audio_handler = None
        self.input_handler = None

    async def run(self) -> None:
        """
        Chooses the specific loop to use for this combination of bot and modality and kicks it off.

        Raises:
            ValueError: If the configured modality is not supported by the bot structure in use.
        """
        if isinstance(self.bot, ContinuousStreamBotStructure):
            if self.modality == "audio_stream":
                await self._run_audio_stream(self.bot)
            else:
                raise ValueError(
                    f"Modality '{self.modality}' is not yet supported for '{self.bot.name}' bots."
                    "These are the supported modalities: 'audio_stream'."
                )

        if isinstance(self.bot, TurnBasedBotStructure):
            if self.modality == "text_turns":
                await self._run_text_turns(self.bot)
            else:
                raise ValueError(
                    f"Modality '{self.modality}' is not yet supported for '{self.bot.name}' bots."
                    "These are the supported modalities: 'text_turns'."
                )

    async def _run_text_turns(self, bot: TurnBasedBotStructure) -> None:
        """
        Runs the CLI interface for the text turns modality.
        """
        logger.debug("Running the LocalBotInterface in text turns mode.")
        bot.add_event_handler("on_text_message_from_model", self.handle_text_messages)
        bot.add_event_handler("on_model_starts_generating_response", self.handle_start_text_response)
        bot.add_event_handler("on_model_stops_generating_response", self.handle_finish_text_response)
        bot.add_event_handler("on_model_connection", self.handle_model_connection)

        await bot.connect()

    async def _run_audio_stream(self, bot: ContinuousStreamBotStructure) -> None:
        """
        Runs the CLI interface for the continuous audio streaming modality.
        """
        logger.debug("Running the LocalBotInterface in continuous audio streaming mode.")

        # Create the handlers
        self.audio_handler = AudioHandler()
        self.input_handler = InputHandler()
        # on_press runs on the pynput listener thread; it needs the running
        # loop to hand commands over via call_soon_threadsafe.
        self.input_handler.loop = asyncio.get_running_loop()

        # Connect the event handlers
        bot.add_event_handler("*", self.check_for_transcripts)
        bot.add_event_handler("on_audio_message_from_model", self.handle_audio_messages)
        bot.add_event_handler("on_vad_detects_user_speech_started", self.speech_started)
        bot.add_event_handler("on_vad_detects_user_speech_ended", self.speech_stopped)

        # Start keyboard listener in a separate thread
        listener = keyboard.Listener(on_press=self.input_handler.on_press)
        listener.start()

        try:
            logger.debug("Asking the bot to connect to the model...")
            await bot.connect()
            asyncio.create_task(bot.run())

            print("Chat is ready. Start speaking!")
            print("Press 'q' to quit")
            print("")

            # Start continuous audio streaming
            asyncio.create_task(self.audio_handler.start_streaming(bot.send))

            # Simple input loop for quit command
            while True:
                command, _ = await self.input_handler.command_queue.get()

                if command == "q":
                    break

        except Exception as e:  # pylint: disable=broad-except
            logger.exception("An error occurred: %s", str(e))
        finally:
            # Always release audio devices and the model connection, even on error.
            self.audio_handler.stop_streaming()
            self.audio_handler.cleanup()
            await bot.disconnect()
            print("Chat is finished. Bye!")

    async def check_for_transcripts(self, event: Dict[str, Any]) -> None:
        """
        Checks for transcripts from the bot.

        Args:
            event: The event dictionary containing the transcript.
        """
        if "transcript" in event:
            # NOTE: the inner quotes must differ from the f-string's own quotes;
            # reusing double quotes here is a SyntaxError on Python < 3.12.
            print(f"[{event['type']}] Transcript: {event['transcript']}")

    async def handle_start_text_response(self, _) -> None:
        """
        Prints the assistant prompt to the console when the bot starts generating a text response.
        """
        # No newline: streamed text deltas are appended on the same line.
        print("Assistant: ", end="")

    async def handle_finish_text_response(self, _) -> None:
        """
        Prints a newline when the bot finishes generating a text response,
        then prompts the user for the next message and sends it to the bot.
        """
        print("")
        # NOTE: input() blocks the event loop; acceptable for a local turn-based CLI.
        await self.bot.send({"role": "user", "content": input("User: ")})

    async def handle_model_connection(self, event: Dict[str, Any]) -> None:
        """
        Prints to the console when the bot connects to the model.

        Args:
            event: The event dictionary containing the model connection event.
        """
        print("########## Chat is ready! ###########")
        # Kick off the first user turn right away.
        await self.handle_finish_text_response(event)

    async def handle_text_messages(self, event: Dict[str, Any]) -> None:
        """
        Prints to the console any text message from the bot.

        Args:
            event: The event dictionary containing the message.
        """
        if event["delta"]:
            print(event["delta"], end="", flush=True)

    async def handle_audio_messages(self, event: Dict[str, Any]) -> None:
        """
        Plays audio responses from the bot.

        Args:
            event: The event dictionary containing the audio message (base64-encoded delta).
        """
        self.audio_handler.play_audio(base64.b64decode(event["delta"]))

    async def speech_started(self, event: Dict[str, Any]) -> None:  # pylint: disable=unused-argument
        """
        Prints to the console when the user starts speaking and, if the bot was
        talking, interrupts its audio playback.

        Args:
            event: The event dictionary containing the speech start event.
        """
        print("[User is speaking]")

        # Handle interruptions if it is the case
        played_milliseconds = self.audio_handler.stop_playback_immediately()
        logger.debug("Played the response for %s milliseconds.", played_milliseconds)

        # If we're interrupting the bot, handle the interruption on the model side too
        if played_milliseconds:
            logger.info("Handling interruption...")
            await self.bot.handle_interruption(played_milliseconds)

    async def speech_stopped(self, event: Dict[str, Any]) -> None:  # pylint: disable=unused-argument
        """
        Prints to the console when the user stops speaking.

        Args:
            event: The event dictionary containing the speech stop event.
        """
        print("[User stopped speaking]")

Checks for transcripts from the bot.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the transcript.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
134
135
136
137
138
139
140
141
142
async def check_for_transcripts(self, event: Dict[str, Any]) -> None:
    """
    Checks for transcripts from the bot.

    Args:
        event: The event dictionary containing the transcript.
    """
    if "transcript" in event:
        # NOTE: the inner quotes must differ from the f-string's own quotes;
        # reusing double quotes here is a SyntaxError on Python < 3.12.
        print(f"[{event['type']}] Transcript: {event['transcript']}")
handle_audio_messages(event) async #

Plays audio responses from the bot.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the audio message.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
177
178
179
180
181
182
183
184
async def handle_audio_messages(self, event: Dict[str, Any]) -> None:
    """
    Plays audio responses from the bot.

    Args:
        event: The event dictionary containing the audio message.
    """
    # The model sends audio deltas base64-encoded; decode before handing to playback.
    pcm_bytes = base64.b64decode(event["delta"])
    self.audio_handler.play_audio(pcm_bytes)
handle_finish_text_response(_) async #

Prints a newline to the console when the bot finishes generating a text response, then prompts the user for the next message.

Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
150
151
152
153
154
155
async def handle_finish_text_response(self, _) -> None:
    """
    Prints a newline when the bot finishes generating a text response,
    then prompts the user for the next message and sends it to the bot.
    """
    print("")
    # NOTE: input() blocks the event loop; acceptable for a local turn-based CLI.
    await self.bot.send({"role": "user", "content": input("User: ")})
handle_model_connection(event) async #

Prints to the console when the bot connects to the model.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the model connection event.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
157
158
159
160
161
162
163
164
165
async def handle_model_connection(self, event: Dict[str, Any]) -> None:
    """
    Prints to the console when the bot connects to the model.

    Args:
        event: The event dictionary containing the model connection event.
    """
    banner = "########## Chat is ready! ###########"
    print(banner)
    # Immediately start the first user turn.
    await self.handle_finish_text_response(event)
handle_start_text_response(_) async #

Prints to the console when the bot starts generating a text response.

Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
144
145
146
147
148
async def handle_start_text_response(self, _) -> None:
    """
    Prints the assistant prompt to the console when the bot starts generating a text response.
    """
    # No trailing newline: the streamed deltas continue on the same line.
    print("Assistant: ", end="")
handle_text_messages(event) async #

Prints to the console any text message from the bot.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the message.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
167
168
169
170
171
172
173
174
175
async def handle_text_messages(self, event: Dict[str, Any]) -> None:
    """
    Prints to the console any text message from the bot.

    Args:
        event: The event dictionary containing the message.
    """
    # Stream each non-empty delta onto the current line, flushing immediately.
    if (delta := event["delta"]):
        print(delta, end="", flush=True)
run() async #

Chooses the specific loop to use for this combination of bot and modality and kicks it off.

Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def run(self) -> None:
    """
    Chooses the specific loop to use for this combination of bot and modality and kicks it off.

    Raises:
        ValueError: If the modality is not supported by the bot structure in use.
    """
    def _unsupported(supported: str) -> ValueError:
        # Builds the error for a modality this bot structure cannot handle.
        return ValueError(
            f"Modality '{self.modality}' is not yet supported for '{self.bot.name}' bots."
            f"These are the supported modalities: {supported}."
        )

    if isinstance(self.bot, ContinuousStreamBotStructure):
        if self.modality != "audio_stream":
            raise _unsupported("'audio_stream'")
        await self._run_audio_stream(self.bot)

    if isinstance(self.bot, TurnBasedBotStructure):
        if self.modality != "text_turns":
            raise _unsupported("'text_turns'")
        await self._run_text_turns(self.bot)
speech_started(event) async #

Prints to the console when the user starts speaking and interrupts the bot's audio playback if needed.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the speech start event.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def speech_started(self, event: Dict[str, Any]) -> None:  # pylint: disable=unused-argument
    """
    Prints to the console when the user starts speaking and, if the bot was
    talking, interrupts its audio playback.

    Args:
        event: The event dictionary containing the speech start event.
    """
    print("[User is speaking]")

    # Handle interruptions if it is the case
    played_milliseconds = self.audio_handler.stop_playback_immediately()
    logger.debug("Played the response for %s milliseconds.", played_milliseconds)

    # If we're interrupting the bot, handle the interruption on the model side too
    if played_milliseconds:
        logger.info("Handling interruption...")
        await self.bot.handle_interruption(played_milliseconds)
speech_stopped(event) async #

Prints to the console when the user stops speaking.

Parameters:

Name Type Description Default
event Dict[str, Any]

The event dictionary containing the speech stop event.

required
Source code in plugins/intentional-local/src/intentional_local/bot_interface.py
204
205
206
207
208
209
210
211
async def speech_stopped(self, event: Dict[str, Any]) -> None:  # pylint: disable=unused-argument
    """
    Prints to the console when the user stops speaking.

    Args:
        event: The event dictionary containing the speech stop event.
    """
    print("[User stopped speaking]")

handlers #

Init file for the handlers subpackage of intentional_local.

audio_handler #

CLI handler for the bot's audio input and output.

Uses PyAudio for audio input and output, and runs a separate thread for recording and playing audio.

When playing audio, it uses a buffer to store audio data and plays it continuously to ensure smooth playback.

AudioHandler #

Handles audio input and output for the chatbot.

Uses PyAudio for audio input and output, and runs a separate thread for recording and playing audio.

When playing audio, it uses a buffer to store audio data and plays it continuously to ensure smooth playback.

Parameters:

Name Type Description Default
audio_format int

The audio format (paInt16).

paInt16
channels int

The number of audio channels (1).

1
rate int

The sample rate (24000).

24000
chunk int

The size of the audio buffer (1024).

1024
Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class AudioHandler:
    """
    Handles audio input and output for the chatbot.

    Uses PyAudio for audio input and output, and runs a separate thread for recording and playing audio.

    When playing audio, it uses a buffer to store audio data and plays it continuously to ensure smooth playback.

    Args:
        audio_format:
            The audio format (paInt16).
        channels:
            The number of audio channels (1).
        rate:
            The sample rate (24000).
        chunk:
            The size of the audio buffer (1024).
    """

    def __init__(
        self,
        audio_format: int = pyaudio.paInt16,
        channels: int = 1,
        rate: int = 24000,
        chunk: int = 1024,
    ):
        # Audio parameters
        self.audio_format = audio_format
        self.channels = channels
        self.rate = rate
        self.chunk = chunk

        self.audio = pyaudio.PyAudio()

        # Recording attributes
        self.recording_stream: Optional[pyaudio.Stream] = None
        self.recording_thread = None
        self.recording = False

        # Model streaming attributes
        self.streaming = False
        self.model_stream = None

        # Playback attributes
        # playback_play_time accumulates milliseconds of audio played for the
        # current response (see _continuous_playback / stop_playback_immediately).
        self.playback_stream = None
        self.playback_play_time = 0
        # NOTE(review): queue.Queue() is unbounded, so the queue.Full handling
        # in play_audio() appears to be defensive only.
        self.playback_buffer = queue.Queue()
        self.playback_event = threading.Event()
        self.playback_thread = None
        self.stop_playback = False

        self.frames = []
        self.currently_playing = False

    def start_recording(self) -> bytes:
        """
        Start recording audio from microphone and return bytes.

        Spawns a background thread (_record) that appends chunks to self.frames;
        the return value is always empty bytes — audio is collected asynchronously.
        """
        if self.recording:
            return b""

        self.recording = True
        self.recording_stream = self.audio.open(
            format=self.audio_format, channels=self.channels, rate=self.rate, input=True, frames_per_buffer=self.chunk
        )

        print("\nRecording... Press 'space' to stop.")

        self.frames = []
        self.recording_thread = threading.Thread(target=self._record)
        self.recording_thread.start()

        return b""  # Return empty bytes, we'll send audio later

    def _record(self):
        # Runs on self.recording_thread until self.recording is cleared
        # or the stream read fails.
        while self.recording:
            try:
                data = self.recording_stream.read(self.chunk)
                self.frames.append(data)
            except Exception as e:  # pylint: disable=broad-except
                logger.exception("Error recording audio: %s", e)
                break

    # def stop_recording(self) -> bytes:
    #     """Stop recording and return the recorded audio as bytes"""
    #     if not self.recording:
    #         return b""

    #     self.recording = False
    #     if self.recording_thread:
    #         self.recording_thread.join()

    #     # Clean up recording stream
    #     if self.recording_stream:
    #         self.recording_stream.stop_stream()
    #         self.recording_stream.close()
    #         self.recording_stream = None

    #     # Convert frames to WAV format in memory
    #     wav_buffer = io.BytesIO()
    #     with wave.open(wav_buffer, "wb") as wf:
    #         wf: wave.Wave_write
    #         wf.setnchannels(self.channels)
    #         wf.setsampwidth(self.audio.get_sample_size(self.audio_format))
    #         wf.setframerate(self.rate)
    #         wf.writeframes(b"".join(self.frames))

    #     # Get the WAV data
    #     wav_buffer.seek(0)
    #     return wav_buffer.read()

    async def start_streaming(self, client_streaming_callback):
        """
        Start continuous audio streaming.

        Reads raw PCM chunks from the microphone and forwards each one to
        client_streaming_callback as {"audio_stream": bytes} until
        stop_streaming() clears self.streaming or a read error occurs.
        """
        if self.streaming:
            return

        self.streaming = True
        self.model_stream = self.audio.open(
            format=self.audio_format, channels=self.channels, rate=self.rate, input=True, frames_per_buffer=self.chunk
        )

        print("\nStreaming audio... Press 'q' to stop.")

        while self.streaming:
            try:
                # Read raw PCM data
                data = self.model_stream.read(self.chunk, exception_on_overflow=False)
                # Stream directly without trying to decode
                await client_streaming_callback({"audio_stream": data})
            except Exception as e:  # pylint: disable=broad-except
                logger.exception("Error streaming: %s", e)
                break
            # Yield to the event loop so other tasks can run between reads.
            await asyncio.sleep(0.01)

    def stop_streaming(self):
        """
        Stop audio streaming.
        """
        self.streaming = False
        if self.model_stream:
            self.model_stream.stop_stream()
            self.model_stream.close()
            self.model_stream = None

    def play_audio(self, audio_data: bytes):
        """
        Add audio data to the buffer

        Args:
            audio_data: The audio data to play (raw 16-bit mono PCM at 24 kHz,
                matching the handler's default format).
        """
        audio_segment = AudioSegment(audio_data, sample_width=2, frame_rate=24000, channels=1)
        try:
            self.playback_buffer.put_nowait(audio_segment)
        except queue.Full:
            # If the buffer is full, remove the oldest chunk and add the new one
            self.playback_buffer.get_nowait()
            self.playback_buffer.put_nowait(audio_segment)

        # Lazily (re)start the playback thread if it is not currently running.
        if not self.playback_thread or not self.playback_thread.is_alive():
            self.stop_playback = False
            self.playback_event.clear()
            self.playback_thread = threading.Thread(target=self._continuous_playback)
            self.playback_thread.start()

    def _continuous_playback(self):
        """
        Continuously play audio from the buffer.

        Runs on self.playback_thread until stop_playback is set or the
        playback_event fires (immediate interruption).
        """
        self.playback_stream = self.audio.open(
            format=self.audio_format, channels=self.channels, rate=self.rate, output=True, frames_per_buffer=self.chunk
        )
        while not self.stop_playback:
            try:
                audio_segment = self.playback_buffer.get(timeout=0.1)
                # len() of a pydub AudioSegment is its duration in milliseconds.
                self.playback_play_time += len(audio_segment)
                self._play_audio_chunk(audio_segment)
            except queue.Empty:
                # Buffer drained: the current response is over, reset the counter.
                self.playback_play_time = 0
                continue

            if self.playback_event.is_set():
                break

        if self.playback_stream:
            self.playback_stream.stop_stream()
            self.playback_stream.close()
            self.playback_stream = None

    def _play_audio_chunk(self, audio_segment: AudioSegment):
        # Writes one segment to the output stream, checking for interruption
        # between small sub-chunks.
        try:
            # Ensure the audio is in the correct format for playback
            audio_data = audio_segment.raw_data

            # Play the audio chunk in smaller portions to allow for quicker interruption
            chunk_size = 1024  # Adjust this value as needed
            for i in range(0, len(audio_data), chunk_size):
                if self.playback_event.is_set():
                    break
                chunk = audio_data[i : i + chunk_size]
                self.playback_stream.write(chunk)
        except Exception as e:  # pylint: disable=broad-except
            logger.exception("Error playing audio chunk: %s", e)

    def stop_playback_immediately(self) -> int:
        """
        Stop audio playback immediately. Sets the relevant flags and empties the queue.

        Returns:
            The number of milliseconds of the current response played before
            the interruption, or 0 if nothing was playing.
        """
        played_milliseconds = 0
        if self.playback_play_time:
            played_milliseconds = self.playback_play_time
            self.playback_play_time = 0

        self.stop_playback = True
        self.playback_buffer.queue.clear()  # Clear any pending audio
        self.currently_playing = False
        self.playback_event.set()
        return played_milliseconds

    def cleanup(self):
        """
        Clean up audio resources.

        Stops playback, recording and streaming, closes any open streams and
        terminates the PyAudio session. Call once, at shutdown.
        """
        self.stop_playback_immediately()

        self.stop_playback = True
        if self.playback_thread:
            self.playback_thread.join()

        self.recording = False
        if self.recording_stream:
            self.recording_stream.stop_stream()
            self.recording_stream.close()

        if self.model_stream:
            self.model_stream.stop_stream()
            self.model_stream.close()

        self.audio.terminate()
cleanup() #

Clean up audio resources.

Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def cleanup(self):
    """
    Clean up audio resources.

    Stops playback and recording, closes any open PyAudio streams and
    terminates the PyAudio session. Call once, at shutdown.
    """
    self.stop_playback_immediately()

    self.stop_playback = True
    if self.playback_thread:
        self.playback_thread.join()  # wait for the playback loop to exit

    self.recording = False
    if self.recording_stream:
        self.recording_stream.stop_stream()
        self.recording_stream.close()

    if self.model_stream:
        self.model_stream.stop_stream()
        self.model_stream.close()

    self.audio.terminate()
play_audio(audio_data) #

Add audio data to the buffer

Parameters:

Name Type Description Default
audio_data bytes

The audio data to play.

required
Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def play_audio(self, audio_data: bytes):
    """
    Add audio data to the buffer

    Args:
        audio_data: The audio data to play (raw 16-bit mono PCM at 24 kHz,
            matching the handler's default format).
    """
    audio_segment = AudioSegment(audio_data, sample_width=2, frame_rate=24000, channels=1)
    try:
        self.playback_buffer.put_nowait(audio_segment)
    except queue.Full:
        # If the buffer is full, remove the oldest chunk and add the new one
        # NOTE(review): playback_buffer is an unbounded queue.Queue(), so this
        # branch looks defensive-only — confirm against __init__.
        self.playback_buffer.get_nowait()
        self.playback_buffer.put_nowait(audio_segment)

    # Lazily (re)start the playback thread if it is not currently running.
    if not self.playback_thread or not self.playback_thread.is_alive():
        self.stop_playback = False
        self.playback_event.clear()
        self.playback_thread = threading.Thread(target=self._continuous_playback)
        self.playback_thread.start()
start_recording() #

Start recording audio from microphone and return bytes

Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def start_recording(self) -> bytes:
    """
    Start recording audio from the microphone on a background thread.

    The recorded chunks accumulate in self.frames; the return value is
    always empty bytes, since audio is collected asynchronously.
    """
    if self.recording:
        return b""

    self.recording = True
    self.recording_stream = self.audio.open(
        format=self.audio_format,
        channels=self.channels,
        rate=self.rate,
        input=True,
        frames_per_buffer=self.chunk,
    )

    print("\nRecording... Press 'space' to stop.")

    self.frames = []
    recorder = threading.Thread(target=self._record)
    self.recording_thread = recorder
    recorder.start()

    return b""  # Return empty bytes, we'll send audio later
start_streaming(client_streaming_callback) async #

Start continuous audio streaming.

Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def start_streaming(self, client_streaming_callback):
    """
    Continuously read raw PCM chunks from the microphone and forward each
    one to the given async callback until streaming is stopped.
    """
    if self.streaming:
        return

    self.streaming = True
    self.model_stream = self.audio.open(
        format=self.audio_format,
        channels=self.channels,
        rate=self.rate,
        input=True,
        frames_per_buffer=self.chunk,
    )

    print("\nStreaming audio... Press 'q' to stop.")

    while self.streaming:
        try:
            # Forward the raw PCM data as-is, without decoding it.
            pcm_chunk = self.model_stream.read(self.chunk, exception_on_overflow=False)
            await client_streaming_callback({"audio_stream": pcm_chunk})
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception("Error streaming: %s", exc)
            break
        # Yield to the event loop between reads.
        await asyncio.sleep(0.01)
stop_playback_immediately() #

Stop audio playback immediately. Sets the relevant flags and empties the queue.

Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def stop_playback_immediately(self) -> int:
    """
    Stop audio playback immediately. Sets the relevant flags and empties the queue.

    Returns:
        The number of milliseconds of the current response played before the
        interruption, or 0 if nothing was playing.
    """
    played_milliseconds = 0
    if self.playback_play_time:
        played_milliseconds = self.playback_play_time
        self.playback_play_time = 0

    self.stop_playback = True
    self.playback_buffer.queue.clear()  # Clear any pending audio
    self.currently_playing = False
    self.playback_event.set()
    return played_milliseconds
stop_streaming() #

Stop audio streaming.

Source code in plugins/intentional-local/src/intentional_local/handlers/audio_handler.py
160
161
162
163
164
165
166
167
168
def stop_streaming(self):
    """
    Stop audio streaming and release the input stream, if one is open.
    """
    self.streaming = False
    stream = self.model_stream
    if stream:
        stream.stop_stream()
        stream.close()
        self.model_stream = None
input_handler #

Handles keyboard input for the bot's CLI interface.

This module is responsible for capturing keyboard input and translating it into commands for the bot.

InputHandler #

Handles keyboard input for the chatbot.

This class is responsible for capturing keyboard input and translating it into commands for the chatbot.

Attributes:

Name Type Description
text_input str

The current text input from the user.

text_ready Event

An event that is set when the user has finished typing.

command_queue Queue

A queue that stores commands for the chatbot.

loop AbstractEventLoop

The event loop for the input handler.

Source code in plugins/intentional-local/src/intentional_local/handlers/input_handler.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class InputHandler:
    """
    Handles keyboard input for the chatbot.

    This class is responsible for capturing keyboard input and translating it into commands for the chatbot.

    Attributes:
        text_input (str): The current text input from the user.
        text_ready (asyncio.Event): An event that is set when the user has finished typing.
        command_queue (asyncio.Queue): A queue that stores commands for the chatbot.
        loop (asyncio.AbstractEventLoop): The event loop for the input handler.
    """

    def __init__(self):
        """
        Handles keyboard input for the chatbot.
        """
        self.text_input = ""
        self.text_ready = asyncio.Event()
        self.command_queue = asyncio.Queue()
        # Must be set to the running loop before the keyboard listener starts,
        # because on_press hands commands over via call_soon_threadsafe.
        self.loop = None

    def on_press(self, key):
        """
        Keyboard event handler. Runs on the pynput listener thread, so commands
        are passed to the asyncio world through loop.call_soon_threadsafe.
        """
        try:
            if key == keyboard.Key.space:
                self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("space", None))
            elif key == keyboard.Key.enter:
                self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("enter", self.text_input))
                self.text_input = ""
            elif key == keyboard.KeyCode.from_char("r"):
                self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("r", None))
            elif key == keyboard.KeyCode.from_char("q"):
                self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("q", None))
            elif key == keyboard.Key.backspace:
                # Backspace is a special key without a .char attribute, so it
                # must be checked before the printable-character branch below.
                self.text_input = self.text_input[:-1]
            elif getattr(key, "char", None) is not None:
                self.text_input += key.char

        except AttributeError as e:
            logger.exception("Error processing key event", exc_info=e)
__init__() #

Handles keyboard input for the chatbot.

Source code in plugins/intentional-local/src/intentional_local/handlers/input_handler.py
33
34
35
36
37
38
39
40
def __init__(self):
    """
    Handles keyboard input for the chatbot.
    """
    self.loop = None                      # running event loop, assigned by the caller
    self.command_queue = asyncio.Queue()  # (command, payload) tuples for the bot
    self.text_ready = asyncio.Event()     # set when the user finishes typing
    self.text_input = ""                  # characters accumulated so far
on_press(key) #

Keyboard event handler.

Source code in plugins/intentional-local/src/intentional_local/handlers/input_handler.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def on_press(self, key):
    """
    Keyboard event handler. Runs on the pynput listener thread, so commands
    are passed to the asyncio world through loop.call_soon_threadsafe.
    """
    try:
        if key == keyboard.Key.space:
            self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("space", None))
        elif key == keyboard.Key.enter:
            self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("enter", self.text_input))
            self.text_input = ""
        elif key == keyboard.KeyCode.from_char("r"):
            self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("r", None))
        elif key == keyboard.KeyCode.from_char("q"):
            self.loop.call_soon_threadsafe(self.command_queue.put_nowait, ("q", None))
        elif key == keyboard.Key.backspace:
            # Backspace is a special key without a .char attribute, so it must
            # be checked before the printable-character branch below.
            self.text_input = self.text_input[:-1]
        elif getattr(key, "char", None) is not None:
            self.text_input += key.char

    except AttributeError as e:
        logger.exception("Error processing key event", exc_info=e)