Bloqueo de Threads
El sistema de bloqueo previene que multiples agentes envien mensajes simultaneamente al mismo candidato, evitando confusiones y mensajes duplicados.
Por que es Necesario
Problema sin Bloqueo
Agente A: "Hola! Necesito tus documentos"
Agente B: "Hola! Tienes una evaluacion pendiente"
Agente A: "Por favor sube tu INE"
Agente B: "El test tiene 30 preguntas"
Candidato: "??? Con quien hablo?"
Solucion con Bloqueo
Agente A adquiere bloqueo
Agente A: "Hola! Necesito tus documentos"
Agente A: "Por favor sube tu INE"
Agente B: (mensaje encolado - thread bloqueado)
Agente A libera bloqueo
Agente B: "Ahora tienes una evaluacion pendiente"
Como Funciona
Campos en Thread
Cada thread tiene campos para control de bloqueo:
| Campo | Tipo | Descripcion |
|---|---|---|
| activeExecutionId | number | ID de la ejecucion que tiene el bloqueo |
| executionLockedAt | datetime | Cuando se adquirio el bloqueo |
Ciclo de Vida del Bloqueo
1. Ejecucion inicia
↓
2. Intenta enviar mensaje
↓
3. Verifica si thread esta libre
↓
├── SI: Adquiere bloqueo
│ ↓
│ Envia mensaje
│ ↓
│ (continua con bloqueo)
│
└── NO: Mensaje se encola
↓
Espera liberacion
↓
Reintenta
Adquirir Bloqueo
Proceso de Adquisicion
// MessageService.acquireThreadLock()
1. Verificar bloqueo actual
- Si no hay bloqueo → adquirir
- Si bloqueo propio → refrescar timestamp
- Si bloqueo ajeno → verificar si expiro
2. Si expiro (>10 min) → limpiar y adquirir
3. Si no expiro → rechazar
Timeout de Bloqueo
Los bloqueos expiran automaticamente despues de 10 minutos de inactividad.
Proposito: Evitar bloqueos huerfanos si una ejecucion falla sin liberar.
const LOCK_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutos
// Si el bloqueo es mas viejo que el timeout, se considera invalido
if (lockAge > LOCK_TIMEOUT_MS) {
// Liberar bloqueo stale
await releaseThreadLock(threadId, oldExecutionId);
}
Liberar Bloqueo
Liberacion Automatica
El bloqueo se libera automaticamente cuando:
| Evento | Momento |
|---|---|
| Ejecucion completa | Al finalizar exitosamente |
| Ejecucion falla | Al detectar error |
| Timeout | Despues de 10 minutos |
Liberacion en Codigo
// AgentGraph.finalizeExecution()
// Release any thread locks held by this execution
const lockedThreads = await threadRepo.find({
where: { activeExecutionId: executionId }
});
if (lockedThreads.length > 0) {
await threadRepo.update(
{ activeExecutionId: executionId },
{ activeExecutionId: null, executionLockedAt: null }
);
}
Herencia de Bloqueo
Cuando Aplica
Cuando un agente padre llama a un hijo con waitForCompletion=true, el hijo puede heredar el bloqueo.
Sin Herencia
Padre tiene bloqueo (exec 100)
↓
Padre llama a Hijo (waitForCompletion=false)
↓
Hijo (exec 101) intenta enviar
↓
Thread bloqueado por exec 100
↓
Mensaje encolado
Con Herencia
Padre tiene bloqueo (exec 100)
↓
Padre llama a Hijo (waitForCompletion=true)
↓
Hijo (exec 101) intenta enviar
↓
Verifica: Es hijo de 100? SI
Verifica: canInheritLock? SI
↓
Hijo hereda bloqueo
↓
Mensaje enviado exitosamente
Configuracion de Herencia
// Al crear ejecucion hijo
eventData: {
parentExecutionId: execution.id,
canInheritLock: waitForCompletion // true si padre espera
}
Verificacion de Herencia
// MessageService.isChildOfExecution()
const childExecution = await executionRepo.findOne({
where: { id: childExecutionId }
});
const eventData = childExecution.eventData;
const isChild = eventData.parentExecutionId === parentExecutionId;
const canInherit = eventData.canInheritLock === true;
return isChild && canInherit;
Cola de Mensajes
Cuando se Encola
Si una ejecucion intenta enviar pero el thread esta bloqueado por otra:
if (currentLock && currentLock !== executionId && !canInheritLock) {
// Encolar mensaje para envio posterior
await agentQueue.scheduleMessageWait({
executionId,
threadId,
message,
channel,
// ...
});
return { success: true, queued: true };
}
Procesamiento de Cola
Cuando el thread se libera:
- Sistema detecta liberacion
- Busca mensajes encolados para ese thread
- Procesa mensajes en orden FIFO
- Cada mensaje adquiere bloqueo temporalmente
Escenarios de Uso
Escenario 1: Agentes Independientes
Thread: Candidato Juan
Agente A: Solicitar documentos (14:00)
Agente B: Enviar evaluacion (14:01)
Resultado:
14:00 - Agente A adquiere bloqueo
14:00 - Agente A envia "Necesito tus documentos"
14:01 - Agente B intenta enviar (bloqueado)
14:01 - Mensaje B encolado
14:02 - Agente A libera bloqueo
14:02 - Mensaje B se envia
Escenario 2: Agentes Encadenados
Thread: Candidato Maria
Coordinador → Recolector (waitForCompletion=true)
Resultado:
14:00 - Coordinador adquiere bloqueo
14:00 - Coordinador: "Iniciamos tu proceso"
14:01 - Coordinador dispara Recolector
14:01 - Recolector hereda bloqueo
14:01 - Recolector: "Necesito tu INE"
14:05 - Recolector espera respuesta
14:30 - Maria sube INE
14:30 - Recolector: "Documento recibido"
14:30 - Recolector termina, devuelve bloqueo
14:30 - Coordinador: "Siguiente paso..."
Escenario 3: Timeout de Bloqueo
Thread: Candidato Pedro
Agente A: Se cuelga durante ejecucion
Resultado:
14:00 - Agente A adquiere bloqueo
14:00 - Agente A envia mensaje
14:01 - Agente A tiene error interno (no libera)
...
14:10 - Bloqueo expira (10 min)
14:11 - Agente B intenta enviar
14:11 - Sistema detecta bloqueo stale
14:11 - Sistema libera bloqueo de A
14:11 - Agente B adquiere bloqueo
14:11 - Agente B envia mensaje
Monitoreo
Verificar Bloqueos Activos
SELECT
ct.id as threadId,
ct.activeExecutionId,
ct.executionLockedAt,
TIMESTAMPDIFF(MINUTE, ct.executionLockedAt, NOW()) as lockMinutes,
u.name as candidateName
FROM candidate_thread ct
JOIN user u ON ct.candidateId = u.id
WHERE ct.activeExecutionId IS NOT NULL
ORDER BY ct.executionLockedAt;
Detectar Bloqueos Stale
SELECT
ct.id as threadId,
ct.activeExecutionId,
ct.executionLockedAt,
TIMESTAMPDIFF(MINUTE, ct.executionLockedAt, NOW()) as lockMinutes
FROM candidate_thread ct
WHERE ct.activeExecutionId IS NOT NULL
AND ct.executionLockedAt < NOW() - INTERVAL 10 MINUTE;
Limpiar Bloqueos Huerfanos
UPDATE candidate_thread
SET activeExecutionId = NULL, executionLockedAt = NULL
WHERE activeExecutionId IS NOT NULL
AND executionLockedAt < NOW() - INTERVAL 10 MINUTE;
Debugging
Logs Importantes
[MessageService] Thread 123 locked by execution 100, cannot acquire for 101
[MessageService] Thread 123 lock released by execution 100
[MessageService] Lock on thread 123 is stale (650000ms), clearing...
[MessageService] Execution 101 inheriting lock from parent 100 on thread 123
[AgentGraph] Released locks on 2 thread(s) for execution 100
Problemas Comunes
| Problema | Causa | Solucion |
|---|---|---|
| Mensaje nunca se envia | Bloqueo nunca se libera | Verificar finalizacion de ejecucion |
| Hijo no puede enviar | No hereda bloqueo | Usar waitForCompletion=true |
| Bloqueo persiste | Ejecucion fallida | Esperar timeout o limpiar manual |
| Mensajes desordenados | Cola no procesa FIFO | Verificar procesamiento de cola |
Proximos Pasos
- Agentes Encadenados - Parent-child sync
- Mensajes de Agente - Atribucion visual
- Logs de Ejecuciones - Debugging