Saltar a contenido

Workflows (Orquestación)

Los Workflows son los componentes encargados de coordinar las diferentes funcionalidades del sistema para completar tareas complejas de negocio.


RecordingWorkflow

Gestiona el proceso completo de grabación, desde la captura inicial hasta la transcripción final.

v2m.orchestration.recording_workflow.RecordingWorkflow

Orquestador para el flujo de grabación y transcripción asíncrona.

Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
class RecordingWorkflow:
    """Orchestrator for the asynchronous recording-and-transcription flow.

    Heavy collaborators (Whisper worker, audio recorder, desktop adapters)
    are built lazily through properties, so constructing the workflow is
    cheap and pulls in no optional dependencies until they are needed.
    """

    def __init__(self, broadcast_fn: BroadcastFn | None = None) -> None:
        """Initialize the workflow.

        Args:
            broadcast_fn: Optional callable used to emit state updates
                (e.g. over a WebSocket) while a session is active.
        """
        # Mutable runtime state.
        self._is_recording = False
        self._model_loaded = False
        self._broadcast_fn = broadcast_fn

        # Lazily-created collaborators; see the matching properties below.
        self._worker: PersistentWhisperWorker | None = None
        self._recorder: AudioRecorder | None = None
        self._transcriber: StreamingTranscriber | None = None
        self._clipboard: LinuxClipboardAdapter | None = None
        self._notifications: LinuxNotificationService | None = None

    @property
    def worker(self):
        """Persistent Whisper worker, built from config on first access."""
        if self._worker is None:
            # Deferred import: keeps module import light until transcription
            # is actually requested.
            from v2m.features.transcription.persistent_model import PersistentWhisperWorker

            whisper_cfg = config.transcription.whisper
            self._worker = PersistentWhisperWorker(
                model_size=whisper_cfg.model,
                device=whisper_cfg.device,
                compute_type=whisper_cfg.compute_type,
                device_index=whisper_cfg.device_index,
                num_workers=whisper_cfg.num_workers,
                keep_warm=whisper_cfg.keep_warm,
            )
        return self._worker

    @property
    def recorder(self):
        """Audio recorder (mono, 16 kHz), created lazily."""
        if self._recorder is None:
            from v2m.features.audio.recorder import AudioRecorder

            whisper_cfg = config.transcription.whisper
            # 16 kHz mono is the input format the transcription stack uses.
            self._recorder = AudioRecorder(
                sample_rate=16000,
                channels=1,
                device_index=whisper_cfg.audio_device_index,
            )
        return self._recorder

    @property
    def transcriber(self):
        """Streaming transcriber wired to the worker, session and recorder."""
        if self._transcriber is None:
            from v2m.features.audio.streaming_transcriber import StreamingTranscriber

            session_adapter = WebSocketSessionAdapter(self._broadcast_fn)
            self._transcriber = StreamingTranscriber(
                worker=self.worker,
                session_manager=session_adapter,
                recorder=self.recorder,
            )
        return self._transcriber

    @property
    def clipboard(self):
        """Clipboard adapter (Linux), created lazily."""
        if self._clipboard is None:
            from v2m.features.desktop.linux_adapters import LinuxClipboardAdapter

            self._clipboard = LinuxClipboardAdapter()
        return self._clipboard

    @property
    def notifications(self):
        """Desktop notification service, created lazily."""
        if self._notifications is None:
            from v2m.features.desktop.notification_service import LinuxNotificationService

            self._notifications = LinuxNotificationService()
        return self._notifications

    async def warmup(self) -> None:
        """Preload the Whisper model so the first recording has no cold start.

        Idempotent: returns immediately once the model is marked as loaded.
        Errors are logged but deliberately not raised — warmup is best-effort.
        """
        if self._model_loaded:
            return
        try:
            loop = asyncio.get_running_loop()
            # initialize_sync blocks, so run it off the event loop.
            await loop.run_in_executor(None, self.worker.initialize_sync)
            self._model_loaded = True
            logger.info("✅ Modelo Whisper precargado en VRAM")
        except Exception as e:
            logger.error(f"❌ Error en warmup del modelo: {e}")

    async def toggle(self) -> "ToggleResponse":
        """Start recording if idle, otherwise stop the current recording."""
        if not self._is_recording:
            return await self.start()
        return await self.stop()

    async def start(self) -> "ToggleResponse":
        """Begin a new recording session.

        Returns:
            A ToggleResponse with status "recording" on success (or when a
            recording is already in progress), or "error" on failure.
        """
        from v2m.api.schemas import ToggleResponse

        if self._is_recording:
            return ToggleResponse(status="recording", message="⚠️ Ya está grabando")
        try:
            await self.transcriber.start()
            self._is_recording = True
            # Flag file signals "recording in progress" to external tooling.
            config.paths.recording_flag.touch()
            self.notifications.notify("🎤 voice2machine", "grabación iniciada...")
            logger.info("🎙️ Grabación iniciada")
            return ToggleResponse(status="recording", message="🎙️ Grabando...")
        except Exception as e:
            logger.error(f"Error iniciando grabación: {e}")
            return ToggleResponse(status="error", message=f"❌ Error: {e}")

    async def stop(self) -> "ToggleResponse":
        """Stop the current recording and deliver the transcription.

        On success the transcription is copied to the clipboard and included
        in the response; an empty transcription yields a "no voice detected"
        response instead.
        """
        from v2m.api.schemas import ToggleResponse

        if not self._is_recording:
            return ToggleResponse(status="idle", message="⚠️ No hay grabación en curso")
        try:
            # Clear state and flag file first so external observers see
            # "idle" while the transcription is still being finalized.
            self._is_recording = False
            if config.paths.recording_flag.exists():
                config.paths.recording_flag.unlink()
            self.notifications.notify("⚡ v2m procesando", "procesando...")
            transcription = await self.transcriber.stop()
            if not transcription or not transcription.strip():
                # Improved diagnostics: report the audio queue state; the VAD
                # and Whisper logs carry the detailed picture.
                try:
                    # Reaches into a private attribute purely for logging;
                    # any failure here is non-fatal.
                    queue_size = self.transcriber._audio_queue.qsize()
                    logger.warning(
                        f"Transcripción vacía: audio_queue_size={queue_size}, "
                        f"verificar logs de VAD y Whisper para diagnóstico detallado"
                    )
                except Exception as diag_err:
                    logger.debug(f"Error obteniendo diagnóstico: {diag_err}")

                self.notifications.notify("❌ whisper", "no se detectó voz en el audio")
                return ToggleResponse(status="idle", message="❌ No se detectó voz", text=None)
            self.clipboard.copy(transcription)
            preview = transcription[:80]
            self.notifications.notify("✅ whisper - copiado", f"{preview}...")
            logger.info(f"✅ Transcripción completada: {len(transcription)} chars")
            return ToggleResponse(status="idle", message="✅ Copiado al portapapeles", text=transcription)
        except Exception as e:
            logger.error(f"Error deteniendo grabación: {e}")
            # Make sure we never stay stuck in "recording" after a failure.
            self._is_recording = False
            return ToggleResponse(status="error", message=f"❌ Error: {e}")

    def get_status(self) -> "StatusResponse":
        """Report the current workflow state (recording / model loaded)."""
        from v2m.api.schemas import StatusResponse

        state = "recording" if self._is_recording else "idle"
        return StatusResponse(state=state, recording=self._is_recording, model_loaded=self._model_loaded)

    async def shutdown(self) -> None:
        """Shut down the workflow, stopping recordings and unloading models."""
        # Best-effort cleanup: failures must never abort shutdown.
        if self._is_recording:
            with contextlib.suppress(Exception):
                await self.stop()

        if self._worker:
            with contextlib.suppress(Exception):
                await self._worker.unload()

        if self._notifications:
            # wait=False: do not block shutdown on pending notifications.
            self._notifications.shutdown(wait=False)

__init__(broadcast_fn=None)

Inicializa el workflow.

Parameters:

| Nombre | Tipo | Descripción | Por defecto |
| --- | --- | --- | --- |
| `broadcast_fn` | `BroadcastFn \| None` | Función opcional para emitir actualizaciones de estado. | `None` |
Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
def __init__(self, broadcast_fn: BroadcastFn | None = None) -> None:
    """Initialize the workflow.

    Args:
        broadcast_fn: Optional callable used to emit state updates
            (e.g. over a WebSocket) while a session is active.
    """
    # Mutable runtime state.
    self._is_recording = False
    self._model_loaded = False
    self._broadcast_fn = broadcast_fn

    # Lazily-created collaborators, built on first property access.
    self._worker: PersistentWhisperWorker | None = None
    self._recorder: AudioRecorder | None = None
    self._transcriber: StreamingTranscriber | None = None
    self._clipboard: LinuxClipboardAdapter | None = None
    self._notifications: LinuxNotificationService | None = None

shutdown() async

Apaga el workflow, deteniendo grabaciones y descargando modelos.

Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
async def shutdown(self) -> None:
    """Shut down the workflow, stopping recordings and unloading models."""
    # Best-effort cleanup: failures must never abort shutdown.
    if self._is_recording:
        with contextlib.suppress(Exception):
            await self.stop()

    if self._worker:
        with contextlib.suppress(Exception):
            await self._worker.unload()

    if self._notifications:
        # wait=False: do not block shutdown on pending notifications.
        self._notifications.shutdown(wait=False)

options: show_source: true


LLMWorkflow

Coordina el procesamiento de texto mediante proveedores de lenguaje (LLM), incluyendo refinamiento y traducción.

v2m.orchestration.llm_workflow.LLMWorkflow

Orquestador para el refinamiento y traducción de texto mediante LLM.

Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
class LLMWorkflow:
    """Orchestrator for LLM-based text refinement and translation.

    Collaborators (LLM backend, clipboard, notifications) are created
    lazily, so constructing the workflow has no import side effects.
    """

    def __init__(self) -> None:
        """Initialize the LLM workflow with all collaborators unset."""
        # Created lazily by the matching properties below.
        self._llm_service: Any | None = None
        self._clipboard: LinuxClipboardAdapter | None = None
        self._notifications: LinuxNotificationService | None = None

    @property
    def clipboard(self):
        """Clipboard adapter (Linux), created lazily."""
        if self._clipboard is None:
            from v2m.features.desktop.linux_adapters import LinuxClipboardAdapter

            self._clipboard = LinuxClipboardAdapter()
        return self._clipboard

    @property
    def notifications(self):
        """System notification service, created lazily."""
        if self._notifications is None:
            from v2m.features.desktop.notification_service import LinuxNotificationService

            self._notifications = LinuxNotificationService()
        return self._notifications

    @property
    def llm_service(self) -> Any:
        """Configured LLM service (Gemini, Ollama or local), built lazily.

        Unknown backend values fall through to the local service.
        """
        if self._llm_service is None:
            backend = config.llm.backend
            if backend == "gemini":
                from v2m.features.llm.gemini_service import GeminiLLMService

                self._llm_service = GeminiLLMService()
            elif backend == "ollama":
                from v2m.features.llm.ollama_service import OllamaLLMService

                self._llm_service = OllamaLLMService()
            else:
                from v2m.features.llm.local_service import LocalLLMService

                self._llm_service = LocalLLMService()
            logger.info(f"LLM backend inicializado: {backend}")
        return self._llm_service

    async def _call_service(self, call: Any, *args: Any) -> Any:
        """Invoke a service entry point that may be sync or async.

        Coroutine functions are awaited directly; synchronous callables are
        offloaded to a thread so the event loop is never blocked. Extracted
        because this dispatch was duplicated in process_text/translate_text.
        """
        if asyncio.iscoroutinefunction(call):
            return await call(*args)
        return await asyncio.to_thread(call, *args)

    async def process_text(self, text: str) -> "LLMResponse":
        """Refine *text* through the LLM and copy the result to the clipboard.

        On any backend failure the original text is copied instead and the
        backend is reported as "<name> (fallback)" so callers can detect it.
        """
        from v2m.api.schemas import LLMResponse

        backend_name = config.llm.backend
        try:
            refined = await self._call_service(self.llm_service.process_text, text)
            self.clipboard.copy(refined)
            self.notifications.notify(f"✅ {backend_name} - copiado", f"{refined[:80]}...")
            return LLMResponse(text=refined, backend=backend_name)
        except Exception as e:
            logger.error(f"Error procesando texto con {backend_name}: {e}")
            # Best-effort fallback: keep the user's original text available.
            self.clipboard.copy(text)
            self.notifications.notify(f"⚠️ {backend_name} falló", "usando texto original...")
            return LLMResponse(text=text, backend=f"{backend_name} (fallback)")

    async def translate_text(self, text: str, target_lang: str) -> "LLMResponse":
        """Translate *text* into *target_lang* using the configured LLM.

        The target language is validated against a conservative pattern
        (letters, spaces, hyphens; 2-20 chars) before it reaches any prompt.
        On failure the original text is returned with backend "<name> (error)".
        """
        from v2m.api.schemas import LLMResponse

        backend_name = config.llm.backend
        if not re.match(r"^[a-zA-Z\s\-]{2,20}$", target_lang):
            logger.warning(f"Idioma inválido: {target_lang}")
            self.notifications.notify("❌ Error", "Idioma de destino inválido")
            return LLMResponse(text=text, backend="error")
        try:
            translated = await self._call_service(self.llm_service.translate_text, text, target_lang)
            self.clipboard.copy(translated)
            self.notifications.notify(f"✅ Traducción ({target_lang})", f"{translated[:80]}...")
            return LLMResponse(text=translated, backend=backend_name)
        except Exception as e:
            logger.error(f"Error traduciendo con {backend_name}: {e}")
            self.notifications.notify("❌ Error traducción", "Fallo al traducir")
            return LLMResponse(text=text, backend=f"{backend_name} (error)")

clipboard property

Adaptador de portapapeles (Linux).

llm_service property

Servicio LLM configurado (Gemini, Ollama o Local).

notifications property

Servicio de notificaciones del sistema.

__init__()

Inicializa el workflow de LLM.

Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
def __init__(self) -> None:
    """Initialize the LLM workflow with all collaborators unset."""
    # Collaborators are created lazily by the matching properties.
    self._llm_service: Any | None = None
    self._clipboard: LinuxClipboardAdapter | None = None
    self._notifications: LinuxNotificationService | None = None

process_text(text) async

Refina el texto usando el LLM y lo copia al portapapeles.

Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
async def process_text(self, text: str) -> "LLMResponse":
    """Refine the text with the LLM and copy the result to the clipboard.

    On failure the original text is copied instead and the backend name is
    reported as "<backend> (fallback)" so callers can detect the fallback.
    """
    from v2m.api.schemas import LLMResponse

    backend_name = config.llm.backend
    try:
        # Support both async and sync service implementations; sync calls
        # are offloaded to a thread so the event loop is never blocked.
        if asyncio.iscoroutinefunction(self.llm_service.process_text):
            refined = await self.llm_service.process_text(text)
        else:
            refined = await asyncio.to_thread(self.llm_service.process_text, text)
        self.clipboard.copy(refined)
        self.notifications.notify(f"✅ {backend_name} - copiado", f"{refined[:80]}...")
        return LLMResponse(text=refined, backend=backend_name)
    except Exception as e:
        logger.error(f"Error procesando texto con {backend_name}: {e}")
        # Best-effort fallback: keep the user's original text available.
        self.clipboard.copy(text)
        self.notifications.notify(f"⚠️ {backend_name} falló", "usando texto original...")
        return LLMResponse(text=text, backend=f"{backend_name} (fallback)")

translate_text(text, target_lang) async

Traduce el texto al idioma especificado usando el LLM.

Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
async def translate_text(self, text: str, target_lang: str) -> "LLMResponse":
    """Translate the text into the given target language using the LLM.

    The target language is validated against a conservative pattern before
    it is used, and errors return the original text with backend marked.
    """
    from v2m.api.schemas import LLMResponse

    backend_name = config.llm.backend
    # Allow only letters, spaces and hyphens (2-20 chars) as a language name.
    if not re.match(r"^[a-zA-Z\s\-]{2,20}$", target_lang):
        logger.warning(f"Idioma inválido: {target_lang}")
        self.notifications.notify("❌ Error", "Idioma de destino inválido")
        return LLMResponse(text=text, backend="error")
    try:
        # Support both async and sync service implementations; sync calls
        # are offloaded to a thread so the event loop is never blocked.
        if asyncio.iscoroutinefunction(self.llm_service.translate_text):
            translated = await self.llm_service.translate_text(text, target_lang)
        else:
            translated = await asyncio.to_thread(self.llm_service.translate_text, text, target_lang)
        self.clipboard.copy(translated)
        self.notifications.notify(f"✅ Traducción ({target_lang})", f"{translated[:80]}...")
        return LLMResponse(text=translated, backend=backend_name)
    except Exception as e:
        logger.error(f"Error traduciendo con {backend_name}: {e}")
        self.notifications.notify("❌ Error traducción", "Fallo al traducir")
        return LLMResponse(text=text, backend=f"{backend_name} (error)")

options: show_source: true