Your First Workflow
Build a complete customer service workflow using multiple AI frameworks.
Overview
We'll create a workflow that:
- Analyzes customer sentiment (LangChain)
- Checks customer history (Zep Memory)
- Creates support ticket (MCP Tools)
- Starts resolution workflow (Temporal)
Step 1: Define the Business Rules
Create rules/customer_service.yaml
:
rule_set: customer_service
rules:
- rule: analyze_sentiment
description: Analyze customer message sentiment
priority: high
conditions:
- message.text != null
actions:
- framework: langchain
action: sentiment_analysis
output: sentiment_score
- rule: check_escalation
description: Determine if escalation needed
priority: high
conditions:
- sentiment_score < 0.3
- customer.tier == "premium"
actions:
- framework: zep
action: get_customer_history
- framework: temporal
action: start_escalation_workflow
params:
priority: high
- rule: create_ticket
description: Create support ticket
priority: medium
conditions:
- sentiment_score < 0.7
actions:
- framework: mcp
action: create_ticket
params:
type: support
auto_assign: true
Step 2: Initialize the Frameworks
from bizy.core import MetaOrchestrator
from bizy.adapters import (
LangChainAdapter,
TemporalAdapter,
MCPAdapter,
ZepAdapter
)
async def setup_orchestrator():
orchestrator = MetaOrchestrator()
# Configure adapters
adapters = {
"langchain": LangChainAdapter({"api_key": "..."}),
"temporal": TemporalAdapter({"host": "localhost"}),
"mcp": MCPAdapter({"server_url": "http://localhost:8080"}),
"zep": ZepAdapter({"api_url": "http://localhost:8000"})
}
# Register all adapters
for name, adapter in adapters.items():
await adapter.connect()
orchestrator.register_adapter(name, adapter)
# Load rules
orchestrator.load_rules("rules/customer_service.yaml")
return orchestrator
Step 3: Create the Workflow Handler
class CustomerServiceHandler:
def __init__(self, orchestrator):
self.orchestrator = orchestrator
async def handle_customer_message(self, customer_id: str, message: str):
# Prepare context
context = {
"customer": await self.get_customer_info(customer_id),
"message": {"text": message}
}
# Execute rules
result = await self.orchestrator.execute(
rule_set="customer_service",
context=context
)
return {
"ticket_id": result.get("ticket_id"),
"workflow_id": result.get("workflow_id"),
"sentiment": result.get("sentiment_score"),
"actions_taken": result.get("executed_actions")
}
Step 4: Run the Workflow
async def main():
# Setup
orchestrator = await setup_orchestrator()
handler = CustomerServiceHandler(orchestrator)
# Handle a customer message
result = await handler.handle_customer_message(
customer_id="cust_123",
message="I'm very frustrated with the service!"
)
print(f"Ticket created: {result['ticket_id']}")
print(f"Sentiment score: {result['sentiment']}")
print(f"Actions taken: {result['actions_taken']}")
# Run
import asyncio
asyncio.run(main())
Expected Output
Ticket created: TICK-2024-001
Sentiment score: 0.2
Actions taken: [
"langchain.sentiment_analysis",
"zep.get_customer_history",
"temporal.start_escalation_workflow",
"mcp.create_ticket"
]
Next Steps
- Add error handling and retries
- Implement custom actions
- Create workflow monitoring
- Scale with multiple workers