-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathworker.py
More file actions
132 lines (98 loc) · 3.96 KB
/
worker.py
File metadata and controls
132 lines (98 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright (c) Microsoft. All rights reserved.
"""Worker process for hosting a single Azure OpenAI-powered agent using Durable Task.
This worker registers agents as durable entities and continuously listens for requests.
The worker should run as a background service, processing incoming agent requests.
Prerequisites:
- Set FOUNDRY_PROJECT_ENDPOINT and FOUNDRY_MODEL
- Sign in with Azure CLI for AzureCliCredential authentication
- Start a Durable Task Scheduler (e.g., using Docker)
"""
import asyncio
import logging
import os
from agent_framework import Agent
from agent_framework.azure import DurableAIAgentWorker
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from azure.identity.aio import AzureCliCredential as AsyncAzureCliCredential
from dotenv import load_dotenv
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# Load environment variables from .env file so that the os.environ / os.getenv
# lookups further down in this module can see them.
load_dotenv()
# Configure logging at WARNING level.
# NOTE(review): at this level the logger.debug()/logger.info() calls made in this
# file (e.g. "Worker is ready and listening for requests...") are presumably
# filtered out unless the level is lowered elsewhere - confirm this is intended.
logging.basicConfig(level=logging.WARNING)
# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
def create_joker_agent() -> Agent:
    """Build the "Joker" agent on top of a Foundry chat client.

    The project endpoint and the model name are taken from the
    FOUNDRY_PROJECT_ENDPOINT and FOUNDRY_MODEL environment variables; a
    missing variable surfaces as a KeyError from ``os.environ``.

    Returns:
        Agent: The configured Joker agent
    """
    # Build the chat client first so the Agent construction below stays flat.
    chat_client = FoundryChatClient(
        project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
        model=os.environ["FOUNDRY_MODEL"],
        credential=AsyncAzureCliCredential(),
    )
    return Agent(
        client=chat_client,
        name="Joker",
        instructions="You are good at telling jokes.",
    )
def get_worker(
    taskhub: str | None = None, endpoint: str | None = None, log_handler: logging.Handler | None = None
) -> DurableTaskSchedulerWorker:
    """Build a DurableTaskSchedulerWorker from arguments and environment.

    Explicit arguments win; otherwise the TASKHUB / ENDPOINT environment
    variables are consulted, and finally the built-in defaults apply.

    Args:
        taskhub: Task hub name (defaults to TASKHUB env var or "default")
        endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
        log_handler: Optional logging handler for worker logging

    Returns:
        Configured DurableTaskSchedulerWorker instance
    """
    taskhub_name = taskhub
    if not taskhub_name:
        taskhub_name = os.getenv("TASKHUB", "default")

    endpoint_url = endpoint
    if not endpoint_url:
        endpoint_url = os.getenv("ENDPOINT", "http://localhost:8080")

    logger.debug(f"Using taskhub: {taskhub_name}")
    logger.debug(f"Using endpoint: {endpoint_url}")

    # The default localhost endpoint is used without a credential and without
    # a secure channel; every other endpoint gets both.
    uses_default_local_endpoint = endpoint_url == "http://localhost:8080"
    if uses_default_local_endpoint:
        token = None
    else:
        token = AzureCliCredential()

    return DurableTaskSchedulerWorker(
        host_address=endpoint_url,
        secure_channel=not uses_default_local_endpoint,
        taskhub=taskhub_name,
        token_credential=token,
        log_handler=log_handler,
    )
def setup_worker(worker: DurableTaskSchedulerWorker) -> DurableAIAgentWorker:
    """Wrap the given worker and register the Joker agent on the wrapper.

    Args:
        worker: The DurableTaskSchedulerWorker instance

    Returns:
        DurableAIAgentWorker with agents registered
    """
    # The agent-aware wrapper is what agents get registered on.
    wrapped = DurableAIAgentWorker(worker)

    logger.debug("Creating and registering Joker agent...")
    joker_agent = create_joker_agent()
    wrapped.add_agent(joker_agent)

    # Report what was registered.
    logger.debug(f"✓ Registered agent: {joker_agent.name}")
    logger.debug(f" Entity name: dafx-{joker_agent.name}")

    return wrapped
async def main():
    """Main entry point for the worker process.

    Creates the worker, registers the agents on it, starts it and then idles
    so the process stays alive. The worker is stopped on every exit path
    (Ctrl+C, task cancellation, or an error while starting) so that it is
    never left running after this coroutine finishes.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled (this is how ``asyncio.run`` delivers Ctrl+C on
            Python 3.11+).
    """
    logger.debug("Starting Durable Task Agent Worker...")

    # Create a worker using the helper function
    worker = get_worker()

    # Setup worker with agents
    setup_worker(worker)

    logger.info("Worker is ready and listening for requests...")
    logger.info("Press Ctrl+C to stop.")
    logger.info("")

    try:
        # start() is followed by a keep-alive loop, i.e. processing happens
        # outside this coroutine (presumably on a background thread owned by
        # the worker - see the durabletask worker documentation).
        worker.start()

        # Keep the process alive while the worker handles requests.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.debug("Worker shutdown initiated")
    except asyncio.CancelledError:
        # Under asyncio.run(), Ctrl+C cancels the main task instead of raising
        # KeyboardInterrupt here. Log it, but keep propagating the cancellation
        # so the event loop can unwind normally.
        logger.debug("Worker shutdown initiated")
        raise
    finally:
        # Always release the worker, whichever way the loop was left.
        worker.stop()
        logger.debug("Worker stopped")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises KeyboardInterrupt once main() has been
        # cancelled and cleaned up. Ctrl+C is the advertised way to stop the
        # worker, so exit quietly instead of printing a traceback.
        pass