WIP: Feature/Add Langfuse Integration for User Feedback Tracking #2264

Open · wants to merge 4 commits into base: main
169 changes: 141 additions & 28 deletions app/backend/app.py
@@ -26,14 +26,18 @@
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.storage.blob.aio import ContainerClient
from azure.storage.blob.aio import StorageStreamDownloader as BlobDownloader
from azure.storage.filedatalake.aio import FileSystemClient
from azure.storage.filedatalake.aio import StorageStreamDownloader as DatalakeDownloader
from azure.storage.filedatalake.aio import (
FileSystemClient,
)
from azure.storage.filedatalake.aio import (
StorageStreamDownloader as DatalakeDownloader,
)
from langfuse import Langfuse
from langfuse.decorators import observe
from openai import AsyncAzureOpenAI, AsyncOpenAI
Contributor Author
Suggested change
-from openai import AsyncAzureOpenAI, AsyncOpenAI
+from langfuse.openai import AsyncAzureOpenAI, AsyncOpenAI

This suggestion implements token usage and other metrics tracking.
References:

While token usage tracking and other related functionalities are successfully implemented, feedback tracking does not work beyond this stage.
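For reference, a minimal sketch (not part of this PR) of what the `langfuse.openai` drop-in buys you: importing the client classes from `langfuse.openai` instead of `openai` is enough for Langfuse to record each completion call as a generation with token usage. The API version and environment variable names below are assumptions for illustration:

```python
# Minimal sketch of the langfuse.openai drop-in wrapper (illustrative, not this PR's code).
# Assumes LANGFUSE_* and AZURE_OPENAI_* environment variables are set.
import asyncio
import os

from langfuse.openai import AsyncAzureOpenAI  # drop-in for openai.AsyncAzureOpenAI


async def main() -> None:
    client = AsyncAzureOpenAI(
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        api_version="2024-06-01",  # assumed version; use whatever the deployment supports
    )
    # The wrapper records this call as a Langfuse generation, including
    # model name, latency, and prompt/completion token counts.
    response = await client.chat.completions.create(
        model=os.environ["AZURE_OPENAI_CHATGPT_DEPLOYMENT"],
        messages=[{"role": "user", "content": "ping"}],
    )
    print(response.choices[0].message.content)


asyncio.run(main())
```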

from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
from opentelemetry.instrumentation.httpx import (
HTTPXClientInstrumentor,
)
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
from quart import (
Blueprint,
@@ -94,6 +98,13 @@
from prepdocslib.filestrategy import UploadUserFileStrategy
from prepdocslib.listfilestrategy import File

# Initialize Langfuse client
langfuse = Langfuse(
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

bp = Blueprint("routes", __name__, static_folder="static")
# Fix Windows registry issue with mimetypes
mimetypes.add_type("application/javascript", ".js")
@@ -169,13 +180,26 @@ async def content_file(path: str, auth_claims: Dict[str, Any]):

@bp.route("/ask", methods=["POST"])
@authenticated
@observe()
async def ask(auth_claims: Dict[str, Any]):
if not request.is_json:
return jsonify({"error": "request must be json"}), 415

request_json = await request.get_json()
context = request_json.get("context", {})
context["auth_claims"] = auth_claims

try:
# Create a trace for this ask request
trace = langfuse.trace(
metadata={"endpoint": "/ask", "method": "POST", "entra_oid": auth_claims.get("oid", "anonymous")}
)

# Create a generation
generation = trace.generation(
name="ask_response", metadata={"messages": request_json.get("messages", []), "context": context}
)

use_gpt4v = context.get("overrides", {}).get("use_gpt4v", False)
approach: Approach
if use_gpt4v and CONFIG_ASK_VISION_APPROACH in current_app.config:
@@ -185,8 +209,14 @@ async def ask(auth_claims: Dict[str, Any]):
r = await approach.run(
request_json["messages"], context=context, session_state=request_json.get("session_state")
)

# End the generation after getting response
generation.end(metadata={"model": approach.__class__.__name__, "use_gpt4v": use_gpt4v})

return jsonify(r)
except Exception as error:
if "generation" in locals():
generation.end(error=str(error))
return error_response(error, "/ask")


@@ -208,6 +238,7 @@ async def format_as_ndjson(r: AsyncGenerator[dict, None]) -> AsyncGenerator[str,

@bp.route("/chat", methods=["POST"])
@authenticated
@observe()
async def chat(auth_claims: Dict[str, Any]):
if not request.is_json:
return jsonify({"error": "request must be json"}), 415
@@ -242,39 +273,80 @@ async def chat(auth_claims: Dict[str, Any]):

@bp.route("/chat/stream", methods=["POST"])
@authenticated
@observe()
async def chat_stream(auth_claims: Dict[str, Any]):
if not request.is_json:
return jsonify({"error": "request must be json"}), 415

request_json = await request.get_json()
context = request_json.get("context", {})
context["auth_claims"] = auth_claims

try:
use_gpt4v = context.get("overrides", {}).get("use_gpt4v", False)
approach: Approach
if use_gpt4v and CONFIG_CHAT_VISION_APPROACH in current_app.config:
approach = cast(Approach, current_app.config[CONFIG_CHAT_VISION_APPROACH])
else:
approach = cast(Approach, current_app.config[CONFIG_CHAT_APPROACH])
# Create trace_id at the start
trace_id = create_session_id(True, True)

# If session state is provided, persist it;
# otherwise create a new session_id depending on the chat history options enabled.
session_state = request_json.get("session_state")
if session_state is None:
session_state = create_session_id(
current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED],
current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED],
)
result = await approach.run_stream(
request_json["messages"],
context=context,
session_state=session_state,
# Make trace_id available at all levels
context["trace_id"] = trace_id
if "overrides" not in context:
context["overrides"] = {}
context["overrides"]["trace_id"] = trace_id

current_app.logger.info(f"Generated trace_id: {trace_id}")

# Create a new trace with unique ID and generation at the start
trace = langfuse.trace(
id=trace_id,
metadata={"endpoint": "/chat/stream", "method": "POST", "entra_oid": auth_claims.get("oid", "anonymous")},
)

# Create generation early
generation = trace.generation(
name="chat_stream_response", metadata={"messages": request_json.get("messages", []), "context": context}
)
response = await make_response(format_as_ndjson(result))
response.timeout = None # type: ignore
response.mimetype = "application/json-lines"
return response

current_app.logger.info(f"Generated trace_id: {trace_id}")

try:
use_gpt4v = context.get("overrides", {}).get("use_gpt4v", False)
approach: Approach
if use_gpt4v and CONFIG_CHAT_VISION_APPROACH in current_app.config:
approach = cast(Approach, current_app.config[CONFIG_CHAT_VISION_APPROACH])
else:
approach = cast(Approach, current_app.config[CONFIG_CHAT_APPROACH])

session_state = request_json.get("session_state")
if session_state is None:
session_state = create_session_id(
current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED],
current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED],
)

# Get result first
result = await approach.run_stream(
request_json["messages"],
context=context,
session_state=session_state,
)

# Create response after getting result
response = await make_response(format_as_ndjson(result))
response.timeout = None # type: ignore
response.mimetype = "application/json-lines"

# End generation before returning response
generation.end(metadata={"model": approach.__class__.__name__, "use_gpt4v": use_gpt4v, "status": "success"})

return response

except Exception as inner_error:
current_app.logger.error(f"Error in chat processing: {inner_error}")
generation.end(level="ERROR", status_message=str(inner_error), metadata={"status": "error"})
raise inner_error

except Exception as error:
return error_response(error, "/chat")
current_app.logger.error(f"Error in chat_stream: {error}")
return error_response(error, "/chat/stream")


# Send MSAL.js settings to the client UI
@@ -406,6 +478,47 @@ async def list_uploaded(auth_claims: dict[str, Any]):
return jsonify(files), 200


@bp.route("/feedback", methods=["POST"])
@authenticated
async def feedback(auth_claims: Dict[str, Any]):
if not request.is_json:
return jsonify({"error": "request must be json"}), 415

try:
request_json = await request.get_json()
trace_id = request_json.get("trace_id")
value = request_json.get("value") # For thumbs history
score = request_json.get("score", value) # For Langfuse, fallback to value if not provided
feedback_type = request_json.get("type", "thumbs")
comment = request_json.get("comment")

current_app.logger.info(f"Received feedback request: {request_json}")

if not trace_id or (value is None and score is None):
return jsonify({"error": "trace_id and value/score are required"}), 400

# Add LLM provider info to the feedback
llm_provider = os.getenv("OPENAI_HOST", "openai")
# Submit the score without shadowing the numeric `score` value
langfuse.score(
trace_id=trace_id,
name="user_feedback_thumbs",
value=score, # Use score for Langfuse
comment=comment,
metadata={
"llm_provider": llm_provider,
"model": os.getenv("AZURE_OPENAI_CHATGPT_MODEL", "gpt-4") if llm_provider == "azure" else "gpt-4",
"feedback_type": feedback_type,
},
)

current_app.logger.info(f"Successfully submitted feedback to Langfuse for trace {trace_id}")
return jsonify({"message": "Feedback received successfully", "trace_id": trace_id, "value": value}), 200

except Exception as error:
current_app.logger.error(f"Error in feedback endpoint: {error}")
return jsonify({"error": str(error)}), 500


@bp.before_app_serving
async def setup_clients():
# Replace these with your own values, either in environment variables or directly here
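To make the new endpoint's contract concrete, here is a minimal client sketch against it. The localhost port, the placeholder trace id, and the absence of authentication are assumptions for local development; a deployed instance would require the usual auth headers:

```python
# Illustrative sketch of calling the new /feedback endpoint.
# Assumptions: backend running locally on port 50505, auth disabled for dev.
import httpx

payload = {
    "trace_id": "d6f1e6c0-0000-0000-0000-000000000000",  # hypothetical id from a chat response context
    "value": 1,       # thumbs up (the endpoint also accepts a separate "score")
    "type": "thumbs",
    "comment": "Answer cited the right document.",
}

response = httpx.post("http://localhost:50505/feedback", json=payload)
response.raise_for_status()
print(response.json())  # {"message": "Feedback received successfully", ...}
```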
12 changes: 12 additions & 0 deletions app/backend/approaches/chatreadretrieveread.py
@@ -193,6 +193,11 @@ async def run_until_final_call(

data_points = {"text": sources_content}

# Get trace_id from all possible locations
trace_id = (
overrides.get("trace_id") or overrides.get("context", {}).get("trace_id") or auth_claims.get("trace_id")
)

extra_info = {
"data_points": data_points,
"thoughts": [
@@ -231,8 +236,15 @@
),
),
],
"trace_id": trace_id, # Add trace_id
}

# Debug logging (WIP): verify trace_id propagation into extra_info
print(f"trace_id in extra_info: {extra_info['trace_id']}")
print(f"extra_info in run_until_final_call: {extra_info}")

chat_coroutine = self.openai_client.chat.completions.create(
# Azure OpenAI takes the deployment name as the model name
model=self.chatgpt_deployment if self.chatgpt_deployment else self.chatgpt_model,
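The design in this file hinges on one Langfuse convention: scores attach to traces by id, so any id the backend mints, threads through `extra_info`, and returns to the client can be scored later. A minimal sketch of that pairing (illustrative, using the same environment variables as the PR):

```python
# Illustrative sketch: create a trace with an explicit id, score it later.
import os
import uuid

from langfuse import Langfuse

langfuse = Langfuse(
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

# At request time: mint an id and create the trace under it.
trace_id = str(uuid.uuid4())
langfuse.trace(id=trace_id, name="chat_request")

# Later, when the user clicks thumbs up/down, attach a score by the same id.
langfuse.score(trace_id=trace_id, name="user_feedback_thumbs", value=1)
langfuse.flush()  # make sure buffered events are sent before shutdown
```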
28 changes: 27 additions & 1 deletion app/frontend/src/api/models.ts
@@ -1,3 +1,5 @@
import { FeedbackProviderOptions } from "../components/HistoryProviders/IProvider";

export const enum RetrievalMode {
Hybrid = "hybrid",
Vectors = "vectors",
@@ -54,6 +56,7 @@ export type ResponseContext = {
data_points: string[];
followup_questions: string[] | null;
thoughts: Thoughts[];
trace_id: string; // Make this required
};

export type ChatAppResponseOrError = {
@@ -67,7 +70,13 @@
export type ChatAppResponse = {
message: ResponseMessage;
delta: ResponseMessage;
context: ResponseContext;
context: {
data_points: string[];
followup_questions: string[] | null;
thoughts: Thoughts[];
trace_id?: string;
feedback?: number;
};
session_state: any;
};

@@ -123,3 +132,20 @@ export type HistroyApiResponse = {
answers: any;
timestamp: number;
};

export interface AppConfig {
showUserFeedback: boolean;
feedbackProvider: FeedbackProviderOptions;
}

export interface FeedbackRequest {
trace_id: string;
value: number;
type: "thumbs";
}

export type FeedbackResponse = {
trace_id: string;
status: "success" | "error";
message?: string;
};
47 changes: 47 additions & 0 deletions app/frontend/src/components/Answer/Answer.module.css
@@ -119,6 +119,53 @@
width: fit-content;
}

.feedbackMessage {
position: fixed;
bottom: 1.25rem;
right: 1.25rem;
padding: 0.625rem 1.25rem;
background-color: #4caf50;
color: white;
border-radius: 0.25rem;
z-index: 1000;
animation: fadeInOut 3s ease;
}

.feedbackButtonClicked i {
color: #323130 !important; /* Darker color for solid icons */
}

.feedbackButton:hover i {
color: #201f1e !important; /* Darker on hover */
}

.errorMessage {
position: fixed;
bottom: 1.25rem;
right: 1.25rem;
padding: 0.625rem 1.25rem;
background-color: #f44336;
color: white;
border-radius: 0.25rem;
z-index: 1000;
animation: fadeInOut 3s ease;
}

@keyframes fadeInOut {
0% {
opacity: 0;
}
10% {
opacity: 1;
}
90% {
opacity: 1;
}
100% {
opacity: 0;
}
}

@keyframes loading {
0% {
content: "";