feat: framework de orquestracao Worker+Reviewer com paralelismo

- Worker via GitHub Models API (o4-mini por padrao, suporte a modelos de raciocinio)
- Reviewer via Claude Code CLI local (sem custo adicional de API)
- Orchestrator com retry automatico baseado em feedback do reviewer
- run_parallel: execucao simultanea de tasks independentes via ThreadPoolExecutor
- pipeline: steps sequenciais com injecao de output entre etapas
- utils.run_parallel: executor compartilhado com tratamento de excecoes
- PREREQUISITES.md com instrucoes de instalacao

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Jesse Freitas 2026-03-22 15:42:09 -03:00
commit a5a6748a72
12 changed files with 732 additions and 0 deletions

10
.env.example Normal file
View file

@ -0,0 +1,10 @@
# GitHub Personal Access Token com permissao models:read
# Gere em: github.com → Settings → Developer settings → Personal access tokens
GITHUB_TOKEN=ghp_seu_token_aqui
# Modelo worker (opcional - padrao: o4-mini)
# Opcoes: o4-mini | o3 | gpt-4.1 | gpt-4.1-nano
WORKER_MODEL=o4-mini
# Encoding UTF-8 no Windows (nao alterar)
PYTHONUTF8=1

10
.gitignore vendored Normal file
View file

@ -0,0 +1,10 @@
.env
__pycache__/
*.pyc
*.pyo
.venv/
venv/
.vscode/
*.egg-info/
dist/
build/

61
PREREQUISITES.md Normal file
View file

@ -0,0 +1,61 @@
# Pre-requisitos
## 1. Python 3.10+
```bash
python --version # deve ser >= 3.10
```
## 2. Claude Code CLI (Reviewer)
O Reviewer chama o Claude localmente via CLI — sem custo adicional de API.
```bash
# Instalar
npm install -g @anthropic-ai/claude-code
# Verificar
claude --version
```
> Requer conta Anthropic com acesso ao Claude Code.
## 3. GitHub Personal Access Token (Worker)
O Worker usa o GitHub Models API. Gere um token em:
`https://github.com/settings/tokens`**Fine-grained token** com permissao `models:read`.
## 4. Dependencias Python
```bash
pip install -r requirements.txt
```
## 5. Arquivo .env
Crie um `.env` na raiz do projeto (use `.env.example` como base):
```bash
cp .env.example .env
# edite e preencha GITHUB_TOKEN
```
```env
GITHUB_TOKEN=ghp_seu_token_aqui
WORKER_MODEL=o4-mini # opcional, este e o padrao
```
## Modelos disponiveis (GitHub Models)
| Modelo | Perfil |
|---|---|
| `o4-mini` | Melhor para codigo (padrao) |
| `o3` | Mais poderoso, mais lento |
| `gpt-4.1` | Rapido, sem raciocinio |
| `gpt-4.1-nano` | Mais barato, tasks simples |
## Verificacao rapida
```bash
python -c "from orchestrator import Orchestrator; print('OK')"
```

138
README.md Normal file
View file

@ -0,0 +1,138 @@
# Claude Codex Workers
Framework de orquestração de agentes IA com padrão **Worker → Reviewer**.
O Worker gera código via GitHub Models (modelos de raciocínio como `o4-mini`).
O Reviewer valida e aprova via Claude Code CLI — sem custo adicional de API.
## Como funciona
```
sua tarefa
└→ Worker (GitHub Models / o4-mini) ← geração barata e paralela
└→ Reviewer (Claude Code CLI) ← validação inteligente
├→ APROVADO → retorna resultado
└→ REPROVADO → feedback injetado → Worker reexecuta (retry)
```
## Instalação
Veja [PREREQUISITES.md](PREREQUISITES.md) para instalar as dependências do sistema.
```bash
pip install -r requirements.txt
cp .env.example .env
# edite .env e preencha GITHUB_TOKEN
```
## Uso
### Task simples com retry automático
```python
from orchestrator import Orchestrator
orq = Orchestrator()
result = orq.run(
task="Escreva uma função Python para validar CPF.",
system_prompt="Python sênior. Código limpo com type hints e docstring.",
review_criteria="Deve ter docstring, tratar edge cases e ser eficiente.",
)
print(result["approved"]) # True/False
print(result["attempts"]) # numero de tentativas
print(result["final_output"]) # codigo gerado e aprovado
```
### Múltiplas tasks em paralelo
```python
results = orq.run_parallel(
tasks=[
{"task": "Crie schema SQL para usuarios", "review_criteria": "Deve ter PKs e indexes"},
{"task": "Crie schema SQL para produtos", "review_criteria": "Deve ter PKs e indexes"},
{"task": "Crie schema SQL para pedidos", "review_criteria": "Deve ter FKs corretas"},
],
max_workers=3,
)
```
### Pipeline sequencial (output de um step alimenta o próximo)
```python
results = orq.pipeline([
{
"name": "Schema SQL",
"task": "Crie o schema para um sistema de vendas.",
"critical": True,
},
{
"name": "Models SQLAlchemy",
"task": "Crie os models para o schema.",
"use_previous_output": True,
},
{
"name": "Endpoints FastAPI",
"task": "Crie os endpoints CRUD.",
"use_previous_output": True,
},
])
```
### Worker direto (sem review)
```python
from orchestrator import Worker
worker = Worker(model="gpt-4.1-nano") # modelo mais barato para tasks simples
resultado = worker.run("Liste 5 boas práticas de segurança para APIs REST.")
```
### Claude direto para decisões
```python
from orchestrator import Reviewer
reviewer = Reviewer()
decisao = reviewer.decide(
question="Qual a melhor estratégia de cache para dados que mudam a cada 5 minutos?",
context="API FastAPI com Redis, ~10k req/min.",
)
```
## Modelos disponíveis
| Modelo | Perfil |
|---|---|
| `o4-mini` | **Melhor para código** — raciocínio interno (padrão) |
| `o3` | Mais poderoso, mais lento |
| `gpt-4.1` | Rápido, sem raciocínio, bom para tasks simples |
| `gpt-4.1-nano` | Mais barato, tasks triviais |
## Estrutura
```
orchestrator/
├── config.py # Configuração e env vars
├── worker.py # Worker via GitHub Models API
├── reviewer.py # Reviewer via Claude Code CLI
├── orchestrator.py # Coordena workflow, retry e pipeline
└── utils.py # Executor paralelo compartilhado
example.py # Exemplos de uso
PREREQUISITES.md # Dependências do sistema
```
## Economia estimada
Comparando 1.000 tasks/mês versus usar apenas Claude Sonnet:
| Abordagem | Custo estimado |
|---|---|
| 100% Claude Sonnet | ~$33/mês |
| Worker (o4-mini) + Reviewer (Claude CLI) | ~$715/mês |
| **Economia** | **5680%** |
## Licença
MIT

144
example.py Normal file
View file

@ -0,0 +1,144 @@
"""
Exemplos de uso do orquestrador.
"""
from orchestrator import Orchestrator, Worker, Reviewer
def exemplo_simples():
"""Worker gera codigo, Claude revisa."""
orq = Orchestrator()
result = orq.run(
task="Escreva uma funcao Python que valida se um CPF e valido.",
system_prompt="Voce e um desenvolvedor Python senior. Escreva codigo limpo e com docstring.",
review_criteria="O codigo deve: ter docstring, tratar edge cases, ser eficiente.",
)
print(f"Aprovado: {result['approved']}")
print(f"Tentativas: {result['attempts']}")
print(f"Feedback: {result['feedback']}")
print(f"\nOutput Final:\n{result['final_output']}")
def exemplo_paralelo_worker():
"""Multiplos workers rodando ao mesmo tempo (sem review)."""
worker = Worker() # o4-mini por padrao
tasks = [
"Escreva uma funcao Python para validar email.",
"Escreva uma funcao Python para validar CPF.",
"Escreva uma funcao Python para validar CNPJ.",
"Escreva uma funcao Python para validar numero de telefone BR.",
"Escreva uma funcao Python para validar CEP.",
]
print(f"Executando {len(tasks)} tasks em paralelo...")
resultados = worker.run_parallel(
tasks=tasks,
system_prompt="Python senior. Codigo limpo, com docstring e type hints.",
max_workers=5,
)
for i, resultado in enumerate(resultados):
print(f"\n--- Task {i + 1} ---")
print(resultado[:300] + "..." if len(resultado) > 300 else resultado)
def exemplo_paralelo_orquestrador():
"""Multiplas tasks independentes com Worker + Reviewer em paralelo."""
orq = Orchestrator()
tasks = [
{
"task": "Crie o schema SQL para tabela de usuarios.",
"system_prompt": "DBA senior, PostgreSQL.",
"review_criteria": "Deve ter PK, indexes em email e created_at, tipos corretos.",
},
{
"task": "Crie o schema SQL para tabela de produtos.",
"system_prompt": "DBA senior, PostgreSQL.",
"review_criteria": "Deve ter PK, index em nome, campo preco como NUMERIC.",
},
{
"task": "Crie o schema SQL para tabela de pedidos.",
"system_prompt": "DBA senior, PostgreSQL.",
"review_criteria": "Deve ter PK, FKs para usuarios e produtos, status como ENUM.",
},
]
print(f"Executando {len(tasks)} tasks com Worker+Reviewer em paralelo...")
results = orq.run_parallel(tasks=tasks, max_workers=3)
for i, r in enumerate(results):
status = "APROVADO" if r["approved"] else "REPROVADO"
print(f"\n--- Task {i + 1} [{status}] ({r['attempts']} tentativa(s)) ---")
print(r["final_output"][:400] + "..." if len(r["final_output"]) > 400 else r["final_output"])
def exemplo_pipeline():
"""Pipeline sequencial: output de um step alimenta o proximo."""
orq = Orchestrator()
steps = [
{
"name": "Schema SQL",
"task": "Crie o schema SQL para um sistema de vendas com clientes, produtos e pedidos.",
"system_prompt": "DBA senior, PostgreSQL.",
"review_criteria": "Deve ter PKs, FKs, indices e tipos corretos.",
"critical": True,
},
{
"name": "Models SQLAlchemy",
"task": "Crie os models SQLAlchemy para o schema criado.",
"system_prompt": "Use SQLAlchemy 2.0 com type hints.",
"use_previous_output": True,
},
{
"name": "Endpoints FastAPI",
"task": "Crie os endpoints CRUD em FastAPI para os models.",
"system_prompt": "Use FastAPI com async/await e Pydantic v2.",
"use_previous_output": True,
},
]
results = orq.pipeline(steps)
for r in results:
status = "OK" if r["approved"] else "FALHOU"
print(f"[{status}] Step {r['step']}: {r['step_name']} ({r['attempts']} tentativa(s))")
if r["feedback"]:
print(f" Feedback: {r['feedback']}")
def exemplo_worker_direto():
"""Worker direto sem review - modelo barato para tasks simples."""
worker = Worker(model="gpt-4.1-nano")
resultado = worker.run(
task="Liste 5 boas praticas de seguranca para APIs REST.",
system_prompt="Seja conciso e direto.",
)
print(resultado)
def exemplo_reviewer_direto():
"""Claude direto para decisoes complexas."""
reviewer = Reviewer()
decisao = reviewer.decide(
question="Qual e a melhor estrategia de cache para uma API com dados que mudam a cada 5 minutos?",
context="Temos uma API FastAPI com Redis disponivel, ~10k requisicoes/minuto.",
)
print(decisao)
if __name__ == "__main__":
print("=== Exemplo Simples ===")
exemplo_simples()
print("\n=== Exemplo Paralelo Worker ===")
exemplo_paralelo_worker()
print("\n=== Exemplo Paralelo Orquestrador ===")
exemplo_paralelo_orquestrador()

5
orchestrator/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from .orchestrator import Orchestrator
from .worker import Worker
from .reviewer import Reviewer
__all__ = ["Orchestrator", "Worker", "Reviewer"]

26
orchestrator/config.py Normal file
View file

@ -0,0 +1,26 @@
"""
Configuração do orquestrador.
variáveis de ambiente do arquivo .env
"""
import os
import sys
from dotenv import load_dotenv
load_dotenv()
# Força UTF-8 no Windows
if sys.platform == "win32":
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
# Token GitHub Models
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
# Modelo worker padrão
WORKER_MODEL = os.getenv("WORKER_MODEL", "o4-mini")
# Validação
if not GITHUB_TOKEN:
raise EnvironmentError("GITHUB_TOKEN não definido no .env")

View file

@ -0,0 +1,131 @@
"""
Orquestrador principal.
Coordena workers (GitHub Models) e reviewer (Claude).
"""
from .worker import Worker
from .reviewer import Reviewer
from .utils import run_parallel
class Orchestrator:
def __init__(self, worker_model: str = None):
self.worker = Worker(model=worker_model)
self.reviewer = Reviewer()
def run(
self,
task: str,
system_prompt: str = None,
review_criteria: str = None,
max_retries: int = 2,
auto_retry: bool = True,
) -> dict:
"""
Executa uma tarefa completa: worker gera, Claude revisa.
Returns:
dict com: approved, feedback, worker_output, improved_output, final_output, attempts
"""
attempts = 0
feedback_context = ""
while attempts <= max_retries:
attempts += 1
task_with_feedback = task
if feedback_context:
task_with_feedback = (
f"{task}\n\n"
f"[Tentativa anterior foi reprovada. Corrija os seguintes problemas:]\n"
f"{feedback_context}"
)
worker_output = self.worker.run(task_with_feedback, system_prompt)
review = self.reviewer.review(task, worker_output, review_criteria)
review["attempts"] = attempts
review["worker_output"] = worker_output
if review["approved"]:
return review
if auto_retry and attempts <= max_retries:
feedback_context = review["feedback"]
continue
break
return review
def run_parallel(self, tasks: list[dict], max_workers: int = 5) -> list[dict]:
"""
Executa multiplas tasks independentes em paralelo (Worker + Reviewer cada uma).
Cada task e um dict com os mesmos parametros do run():
- task (str): obrigatorio
- system_prompt (str, opcional)
- review_criteria (str, opcional)
- max_retries (int, opcional)
- auto_retry (bool, opcional)
Args:
tasks: Lista de tasks independentes
max_workers: Chamadas simultaneas ao GitHub Models (padrao: 5)
Returns:
Lista de resultados na mesma ordem das tasks
"""
def _run_task(t: dict) -> dict:
return self.run(
task=t["task"],
system_prompt=t.get("system_prompt"),
review_criteria=t.get("review_criteria"),
max_retries=t.get("max_retries", 2),
auto_retry=t.get("auto_retry", True),
)
return run_parallel(_run_task, tasks, max_workers)
def pipeline(self, steps: list[dict]) -> list[dict]:
"""
Executa um pipeline de multiplos passos sequenciais.
Cada step e um dict com:
- task (str): a tarefa
- name (str, opcional): nome do step para logging
- system_prompt (str, opcional)
- review_criteria (str, opcional)
- use_previous_output (bool, opcional): injeta output anterior na tarefa
- critical (bool, opcional): aborta pipeline se reprovar
Returns:
Lista com resultado de cada step
"""
results = []
previous_output = None
for i, step in enumerate(steps):
task = step["task"]
if step.get("use_previous_output") and previous_output:
task = f"{task}\n\nInput do passo anterior:\n{previous_output}"
result = self.run(
task=task,
system_prompt=step.get("system_prompt"),
review_criteria=step.get("review_criteria"),
max_retries=step.get("max_retries", 2),
auto_retry=step.get("auto_retry", True),
)
result["step"] = i + 1
result["step_name"] = step.get("name", f"Step {i + 1}")
results.append(result)
previous_output = result["final_output"]
if not result["approved"] and step.get("critical", False):
print(f"[Pipeline] Step critico '{result['step_name']}' falhou. Abortando.")
break
return results

123
orchestrator/reviewer.py Normal file
View file

@ -0,0 +1,123 @@
"""
Reviewer usando Claude Code CLI (sem precisar de API key).
Chama o `claude` instalado localmente via subprocess.
"""
import subprocess
import shutil
REVIEW_SYSTEM_PROMPT = """Voce e um revisor tecnico experiente.
Sua funcao e analisar o output de um agente worker e:
1. Identificar problemas, bugs ou inconsistencias
2. Verificar se a tarefa foi cumprida corretamente
3. Sugerir melhorias se necessario
4. Aprovar ou reprovar o resultado
Seja objetivo e direto. Responda em portugues."""
def _find_claude() -> str:
"""Localiza o executavel do Claude Code CLI no PATH."""
path = shutil.which("claude") or shutil.which("claude.cmd")
if not path:
raise EnvironmentError(
"Claude Code CLI nao encontrado. "
"Instale em: https://claude.ai/code e certifique-se que esta no PATH."
)
return path
def _run_claude(prompt: str, claude_path: str) -> str:
try:
result = subprocess.run(
[claude_path, "-p", prompt],
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
shell=False,
)
except FileNotFoundError:
raise EnvironmentError(f"Claude CLI nao encontrado em: {claude_path}")
if result.returncode != 0:
raise RuntimeError(f"Claude CLI erro: {result.stderr}")
return result.stdout.strip()
class Reviewer:
def __init__(self):
self._claude_path = _find_claude()
def review(self, original_task: str, worker_output: str, criteria: str = None) -> dict:
"""
Revisa o output de um worker via Claude Code CLI.
Returns:
dict com: approved (bool), feedback (str), improved_output (str|None), final_output (str)
"""
prompt = f"""{REVIEW_SYSTEM_PROMPT}
## Tarefa Original
{original_task}
## Output do Worker
{worker_output}
"""
if criteria:
prompt += f"\n## Criterios de Avaliacao\n{criteria}\n"
prompt += """
## Sua Analise
Avalie o output acima. Responda exatamente no formato:
APROVADO: [SIM/NAO]
FEEDBACK: [seu feedback]
OUTPUT_CORRIGIDO: [versao corrigida se necessario, ou N/A se aprovado]"""
response = _run_claude(prompt, self._claude_path)
return self._parse_review(response, worker_output)
def _parse_review(self, response: str, original_output: str) -> dict:
lines = response.strip().split("\n")
approved = False
feedback = ""
improved_output = None
output_start = None
for i, line in enumerate(lines):
upper = line.upper()
if "APROVADO:" in upper:
approved = "SIM" in upper
elif line.upper().startswith("FEEDBACK:"):
feedback = line.split(":", 1)[1].strip()
elif "OUTPUT_CORRIGIDO:" in upper:
rest = line.split(":", 1)[1].strip()
output_start = i + 1
improved_output = rest
if output_start is not None:
extra = "\n".join(lines[output_start:]).strip()
if extra:
improved_output = f"{improved_output}\n{extra}".strip() if improved_output else extra
if improved_output and improved_output.strip().upper() == "N/A":
improved_output = None
# Sem marcadores de formato → Claude aprovou sem objecoes
if not feedback and improved_output is None:
approved = True
return {
"approved": approved,
"feedback": feedback,
"improved_output": improved_output,
"final_output": improved_output if improved_output else original_output,
}
def decide(self, question: str, context: str = None) -> str:
"""Usa Claude para tomar uma decisao ou responder uma pergunta complexa."""
prompt = question
if context:
prompt = f"Contexto:\n{context}\n\nPergunta:\n{question}"
return _run_claude(prompt, self._claude_path)

26
orchestrator/utils.py Normal file
View file

@ -0,0 +1,26 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
def run_parallel(fn: Callable, items: list, max_workers: int) -> list:
"""
Executa fn(item) para cada item em paralelo, preservando ordem.
Propaga a primeira excecao encontrada com contexto do indice.
"""
results = [None] * len(items)
errors = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(fn, item): i for i, item in enumerate(items)}
for future in as_completed(futures):
idx = futures[future]
try:
results[idx] = future.result()
except Exception as e:
errors[idx] = e
if errors:
first_idx, first_err = next(iter(sorted(errors.items())))
raise RuntimeError(f"Task {first_idx} falhou: {first_err}") from first_err
return results

52
orchestrator/worker.py Normal file
View file

@ -0,0 +1,52 @@
"""
Worker usando GitHub Models API (OpenAI-compatible).
Ideal para tarefas pesadas e repetitivas, consumindo menos tokens do Claude.
"""
from openai import OpenAI
from .config import GITHUB_TOKEN, WORKER_MODEL
from .utils import run_parallel
REASONING_MODELS = {"o1", "o1-mini", "o3", "o3-mini", "o4-mini", "o4"}
class Worker:
def __init__(self, model: str = None):
self.model = model or WORKER_MODEL
self.is_reasoning = any(self.model.startswith(m) for m in REASONING_MODELS)
self.client = OpenAI(
base_url="https://models.inference.ai.azure.com",
api_key=GITHUB_TOKEN,
)
def run(self, task: str, system_prompt: str = None, temperature: float = 0.3) -> str:
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": task})
params = {"model": self.model, "messages": messages}
if not self.is_reasoning:
params["temperature"] = temperature
response = self.client.chat.completions.create(**params)
return response.choices[0].message.content
def run_batch(self, tasks: list[str], system_prompt: str = None) -> list[str]:
"""Executa multiplas tarefas em sequencia."""
return [self.run(task, system_prompt) for task in tasks]
def run_parallel(self, tasks: list[str], system_prompt: str = None, max_workers: int = 5) -> list[str]:
"""
Executa multiplas tarefas em paralelo via threads.
Args:
tasks: Lista de tarefas a executar
system_prompt: Instrucao de comportamento (aplicada a todas)
max_workers: Numero maximo de chamadas simultaneas (padrao: 5)
Returns:
Lista de respostas na mesma ordem das tasks
"""
return run_parallel(lambda task: self.run(task, system_prompt), tasks, max_workers)

6
requirements.txt Normal file
View file

@ -0,0 +1,6 @@
# GitHub Models API (Worker) + Claude API (opcional)
anthropic>=0.40.0
openai>=1.50.0
# Variaveis de ambiente
python-dotenv>=1.0.0