Workflows (Orchestration)
Workflows are components in charge of coordinating the different system functionalities to complete complex business tasks.
RecordingWorkflow
Manages the entire recording process, from initial capture to final transcription.
v2m.orchestration.recording_workflow.RecordingWorkflow
Orquestador para el flujo de grabación y transcripción asíncrona.
Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
class RecordingWorkflow:
    """Orchestrator for the asynchronous recording and transcription flow."""

    def __init__(self, broadcast_fn: BroadcastFn | None = None) -> None:
        """Initialize the workflow.

        Args:
            broadcast_fn: Optional callback used to broadcast state updates.
        """
        self._is_recording = False
        self._model_loaded = False
        self._broadcast_fn = broadcast_fn
        # Heavy collaborators are created lazily by the properties below so
        # that constructing the workflow stays cheap and import-free.
        self._worker: PersistentWhisperWorker | None = None
        self._recorder: AudioRecorder | None = None
        self._transcriber: StreamingTranscriber | None = None
        self._clipboard: LinuxClipboardAdapter | None = None
        self._notifications: LinuxNotificationService | None = None

    @property
    def worker(self):
        """Persistent Whisper worker, built on first access from config."""
        if self._worker is None:
            from v2m.features.transcription.persistent_model import PersistentWhisperWorker

            whisper_cfg = config.transcription.whisper
            self._worker = PersistentWhisperWorker(
                model_size=whisper_cfg.model,
                device=whisper_cfg.device,
                compute_type=whisper_cfg.compute_type,
                device_index=whisper_cfg.device_index,
                num_workers=whisper_cfg.num_workers,
                keep_warm=whisper_cfg.keep_warm,
            )
        return self._worker

    @property
    def recorder(self):
        """Audio recorder (16 kHz mono), built on first access."""
        if self._recorder is None:
            from v2m.features.audio.recorder import AudioRecorder

            whisper_cfg = config.transcription.whisper
            self._recorder = AudioRecorder(
                sample_rate=16000,
                channels=1,
                device_index=whisper_cfg.audio_device_index,
            )
        return self._recorder

    @property
    def transcriber(self):
        """Streaming transcriber wired to the worker, recorder and session."""
        if self._transcriber is None:
            from v2m.features.audio.streaming_transcriber import StreamingTranscriber

            session_adapter = WebSocketSessionAdapter(self._broadcast_fn)
            self._transcriber = StreamingTranscriber(
                worker=self.worker,
                session_manager=session_adapter,
                recorder=self.recorder,
            )
        return self._transcriber

    @property
    def clipboard(self):
        """Clipboard adapter (Linux), built on first access."""
        if self._clipboard is None:
            from v2m.features.desktop.linux_adapters import LinuxClipboardAdapter

            self._clipboard = LinuxClipboardAdapter()
        return self._clipboard

    @property
    def notifications(self):
        """System notification service, built on first access."""
        if self._notifications is None:
            from v2m.features.desktop.notification_service import LinuxNotificationService

            self._notifications = LinuxNotificationService()
        return self._notifications

    async def warmup(self) -> None:
        """Preload the Whisper model off the event loop; safe to call twice."""
        if self._model_loaded:
            return
        try:
            loop = asyncio.get_running_loop()
            # initialize_sync is blocking, so run it in the default executor.
            await loop.run_in_executor(None, self.worker.initialize_sync)
            self._model_loaded = True
            logger.info("✅ Modelo Whisper precargado en VRAM")
        except Exception as e:
            logger.error(f"❌ Error en warmup del modelo: {e}")

    async def toggle(self) -> "ToggleResponse":
        """Start recording when idle, stop it when already recording."""
        if not self._is_recording:
            return await self.start()
        return await self.stop()

    async def start(self) -> "ToggleResponse":
        """Start a new recording session.

        Returns:
            ToggleResponse with status "recording", or "error" on failure.
        """
        from v2m.api.schemas import ToggleResponse

        if self._is_recording:
            return ToggleResponse(status="recording", message="⚠️ Ya está grabando")
        try:
            await self.transcriber.start()
            self._is_recording = True
            config.paths.recording_flag.touch()
            self.notifications.notify("🎤 voice2machine", "grabación iniciada...")
            logger.info("🎙️ Grabación iniciada")
            return ToggleResponse(status="recording", message="🎙️ Grabando...")
        except Exception as e:
            logger.error(f"Error iniciando grabación: {e}")
            # Roll back so we never report "error" while the transcriber keeps
            # running in the background (e.g. flag touch or notify failed).
            if self._is_recording:
                self._is_recording = False
                with contextlib.suppress(Exception):
                    await self.transcriber.stop()
                with contextlib.suppress(Exception):
                    if config.paths.recording_flag.exists():
                        config.paths.recording_flag.unlink()
            return ToggleResponse(status="error", message=f"❌ Error: {e}")

    async def stop(self) -> "ToggleResponse":
        """Stop recording, transcribe, and copy the result to the clipboard.

        Returns:
            ToggleResponse with status "idle" (with or without text) or "error".
        """
        from v2m.api.schemas import ToggleResponse

        if not self._is_recording:
            return ToggleResponse(status="idle", message="⚠️ No hay grabación en curso")
        try:
            self._is_recording = False
            if config.paths.recording_flag.exists():
                config.paths.recording_flag.unlink()
            self.notifications.notify("⚡ v2m procesando", "procesando...")
            transcription = await self.transcriber.stop()
            if not transcription or not transcription.strip():
                # Improved diagnostics: report queue state for empty results.
                # NOTE(review): reaches into the transcriber's private
                # _audio_queue; guarded because the attribute may change.
                try:
                    queue_size = self.transcriber._audio_queue.qsize()
                    logger.warning(
                        f"Transcripción vacía: audio_queue_size={queue_size}, "
                        f"verificar logs de VAD y Whisper para diagnóstico detallado"
                    )
                except Exception as diag_err:
                    logger.debug(f"Error obteniendo diagnóstico: {diag_err}")
                self.notifications.notify("❌ whisper", "no se detectó voz en el audio")
                return ToggleResponse(status="idle", message="❌ No se detectó voz", text=None)
            self.clipboard.copy(transcription)
            preview = transcription[:80]
            # Only append an ellipsis when the preview actually truncates.
            suffix = "..." if len(transcription) > 80 else ""
            self.notifications.notify("✅ whisper - copiado", f"{preview}{suffix}")
            logger.info(f"✅ Transcripción completada: {len(transcription)} chars")
            return ToggleResponse(status="idle", message="✅ Copiado al portapapeles", text=transcription)
        except Exception as e:
            logger.error(f"Error deteniendo grabación: {e}")
            self._is_recording = False
            return ToggleResponse(status="error", message=f"❌ Error: {e}")

    def get_status(self) -> "StatusResponse":
        """Report the current workflow state for the API layer."""
        from v2m.api.schemas import StatusResponse

        state = "recording" if self._is_recording else "idle"
        return StatusResponse(state=state, recording=self._is_recording, model_loaded=self._model_loaded)

    async def shutdown(self) -> None:
        """Shut down the workflow, stopping recordings and unloading models."""
        # Teardown is best-effort: each step's failure is suppressed.
        if self._is_recording:
            with contextlib.suppress(Exception):
                await self.stop()
        if self._worker:
            with contextlib.suppress(Exception):
                await self._worker.unload()
        if self._notifications:
            self._notifications.shutdown(wait=False)
__init__(broadcast_fn=None)
Inicializa el workflow.
Parameters:

| Name         | Type                 | Description                                            | Default |
| ------------ | -------------------- | ------------------------------------------------------ | ------- |
| broadcast_fn | `BroadcastFn | None` | Función opcional para emitir actualizaciones de estado. | `None`  |
Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
def __init__(self, broadcast_fn: BroadcastFn | None = None) -> None:
    """Initialize the workflow.

    Args:
        broadcast_fn: Optional callback used to broadcast state updates.
    """
    # Plain state flags.
    self._is_recording = False
    self._model_loaded = False
    self._broadcast_fn = broadcast_fn
    # Collaborators start unset; each is built lazily by its property.
    self._worker: PersistentWhisperWorker | None = None
    self._recorder: AudioRecorder | None = None
    self._transcriber: StreamingTranscriber | None = None
    self._clipboard: LinuxClipboardAdapter | None = None
    self._notifications: LinuxNotificationService | None = None
shutdown()
async
Apaga el workflow, deteniendo grabaciones y descargando modelos.
Source code in apps/daemon/backend/src/v2m/orchestration/recording_workflow.py
async def shutdown(self) -> None:
    """Shut down the workflow, stopping any recording and unloading models."""
    # Teardown is best-effort: each step's errors are suppressed so one
    # failure cannot block the rest of the shutdown sequence.
    if self._is_recording:
        with contextlib.suppress(Exception):
            await self.stop()
    worker = self._worker
    if worker:
        with contextlib.suppress(Exception):
            await worker.unload()
    notifications = self._notifications
    if notifications:
        notifications.shutdown(wait=False)
options:
show_source: true
LLMWorkflow
Coordinates text processing using language providers (LLM), including refinement and translation.
v2m.orchestration.llm_workflow.LLMWorkflow
Orquestador para el refinamiento y traducción de texto mediante LLM.
Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
class LLMWorkflow:
    """Orchestrator for text refinement and translation through an LLM."""

    def __init__(self) -> None:
        """Initialize the LLM workflow with all collaborators unset."""
        self._llm_service: Any | None = None
        self._clipboard: LinuxClipboardAdapter | None = None
        self._notifications: LinuxNotificationService | None = None

    @property
    def clipboard(self):
        """Clipboard adapter (Linux), built on first access."""
        if self._clipboard is None:
            from v2m.features.desktop.linux_adapters import LinuxClipboardAdapter

            self._clipboard = LinuxClipboardAdapter()
        return self._clipboard

    @property
    def notifications(self):
        """System notification service, built on first access."""
        if self._notifications is None:
            from v2m.features.desktop.notification_service import LinuxNotificationService

            self._notifications = LinuxNotificationService()
        return self._notifications

    @property
    def llm_service(self) -> Any:
        """Configured LLM service (Gemini, Ollama or Local), per config.llm.backend."""
        if self._llm_service is None:
            backend = config.llm.backend
            if backend == "gemini":
                from v2m.features.llm.gemini_service import GeminiLLMService

                self._llm_service = GeminiLLMService()
            elif backend == "ollama":
                from v2m.features.llm.ollama_service import OllamaLLMService

                self._llm_service = OllamaLLMService()
            else:
                # Any unrecognized backend falls back to the local service.
                from v2m.features.llm.local_service import LocalLLMService

                self._llm_service = LocalLLMService()
            logger.info(f"LLM backend inicializado: {backend}")
        return self._llm_service

    async def _invoke(self, method: Any, *args: Any) -> Any:
        """Call a service method: await coroutines, run sync code off-thread.

        Shared by process_text and translate_text so the sync/async dispatch
        logic lives in one place.
        """
        if asyncio.iscoroutinefunction(method):
            return await method(*args)
        return await asyncio.to_thread(method, *args)

    async def process_text(self, text: str) -> "LLMResponse":
        """Refine the text with the LLM and copy the result to the clipboard.

        Falls back to copying the original text when the backend fails.
        """
        from v2m.api.schemas import LLMResponse

        backend_name = config.llm.backend
        try:
            refined = await self._invoke(self.llm_service.process_text, text)
            self.clipboard.copy(refined)
            self.notifications.notify(f"✅ {backend_name} - copiado", f"{refined[:80]}...")
            return LLMResponse(text=refined, backend=backend_name)
        except Exception as e:
            logger.error(f"Error procesando texto con {backend_name}: {e}")
            self.clipboard.copy(text)
            self.notifications.notify(f"⚠️ {backend_name} falló", "usando texto original...")
            return LLMResponse(text=text, backend=f"{backend_name} (fallback)")

    async def translate_text(self, text: str, target_lang: str) -> "LLMResponse":
        """Translate the text into the given language using the LLM.

        Returns the original text (backend "error"/"... (error)") when the
        language is rejected or the backend fails.
        """
        from v2m.api.schemas import LLMResponse

        backend_name = config.llm.backend
        # Allow-list validation: letters, spaces and hyphens, 2-20 chars.
        if not re.match(r"^[a-zA-Z\s\-]{2,20}$", target_lang):
            logger.warning(f"Idioma inválido: {target_lang}")
            self.notifications.notify("❌ Error", "Idioma de destino inválido")
            return LLMResponse(text=text, backend="error")
        try:
            translated = await self._invoke(self.llm_service.translate_text, text, target_lang)
            self.clipboard.copy(translated)
            self.notifications.notify(f"✅ Traducción ({target_lang})", f"{translated[:80]}...")
            return LLMResponse(text=translated, backend=backend_name)
        except Exception as e:
            logger.error(f"Error traduciendo con {backend_name}: {e}")
            self.notifications.notify("❌ Error traducción", "Fallo al traducir")
            return LLMResponse(text=text, backend=f"{backend_name} (error)")
clipboard
property
Adaptador de portapapeles (Linux).
llm_service
property
Servicio LLM configurado (Gemini, Ollama o Local).
notifications
property
Servicio de notificaciones del sistema.
__init__()
Inicializa el workflow de LLM.
Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
def __init__(self) -> None:
    """Initialize the LLM workflow.

    All collaborators start unset; each one is created lazily by its
    corresponding property.
    """
    self._llm_service: Any | None = None
    self._clipboard: LinuxClipboardAdapter | None = None
    self._notifications: LinuxNotificationService | None = None
process_text(text)
async
Refina el texto usando el LLM y lo copia al portapapeles.
Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
async def process_text(self, text: str) -> "LLMResponse":
    """Refine the text via the LLM and copy the result to the clipboard."""
    from v2m.api.schemas import LLMResponse

    backend_name = config.llm.backend
    try:
        handler = self.llm_service.process_text
        # Await native coroutines directly; run sync backends off-thread.
        if asyncio.iscoroutinefunction(handler):
            refined = await handler(text)
        else:
            refined = await asyncio.to_thread(handler, text)
        self.clipboard.copy(refined)
        self.notifications.notify(f"✅ {backend_name} - copiado", f"{refined[:80]}...")
        return LLMResponse(text=refined, backend=backend_name)
    except Exception as e:
        logger.error(f"Error procesando texto con {backend_name}: {e}")
        # Best-effort fallback: give the user the original text back.
        self.clipboard.copy(text)
        self.notifications.notify(f"⚠️ {backend_name} falló", "usando texto original...")
        return LLMResponse(text=text, backend=f"{backend_name} (fallback)")
translate_text(text, target_lang)
async
Traduce el texto al idioma especificado usando el LLM.
Source code in apps/daemon/backend/src/v2m/orchestration/llm_workflow.py
async def translate_text(self, text: str, target_lang: str) -> "LLMResponse":
    """Translate the text into the requested language via the LLM."""
    from v2m.api.schemas import LLMResponse

    backend_name = config.llm.backend
    # Basic allow-list validation of the target-language string.
    if re.match(r"^[a-zA-Z\s\-]{2,20}$", target_lang) is None:
        logger.warning(f"Idioma inválido: {target_lang}")
        self.notifications.notify("❌ Error", "Idioma de destino inválido")
        return LLMResponse(text=text, backend="error")
    try:
        translate = self.llm_service.translate_text
        # Await native coroutines directly; run sync backends off-thread.
        if asyncio.iscoroutinefunction(translate):
            translated = await translate(text, target_lang)
        else:
            translated = await asyncio.to_thread(translate, text, target_lang)
        self.clipboard.copy(translated)
        self.notifications.notify(f"✅ Traducción ({target_lang})", f"{translated[:80]}...")
        return LLMResponse(text=translated, backend=backend_name)
    except Exception as e:
        logger.error(f"Error traduciendo con {backend_name}: {e}")
        self.notifications.notify("❌ Error traducción", "Fallo al traducir")
        return LLMResponse(text=text, backend=f"{backend_name} (error)")
options:
show_source: true