
API Reference - Pipecat#

src #

intentional_pipecat #

Intentional plugin for Pipecat.

__about__ #

Package descriptors for intentional-pipecat.

bot_structure #

Pipecat bot structure implementation.

PipecatBotStructure #

Bases: BotStructure

Bot structure that uses Pipecat to make text-only models able to handle spoken input.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
class PipecatBotStructure(BotStructure):  # pylint: disable=too-many-instance-attributes
    """
    Bot structure that uses Pipecat to make text-only models able to handle spoken input.
    """

    name = "pipecat"

    def __init__(self, config: Dict[str, Any], intent_router: IntentRouter):
        """
        Args:
            config:
                The configuration dictionary for the bot structure.
                It must include the LLM definition under the `llm` key, plus the `vad`, `stt` and `tts` client definitions.
        """
        super().__init__()
        log.debug("Loading bot structure from config", bot_structure_config=config)

        # Init the model client
        llm_config = config.pop("llm", None)
        if not llm_config:
            raise ValueError(f"{self.__class__.__name__} requires a 'llm' configuration key.")
        self.llm: LLMClient = load_llm_client_from_dict(parent=self, intent_router=intent_router, config=llm_config)

        # Import the correct VAD, STT and TTS clients from Pipecat
        self.vad_class, self.vad_params = self._load_class_from_config(
            config.pop("vad", None), "vad", {"start_secs": 0.1, "stop_secs": 0.1, "min_volume": 0.6}
        )
        self.stt_class, self.stt_params = self._load_class_from_config(
            config.pop("stt", None), "stt", {"sample_rate": 16000}
        )
        self.tts_class, self.tts_params = self._load_class_from_config(config.pop("tts", None), "tts", {})

        # Pipecat pipeline
        self.pipecat_task = None
        self.publisher = None
        self.transport = None
        self.assistant_reply = ""

    def _load_class_from_config(self, config: Dict[str, Any], key: str, defaults: Optional[Dict[str, Any]] = None):
        if not config:
            raise ValueError(f"{self.__class__.__name__} requires a '{key}' configuration key.")
        module = importlib.import_module(PIPECAT_MODULES_FOR_KEY[key] + config["module"])
        class_ = getattr(module, config["class"])
        params = config.get("params", {})

        # Load env vars if necessary
        usable_params = defaults or {}
        for param_key in params.keys():
            if param_key.endswith("__envvar"):
                usable_params[param_key.removesuffix("__envvar")] = os.getenv(params[param_key])
            else:
                usable_params[param_key] = params[param_key]

        return class_, usable_params

    async def connect(self) -> None:
        """
        Initializes the model and connects to it as/if necessary.
        """
        # Prepares the Pipecat pipeline
        transport_params = TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            transcription_enabled=True,
            vad_enabled=True,
            vad_analyzer=self.vad_class(params=VADParams(**self.vad_params)),
            vad_audio_passthrough=True,
        )
        self.transport = AudioTransport(transport_params, self.llm.emit)

        stt = self.stt_class(**self.stt_params)
        tts = self.tts_class(**self.tts_params)
        user_response = LLMUserResponseAggregator()
        send_to_llm = UserToLLMFrameProcessor(self.llm)
        self.publisher = LLMToUserFrameProcessor()
        pipeline = Pipeline(
            [
                self.transport.input(),
                stt,
                user_response,
                send_to_llm,
                self.publisher,
                tts,
                self.transport.output(),
            ]
        )
        self.pipecat_task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        self.add_event_handler("on_text_message_from_llm", self.handle_llm_text_messages)
        self.add_event_handler("on_llm_starts_generating_response", self.handle_llm_starts_generating_response)
        self.add_event_handler("on_llm_stops_generating_response", self.handle_llm_stops_generating_response)
        # Start the pipeline
        asyncio.create_task(self.pipecat_task.run())
        # Wait for the pipeline to actually connect and start
        while True:
            if self.transport.input().ready:
                break
            await asyncio.sleep(0.1)
        # Connects to the model, if necessary
        await self.llm.connect()

    async def handle_llm_text_messages(self, event: Dict[str, Any]) -> None:
        """
        Sends the text message to the Pipecat pipeline to be converted into audio.
        """
        if event["delta"]:
            await self.publisher.push_frame(TextFrame(event["delta"]), FrameDirection.DOWNSTREAM)
            self.assistant_reply += event["delta"]

    async def handle_llm_starts_generating_response(self, _: Dict[str, Any]) -> None:
        """
        Notifies the Pipecat pipeline that the LLM started generating a response by sending an LLMFullResponseStartFrame().
        """
        await self.publisher.push_frame(LLMFullResponseStartFrame(), FrameDirection.DOWNSTREAM)

    async def handle_llm_stops_generating_response(self, _: Dict[str, Any]) -> None:
        """
        Notifies the Pipecat pipeline that the LLM finished generating a response by sending an LLMFullResponseEndFrame().
        """
        await self.publisher.push_frame(LLMFullResponseEndFrame(), FrameDirection.DOWNSTREAM)
        if self.assistant_reply:
            await self.llm.emit("on_llm_speech_transcribed", {"type": "assistant", "transcript": self.assistant_reply})
            self.assistant_reply = ""

    async def disconnect(self) -> None:
        """
        Disconnects from the LLM and unloads/closes it as/if necessary.
        """
        await self.llm.disconnect()

    async def run(self) -> None:
        """
        Main loop for the bot.
        """
        log.debug(".run() is no-op for PipecatBotStructure, the Pipecat pipeline is self-sufficient.")

    async def send(self, data: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Sends a message to the LLM and forwards the response.

        Args:
            data: The message to send to the model. Only audio input is supported for now, passed as {"audio_chunk": <bytes>}.
        """
        if "audio_chunk" in data:
            await self.transport.input().send_audio_frame(data["audio_chunk"])
        else:
            raise ValueError("PipecatBotStructure only supports audio data for now.")

    async def handle_interruption(self, lenght_to_interruption: int) -> None:
        """
        Handle an interruption in the streaming.

        Args:
            lenght_to_interruption: The length of the data that was produced to the user before the interruption.
                This value could be number of characters, number of words, milliseconds, number of audio frames, etc.
                depending on the bot structure that implements it.
        """
        log.warning("handle interruptions: TODO")
__init__(config, intent_router) #

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `Dict[str, Any]` | The configuration dictionary for the bot structure. It must include the LLM definition under the `llm` key, plus the `vad`, `stt` and `tts` client definitions (see the configuration sketch after the source listing below). | *required* |
| `intent_router` | `IntentRouter` | The intent router that is passed down to the LLM client. | *required* |
Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
def __init__(self, config: Dict[str, Any], intent_router: IntentRouter):
    """
    Args:
        config:
            The configuration dictionary for the bot structure.
            It must include the LLM definition under the `llm` key, plus the `vad`, `stt` and `tts` client definitions.
    """
    super().__init__()
    log.debug("Loading bot structure from config", bot_structure_config=config)

    # Init the model client
    llm_config = config.pop("llm", None)
    if not llm_config:
        raise ValueError(f"{self.__class__.__name__} requires a 'llm' configuration key.")
    self.llm: LLMClient = load_llm_client_from_dict(parent=self, intent_router=intent_router, config=llm_config)

    # Import the correct VAD, STT and TTS clients from Pipecat
    self.vad_class, self.vad_params = self._load_class_from_config(
        config.pop("vad", None), "vad", {"start_secs": 0.1, "stop_secs": 0.1, "min_volume": 0.6}
    )
    self.stt_class, self.stt_params = self._load_class_from_config(
        config.pop("stt", None), "stt", {"sample_rate": 16000}
    )
    self.tts_class, self.tts_params = self._load_class_from_config(config.pop("tts", None), "tts", {})

    # Pipecat pipeline
    self.pipecat_task = None
    self.publisher = None
    self.transport = None
    self.assistant_reply = ""
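
Each of the `vad`, `stt` and `tts` entries is resolved by `_load_class_from_config`: the `module` value is appended to the matching prefix in `PIPECAT_MODULES_FOR_KEY` and imported, `class` is looked up on that module, and any `params` key ending in `__envvar` is replaced by the value of the named environment variable. A minimal configuration sketch; the module and class names below are illustrative and should be checked against the Pipecat services you have installed, and the `llm` entry is a hypothetical placeholder since its schema depends on the chosen LLM plugin:

config = {
    "llm": {"client": "..."},  # hypothetical placeholder: the real schema depends on the LLM plugin
    "vad": {
        "module": "silero",                 # appended to PIPECAT_MODULES_FOR_KEY["vad"]
        "class": "SileroVADAnalyzer",
        "params": {"stop_secs": 0.5},       # overrides the 0.1 default
    },
    "stt": {
        "module": "deepgram",
        "class": "DeepgramSTTService",
        "params": {"api_key__envvar": "DEEPGRAM_API_KEY"},  # read from the environment
    },
    "tts": {
        "module": "cartesia",
        "class": "CartesiaTTSService",
        "params": {"api_key__envvar": "CARTESIA_API_KEY"},
    },
}
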
connect() async #

Initializes the model and connects to it as/if necessary.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def connect(self) -> None:
    """
    Initializes the model and connects to it as/if necessary.
    """
    # Prepares the Pipecat pipeline
    transport_params = TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        transcription_enabled=True,
        vad_enabled=True,
        vad_analyzer=self.vad_class(params=VADParams(**self.vad_params)),
        vad_audio_passthrough=True,
    )
    self.transport = AudioTransport(transport_params, self.llm.emit)

    stt = self.stt_class(**self.stt_params)
    tts = self.tts_class(**self.tts_params)
    user_response = LLMUserResponseAggregator()
    send_to_llm = UserToLLMFrameProcessor(self.llm)
    self.publisher = LLMToUserFrameProcessor()
    pipeline = Pipeline(
        [
            self.transport.input(),
            stt,
            user_response,
            send_to_llm,
            self.publisher,
            tts,
            self.transport.output(),
        ]
    )
    self.pipecat_task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

    self.add_event_handler("on_text_message_from_llm", self.handle_llm_text_messages)
    self.add_event_handler("on_llm_starts_generating_response", self.handle_llm_starts_generating_response)
    self.add_event_handler("on_llm_stops_generating_response", self.handle_llm_stops_generating_response)
    # Start the pipeline
    asyncio.create_task(self.pipecat_task.run())
    # Wait for the pipeline to actually connect and start
    while True:
        if self.transport.input().ready:
            break
        await asyncio.sleep(0.1)
    # Connects to the model, if necessary
    await self.llm.connect()
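
Taken together, a minimal lifecycle sketch for this class; `config` and `intent_router` are assumed to be built elsewhere (see the configuration sketch above):

import asyncio

async def main(config, intent_router):
    bot = PipecatBotStructure(config, intent_router)
    await bot.connect()      # builds the pipeline, waits for the input transport, connects the LLM
    try:
        await bot.run()      # no-op here: the Pipecat pipeline is self-sufficient
        # ... stream audio in via bot.send(...), see send() below ...
    finally:
        await bot.disconnect()

# asyncio.run(main(config, intent_router))
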
disconnect() async #

Disconnects from the LLM and unloads/closes it as/if necessary.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def disconnect(self) -> None:
    """
    Disconnects from the LLM and unloads/closes it as/if necessary.
    """
    await self.llm.disconnect()
handle_interruption(lenght_to_interruption) async #

Handle an interruption in the streaming.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `lenght_to_interruption` | `int` | The length of the data that was produced for the user before the interruption. Depending on the bot structure that implements it, this could be a number of characters, words, milliseconds, audio frames, etc. | *required* |
Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def handle_interruption(self, lenght_to_interruption: int) -> None:
    """
    Handle an interruption in the streaming.

    Args:
        lenght_to_interruption: The length of the data that was produced to the user before the interruption.
            This value could be number of characters, number of words, milliseconds, number of audio frames, etc.
            depending on the bot structure that implements it.
    """
    log.warning("handle interruptions: TODO")
handle_llm_starts_generating_response(_) async #

Notifies the Pipecat pipeline that the LLM started generating a response by sending an LLMFullResponseStartFrame().

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def handle_llm_starts_generating_response(self, _: Dict[str, Any]) -> None:
    """
    Notifies the Pipecat pipeline that the LLM started generating a response by sending an LLMFullResponseStartFrame().
    """
    await self.publisher.push_frame(LLMFullResponseStartFrame(), FrameDirection.DOWNSTREAM)
handle_llm_stops_generating_response(_) async #

Notifies the Pipecat pipeline that the LLM finished generating a response by sending an LLMFullResponseEndFrame().

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def handle_llm_stops_generating_response(self, _: Dict[str, Any]) -> None:
    """
    Notifies the Pipecat pipeline that the LLM finished generating a response by sending an LLMFullResponseEndFrame().
    """
    await self.publisher.push_frame(LLMFullResponseEndFrame(), FrameDirection.DOWNSTREAM)
    if self.assistant_reply:
        await self.llm.emit("on_llm_speech_transcribed", {"type": "assistant", "transcript": self.assistant_reply})
        self.assistant_reply = ""
handle_llm_text_messages(event) async #

Sends the text message to the Pipecat pipeline to be converted into audio.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def handle_llm_text_messages(self, event: Dict[str, Any]) -> None:
    """
    Sends the text message to the Pipecat pipeline to be converted into audio.
    """
    if event["delta"]:
        await self.publisher.push_frame(TextFrame(event["delta"]), FrameDirection.DOWNSTREAM)
        self.assistant_reply += event["delta"]
run() async #

Main loop for the bot.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def run(self) -> None:
    """
    Main loop for the bot.
    """
    log.debug(".run() is no-op for PipecatBotStructure, the Pipecat pipeline is self-sufficient.")
send(data) async #

Sends a message to the LLM and forwards the response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Dict[str, Any]` | The message to send to the model. Only audio input is supported for now, passed as `{"audio_chunk": <bytes>}`. | *required* |
Source code in plugins/intentional-pipecat/src/intentional_pipecat/bot_structure.py
async def send(self, data: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Sends a message to the LLM and forwards the response.

    Args:
        data: The message to send to the model. Only audio input is supported for now, passed as {"audio_chunk": <bytes>}.
    """
    if "audio_chunk" in data:
        await self.transport.input().send_audio_frame(data["audio_chunk"])
    else:
        raise ValueError("PipecatBotStructure only supports audio data for now.")
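
send() expects raw PCM bytes under the audio_chunk key; the input transport wraps them in 16 kHz frames (see AudioInputTransport.send_audio_frame below). A sketch that streams a mono 16-bit, 16 kHz raw audio file in 20 ms chunks (16000 samples/s × 0.02 s × 2 bytes = 640 bytes each); the file name is hypothetical:

import asyncio

CHUNK_BYTES = 640  # 20 ms of 16 kHz mono 16-bit PCM

async def stream_audio(bot: PipecatBotStructure, path: str = "speech.raw") -> None:
    with open(path, "rb") as f:
        while chunk := f.read(CHUNK_BYTES):
            await bot.send({"audio_chunk": chunk})
            await asyncio.sleep(0.02)  # pace the stream roughly in real time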

frame_processor #

Pipecat frame processor implementation.

LLMToUserFrameProcessor #

Bases: FrameProcessor

FrameProcessor that takes the LLM output and sends it to the user.

Note: by itself this processor does nothing more than change the default behavior of process_frame() so that frames are not swallowed when they reach it. PipecatBotStructure actually uses it to generate frames when a reply from the LLM is received.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/frame_processor.py
class LLMToUserFrameProcessor(FrameProcessor):
    """
    FrameProcessor that takes the LLM output and sends it to the user.

    Note: by itself this processor does nothing more than change the default behavior of `process_frame()` so that
    frames are not swallowed when they reach it. `PipecatBotStructure` actually uses it to generate frames when a
    reply from the LLM is received.
    """

    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """
        Simply forwards all frames ahead. The default behavior of FrameProcessor is to block them instead.
        """
        await super().process_frame(frame, direction)
        await self.push_frame(frame, direction)
process_frame(frame, direction) async #

Simply forwards all frames ahead. The default behavior of FrameProcessor is to block them instead.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/frame_processor.py
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """
    Simply forwards all frames ahead. The default behavior of FrameProcessor is to block them instead.
    """
    await super().process_frame(frame, direction)
    await self.push_frame(frame, direction)
UserToLLMFrameProcessor #

Bases: FrameProcessor

FrameProcessor that takes the user input and sends it to the LLM.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/frame_processor.py
class UserToLLMFrameProcessor(FrameProcessor):
    """
    FrameProcessor that takes the user input and sends it to the LLM.
    """

    def __init__(self, llm_client: LLMClient):
        super().__init__()
        self.llm_client = llm_client
        self.transcription = ""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """
        Processes the incoming frames if relevant.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMMessagesFrame):
            user_message = frame.messages[-1]["content"]
            log.debug("LLMMessageFrame received, sending message to LLM", user_message=user_message)
            await self.llm_client.emit("on_user_speech_transcribed", {"type": "user", "transcript": user_message})
            await self.llm_client.send({"text_message": {"role": "user", "content": user_message}})
        else:
            if isinstance(frame, UserStartedSpeakingFrame):
                await self.llm_client.emit("on_user_speech_started", {})
            elif isinstance(frame, UserStoppedSpeakingFrame):
                await self.llm_client.emit("on_user_speech_ended", {})
            await self.push_frame(frame, direction)
process_frame(frame, direction) async #

Processes the incoming frames if relevant.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/frame_processor.py
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """
    Processes the incoming frames if relevant.
    """
    await super().process_frame(frame, direction)
    if isinstance(frame, LLMMessagesFrame):
        user_message = frame.messages[-1]["content"]
        log.debug("LLMMessageFrame received, sending message to LLM", user_message=user_message)
        await self.llm_client.emit("on_user_speech_transcribed", {"type": "user", "transcript": user_message})
        await self.llm_client.send({"text_message": {"role": "user", "content": user_message}})
    else:
        if isinstance(frame, UserStartedSpeakingFrame):
            await self.llm_client.emit("on_user_speech_started", {})
        elif isinstance(frame, UserStoppedSpeakingFrame):
            await self.llm_client.emit("on_user_speech_ended", {})
        await self.push_frame(frame, direction)

transport #

Pipecat transport class implementation that is compatible with Intentional.

AudioInputTransport #

Bases: BaseInputTransport

Pipecat input transport class implementation that is compatible with Intentional (supports audio only).

This class' task is to take the user's input and convert it into frames that Pipecat can process.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
class AudioInputTransport(BaseInputTransport):
    """
    Pipecat input transport class implementation that is compatible with Intentional (supports audio only).

    This class' task is to take the user's input and convert it into frames that Pipecat can process.
    """

    def __init__(self, params: TransportParams):
        super().__init__(params)
        self.ready = False

    async def send_audio_frame(self, audio: bytes):
        """
        Public method used by the Intentional bot structure to publish the user's speech audio to the Pipecat pipeline.
        """
        if not self.ready:
            log.debug("Audio input transport not ready yet, won't send this audio frame")
            return
        frame = InputAudioRawFrame(
            audio=audio,
            sample_rate=16000,
            num_channels=self._params.audio_in_channels,
        )
        await self.push_audio_frame(frame)

    async def start(self, frame: StartFrame):
        """
        Starts the transport's resources.
        """
        log.debug("Starting audio input transport")
        await super().start(frame)
        self.ready = True

    async def cleanup(self):
        """
        Cleans up the transport's resources.
        """
        await super().cleanup()
cleanup() async #

Cleans up the transport's resources.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def cleanup(self):
    """
    Cleans up the transport's resources.
    """
    await super().cleanup()
send_audio_frame(audio) async #

Public method used by the Intentional bot structure to publish the user's speech audio to the Pipecat pipeline.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def send_audio_frame(self, audio: bytes):
    """
    Public method used by the Intentional bot structure to publish the user's speech audio to the Pipecat pipeline.
    """
    if not self.ready:
        log.debug("Audio input transport not ready yet, won't send this audio frame")
        return
    frame = InputAudioRawFrame(
        audio=audio,
        sample_rate=16000,
        num_channels=self._params.audio_in_channels,
    )
    await self.push_audio_frame(frame)
start(frame) async #

Starts the transport's resources.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def start(self, frame: StartFrame):
    """
    Starts the transport's resources.
    """
    log.debug("Starting audio input transport")
    await super().start(frame)
    self.ready = True
AudioOutputTransport #

Bases: BaseOutputTransport

Pipecat output transport class implementation that is compatible with Intentional (supports audio only).

This class' task is to take the audio frames generated by the TTS and publish them through events that Intentional can understand.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
class AudioOutputTransport(BaseOutputTransport):
    """
    Pipecat output transport class implementation that is compatible with Intentional (supports audio only).

    This class' task is to take the audio frames generated by the TTS and publish them through events that Intentional
    can understand.
    """

    def __init__(self, params: TransportParams, emitter_callback: Callable):
        super().__init__(params)
        self._emitter_callback = emitter_callback

    async def start(self, frame: StartFrame):
        """
        Starts the transport's resources.
        """
        await super().start(frame)

    async def cleanup(self):
        """
        Cleans up the transport's resources.
        """
        await super().cleanup()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """
        When it receives a TTSAudioRawFrame, makes the LLM client emit an `on_audio_message_from_llm` event with the
        content of the frame.
        """
        if isinstance(frame, TTSAudioRawFrame):
            await self._emitter_callback("on_audio_message_from_llm", {"delta": frame.audio})
        # return await super().process_frame(frame, direction)

    async def _audio_out_task_handler(self):
        """
        Internal: overrides the base class method to skip a few actions we don't need.
        """
        try:
            async for frame in self._next_audio_frame():
                # Also, push frame downstream in case anyone else needs it.
                await self.push_frame(frame)
                # Send audio.
                await self.write_raw_audio_frames(frame.audio)
        except asyncio.CancelledError:
            pass
        except Exception as e:  # pylint: disable=broad-except
            log.exception(f"{self} error writing to microphone: {e}")
cleanup() async #

Cleans up the transport's resources.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def cleanup(self):
    """
    Cleans up the transport's resources.
    """
    await super().cleanup()
process_frame(frame, direction) async #

When it receives a TTSAudioRawFrame, makes the LLM client emit an on_audio_message_from_llm event with the content of the frame.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """
    When it receives a TTSAudioRawFrame, makes the LLM client emit an `on_audio_message_from_llm` event with the
    content of the frame.
    """
    if isinstance(frame, TTSAudioRawFrame):
        await self._emitter_callback("on_audio_message_from_llm", {"delta": frame.audio})
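
On the consuming side, each on_audio_message_from_llm event carries raw TTS audio bytes under delta. A sketch of a handler that buffers the deltas and writes them out as a WAV file; mono 16-bit at 16 kHz is an assumption here, as the real format depends on the configured TTS service:

import wave

audio_buffer = bytearray()

async def on_audio_message_from_llm(event: dict) -> None:
    # Append each TTS audio delta as it arrives.
    audio_buffer.extend(event["delta"])

def save_wav(path: str = "reply.wav", sample_rate: int = 16000) -> None:
    with wave.open(path, "wb") as f:
        f.setnchannels(1)   # mono (assumed)
        f.setsampwidth(2)   # 16-bit samples (assumed)
        f.setframerate(sample_rate)
        f.writeframes(bytes(audio_buffer))
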
start(frame) async #

Starts the transport's resources.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
async def start(self, frame: StartFrame):
    """
    Starts the transport's resources.
    """
    await super().start(frame)
AudioTransport #

Bases: BaseTransport

Pipecat transport class implementation that is compatible with Intentional (supports audio only).

This class is a simple wrapper around AudioInputTransport and AudioOutputTransport that ensures both classes receive the same parameters at initialization.

Source code in plugins/intentional-pipecat/src/intentional_pipecat/transport.py
class AudioTransport(BaseTransport):
    """
    Pipecat transport class implementation that is compatible with Intentional (supports audio only).

    This class is a simple wrapper around AudioInputTransport and AudioOutputTransport that ensures both classes
    receive the same parameters at initialization.
    """

    def __init__(self, params: TransportParams, emitter_callback: Callable):
        super().__init__(params)
        self._emitter_callback = emitter_callback
        self._params = params
        self._input = AudioInputTransport(self._params)
        self._output = AudioOutputTransport(self._params, self._emitter_callback)

    def input(self) -> AudioInputTransport:
        return self._input

    def output(self) -> AudioOutputTransport:
        return self._output