-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathclient.py
More file actions
123 lines (93 loc) · 3.75 KB
/
client.py
File metadata and controls
123 lines (93 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Copyright (c) Microsoft. All rights reserved.
"""Client application for interacting with a Durable Task hosted agent.
This client connects to the Durable Task Scheduler and sends requests to
registered agents, demonstrating how to interact with agents from external processes.
Prerequisites:
- The worker must be running with the agent registered
- Set FOUNDRY_PROJECT_ENDPOINT and FOUNDRY_MODEL
- Sign in with Azure CLI for AzureCliCredential authentication
- Durable Task Scheduler must be running
"""
import asyncio
import logging
import os
from agent_framework.azure import DurableAIAgentClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_client(
taskhub: str | None = None, endpoint: str | None = None, log_handler: logging.Handler | None = None
) -> DurableAIAgentClient:
"""Create a configured DurableAIAgentClient.
Args:
taskhub: Task hub name (defaults to TASKHUB env var or "default")
endpoint: Scheduler endpoint (defaults to ENDPOINT env var or "http://localhost:8080")
log_handler: Optional logging handler for client logging
Returns:
Configured DurableAIAgentClient instance
"""
taskhub_name = taskhub or os.getenv("TASKHUB", "default")
endpoint_url = endpoint or os.getenv("ENDPOINT", "http://localhost:8080")
logger.debug(f"Using taskhub: {taskhub_name}")
logger.debug(f"Using endpoint: {endpoint_url}")
credential = None if endpoint_url == "http://localhost:8080" else AzureCliCredential()
dts_client = DurableTaskSchedulerClient(
host_address=endpoint_url,
secure_channel=endpoint_url != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential,
log_handler=log_handler,
)
return DurableAIAgentClient(dts_client)
def run_client(agent_client: DurableAIAgentClient) -> None:
"""Run client interactions with the Joker agent.
Args:
agent_client: The DurableAIAgentClient instance
"""
# Get a reference to the Joker agent
logger.debug("Getting reference to Joker agent...")
joker = agent_client.get_agent("Joker")
# Create a new session for the conversation
session = joker.create_session()
logger.debug(f"Session ID: {session.session_id}")
logger.info("Start chatting with the Joker agent! (Type 'exit' to quit)")
# Interactive conversation loop
while True:
# Get user input
try:
user_message = input("You: ").strip()
except (EOFError, KeyboardInterrupt):
logger.info("\nExiting...")
break
# Check for exit command
if user_message.lower() == "exit":
logger.info("Goodbye!")
break
# Skip empty messages
if not user_message:
continue
# Send message to agent and get response
try:
response = joker.run(user_message, session=session)
logger.info(f"Joker: {response.text} \n")
except Exception as e:
logger.error(f"Error getting response: {e}")
logger.info("Conversation completed.")
async def main() -> None:
"""Main entry point for the client application."""
logger.debug("Starting Durable Task Agent Client...")
# Create client using helper function
agent_client = get_client()
try:
run_client(agent_client)
except Exception as e:
logger.exception(f"Error during agent interaction: {e}")
finally:
logger.debug("Client shutting down")
if __name__ == "__main__":
asyncio.run(main())