logo

Crea un chatbot con IA alimentado por tus datos

Implementación de un chatbot con RAG (Retrieval Augmented Generation) usando FastAPI, Redis y OpenAI

6 de septiembre de 2024

En un artículo anterior, "Retrieval Augmented Generation desde cero", exploramos los fundamentos de los sistemas RAG construyendo, desde cero, una aplicación local en Python. Vimos conceptos clave como la carga de documentos, la segmentación de texto (chunking), los vectores embeddings y la búsqueda semántica. La aplicación que construimos era capaz de responder preguntas sobre la película Inception usando su guion como fuente de datos.

Diagrama que muestra la carga de documentos a una base de datos vectorial y las consultas mediante Retrieval-Augmented Generation

Ahora vamos a llevar todo esto al siguiente nivel. En este artículo, vamos a construir un chatbot con IA listo para producción que muestra cómo aplicar estos conceptos en un caso de uso real. Será un chatbot de tendencias tecnológicas basado en los últimos informes de instituciones como McKinsey, Deloitte, el Banco Mundial, el FEM y la OCDE.

Lo implementaremos como una aplicación web full-stack, con un backend en Python (que cubriremos aquí) y un frontend en React (que veremos en el siguiente artículo). Y la gran ventaja de este sistema es que se puede personalizar con tus propias fuentes de datos y adaptar a distintos casos de uso.

Además, usaremos técnicas y tecnologías más avanzadas para construir una aplicación lista para producción. Este es un resumen de lo que cubriremos:

Las técnicas que veremos pueden aplicarse en múltiples dominios, desde asistentes tecnológicos hasta atención al cliente, análisis financiero o cualquier campo donde donde el acceso a información especializada y actualizada sea clave.

Todo el código de la aplicación está disponible en este repositorio de GitHub. Aunque intentaré explicar todo lo posible en este artículo, puedes consultar los detalles más específicos allí. Y si no estás familiarizado con algunos de los conceptos, te recomiendo revisar el artículo anterior que explica RAG desde cero.

Antes de empezar, puedes probar el chatbot aquí:

#Estructura del backend

Antes de profundizar en los detalles, echemos un vistazo a la estructura del backend para tener una visión general de cómo está organizada la aplicación:

backend/

├── app/
│ ├── assistants/
│ │ ├── assistant.py # Clase principal del asistente
│ │ ├── local-assistant.py # Clase del asistente para la aplicación local
│ │ ├── prompts.py # Prompts del asistente
│ │ └── tools.py # Contiene la herramienta QueryKnowledgeBaseTool
│ │
│ ├── utils/
│ │ ├── splitter.py # Segmentación de texto
│ │ └── sse_stream.py # Server-sent events
│ │
│ ├── api.py # Endpoints de FastAPI
│ ├── config.py # Configuración con Pydantic Settings
│ ├── db.py # Base de datos Redis
│ ├── export.py # Exportación de chats a JSON
│ ├── loader.py # Procesamiento de documentos y base de conocimiento
│ ├── main.py # Aplicación principal de FastAPI
│ └── openai.py # Funciones de OpenAI

└── pyproject.toml # Dependencias y configuración del proyecto

#Creación de la base de conocimiento

Lo primero que necesitamos en un chatbot basado en RAG es una base de conocimiento (knowledge base), que contiene la información necesaria para responder a las preguntas de los usuarios. Veamos la función principal que construye la base de conocimiento:

async def load_knowledge_base():
async with get_redis() as rdb:
await setup_db(rdb)
chunks = await process_docs()
await add_chunks_to_vector_db(rdb, chunks)

La función get_redis() abre una conexión asíncrona con Redis (veremos Redis en la siguiente sección). Después configuramos la base de datos, procesamos los documentos (en nuestro caso, los informes tecnológicos en PDF) y almacenamos los fragmentos procesados junto con sus embeddings en la base de datos vectorial.

La función de procesamiento de documentos es similar a lo que hicimos en el artículo anterior RAG desde cero e incluye:

Aquí tienes una versión simplificada de todo este procesamiento:

async def process_docs(docs_dir=settings.DOCS_DIR):
docs = []
pdf_files = [f for f in os.listdir(docs_dir) if f.endswith('.pdf')]
for filename in tqdm(pdf_files):
file_path = os.path.join(docs_dir, filename)
text = extract_text(file_path)
doc_name = os.path.splitext(filename)[0]
docs.append((doc_name, text))

chunks = []
text_splitter = TextSplitter(chunk_size=512, chunk_overlap=150)

for doc_name, doc_text in docs:
doc_id = str(uuid4())[:8]
doc_chunks = text_splitter.split(doc_text)
for chunk_idx, chunk_text in enumerate(doc_chunks):
chunk = {
'chunk_id': f'{doc_id}:{chunk_idx+1:04}',
'text': chunk_text,
'doc_name': doc_name,
'vector': None
}
chunks.append(chunk)

vectors = []
for batch in batchify(chunks, batch_size=64):
batch_vectors = await get_embeddings([chunk['text'] for chunk in batch])
vectors.extend(batch_vectors)

for chunk, vector in zip(chunks, vectors):
chunk['vector'] = vector
return chunks

Si tienes curiosidad sobre la funcionalidad de segmentación en TextSplitter, la vimos en el artículo anterior sobre RAG y también puedes consultar el código en detalle aquí.

Crear los embeddings vectoriales con OpenAI es tan sencillo como:

from openai import AsyncOpenAI

client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

async def get_embeddings(input, model=settings.EMBEDDING_MODEL, dimensions=settings.EMBEDDING_DIMENSIONS):
res = await client.embeddings.create(input=input, model=model, dimensions=dimensions)
return [d.embedding for d in res.data]

#Base de datos Redis

Redis es una base de datos en memoria de alto rendimiento y muy versátil. Aunque es más conocida como sistema de caché, ha evolucionado mucho en los últimos años e incluye potentes extensiones para JSON y búsqueda (incluyendo búsqueda vectorial). También ofrece diferentes opciones de persistencia para garantizar la seguridad de los datos.

Si eres nuevo en Redis, sus guías de inicio rápido están muy bien para ponerte al día. También puedes echar un vistazo a los ejemplos de la librería para Python.

En nuestra aplicación, Redis desempeña un doble papel:

  1. Como base de datos vectorial para búsqueda semántica, permitiéndonos encontrar los fragmentos semánticamente más similares a las preguntas del usuario.
  2. Como base de datos general para el almacenamiento de las conversaciones.

Para poder almacenar vectores y realizar búsquedas vectoriales, necesitamos crear un índice en Redis:

async def create_vector_index(rdb):
schema = (
TextField('$.chunk_id', no_stem=True, as_name='chunk_id'),
TextField('$.text', as_name='text'),
TextField('$.doc_name', as_name='doc_name'),
VectorField(
'$.vector',
'FLAT',
{
'TYPE': 'FLOAT32',
'DIM': settings.EMBEDDING_DIMENSIONS,
'DISTANCE_METRIC': 'COSINE'
},
as_name='vector'
)
)
await rdb.ft('idx:vector').create_index(
fields=schema,
definition=IndexDefinition(prefix=['vector:'], index_type=IndexType.JSON)
)

El índice es de tipo JSON. Además del vector embedding (vector) almacenamos otros metadatos como chunk_id, doc_name (útiles para citar fuentes) y el texto completo del fragmento en text. El prefijo 'vector:' indica que todos los objetos JSON deben almacenarse con claves que empiecen con ese prefijo para ser indexados.

Es importante mencionar que no es necesario indexar todos los campos cuando almacenas datos en Redis. Solo debes indexar aquellos campos que vayas a utilizar en las búsquedas, para que estas sean eficientes. Por ejemplo, el índice que utilizamos para almacenar los chats es mucho más simple, y solo incluye el campo created por si queremos obtener los últimos chats ordenados por fecha:

async def create_chat_index(rdb):
schema = (
NumericField('$.created', as_name='created', sortable=True),
)
await rdb.ft('idx:chat').create_index(
fields=schema,
definition=IndexDefinition(prefix=['chat:'], index_type=IndexType.JSON)
)

Veamos ahora algunas de las funciones clave que interactúan con nuestra base de datos Redis. Esta es la función utilizada en la sección anterior para añadir fragmentos a la base de datos vectorial:

async def add_chunks_to_vector_db(rdb, chunks):
async with rdb.pipeline(transaction=True) as pipe:
for chunk in chunks:
pipe.json().set('vector:' + chunk['chunk_id'], Path.root_path(), chunk)
await pipe.execute()

Esta función realiza la búsqueda semántica usando el algoritmo K-nearest neighbors (KNN) y devuelve los top_k fragmentos más similares:

async def search_vector_db(rdb, query_vector, top_k=settings.VECTOR_SEARCH_TOP_K):
query = (
Query(f'(*)=>[KNN {top_k} @vector $query_vector AS score]')
.sort_by('score')
.return_fields('score', 'chunk_id', 'text', 'doc_name')
.dialect(2)
)
res = await rdb.ft(VECTOR_IDX_NAME).search(query, {
'query_vector': np.array(query_vector, dtype=np.float32).tobytes()
})
return [{
'score': 1 - float(d.score),
'chunk_id': d.chunk_id,
'text': d.text,
'doc_name': d.doc_name
} for d in res.docs]

Observa cómo calculamos la puntuación de similitud como 1 - float(d.score) porque la puntuación que devuelve la búsqueda es la distancia coseno (cuanto menor sea la distancia coseno, mayor será la similitud con la consulta del usuario). Y queremos obtener los top_k fragmentos más similares (los 10 primeros en nuestra aplicación). Ten en cuenta también que Redis requiere que el vector de consulta sea un array de bytes, por eso usamos np.array() y tobytes() de Numpy.

Finalmente, estas son las funciones utilizadas para crear chats, comprobar si existe un chat, añadir nuevos mensajes a un chat y recuperar los últimos n mensajes de un chat concreto (útil para la memoria conversacional):

async def create_chat(rdb, chat_id, created):
chat = {'id': chat_id, 'created': created, 'messages': []}
await rdb.json().set('chat:' + chat_id, Path.root_path(), chat)
return chat

async def chat_exists(rdb, chat_id):
return await rdb.exists('chat:' + chat_id)

async def add_chat_messages(rdb, chat_id, messages):
await rdb.json().arrappend('chat:' + chat_id, '$.messages', *messages)

async def get_chat_messages(rdb, chat_id, last_n=None):
if last_n is None:
messages = await rdb.json().get('chat:' + chat_id, '$.messages[*]')
else:
messages = await rdb.json().get('chat:' + chat_id, f'$.messages[-{last_n}:]')
return [{'role': m['role'], 'content': m['content']} for m in messages] if messages else []

#Endpoints de la API

Para crear el backend usamos FastAPI, un framework moderno y rápido para construir APIs con Python. En nuestra aplicación, hay dos endpoints principales:

  1. Crear una nueva sesión de chat.
  2. Enviar un mensaje y obtener la respuesta del chatbot en una sesión de chat concreta.

Veamos los endpoints en código:

from fastapi import APIRouter, Depends, HTTPException

router = APIRouter()

@router.post('/chats')
async def create_new_chat(rdb = Depends(get_rdb)):
chat_id = str(uuid4())[:8]
created = int(time())
await create_chat(rdb, chat_id, created)
return {'id': chat_id}

@router.post('/chats/{chat_id}')
async def chat(chat_id: str, chat_in: ChatIn):
rdb = get_redis()
if not await chat_exists(rdb, chat_id):
raise HTTPException(status_code=404, detail=f'Chat {chat_id} does not exist')
assistant = RAGAssistant(chat_id=chat_id, rdb=rdb)
sse_stream = assistant.run(message=chat_in.message)
return EventSourceResponse(sse_stream, background=rdb.aclose)

El segundo endpoint chat crea una instancia de la clase RAGAssistant, que incluye la lógica central de nuestro chatbot, como veremos en breve. Llama al método run() con el mensaje del usuario y usa Server-Sent Events (SSE) para enviar las respuestas del asistente al cliente mediante streaming en tiempo real. Para ello, utilizamos EventSourceResponse de la librería sse-starlette.

Fíjate también en que estamos usando el sistema de inyección de dependencias de FastAPI en rdb = Depends(get_rdb) para gestionar las conexiones con Redis. Usando la instrucción yield, abrimos la conexión cuando comienza la petición y la cerramos cuando finaliza:

async def get_rdb():
rdb = get_redis()
try:
yield rdb
finally:
await rdb.aclose()

⚠️Debido a una limitación con las respuestas en streaming de FastAPI después de la versión 0.106, no podemos usar una dependencia con yield para cerrar la conexión Redis en el segundo endpoint, así que lo hacemos usando una tarea en segundo plano en EventSourceResponse.

Con los endpoints definidos, crear una aplicación FastAPI es simple:

from fastapi import FastAPI

app = FastAPI()
app.include_router(router)

#Clase RAG Assistant

La clase RAGAssistant contiene la lógica central de nuestro chatbot RAG. Gestiona las conversaciones de los usuarios, almacena el historial de chat, consulta la base de conocimiento para obtener información relevante y utiliza el LLM con Retrieval Augmented Generation para responder a las preguntas de los usuarios.

La clase se inicializa así:

from openai import pydantic_function_tool

class RAGAssistant:
def __init__(self, chat_id, rdb, history_size=4, max_tool_calls=3):
self.chat_id = chat_id
self.rdb = rdb
self.sse_stream = None
self.main_system_message = {'role': 'system', 'content': MAIN_SYSTEM_PROMPT}
self.rag_system_message = {'role': 'system', 'content': RAG_SYSTEM_PROMPT}
self.tools_schema = [pydantic_function_tool(QueryKnowledgeBaseTool)]
self.history_size = history_size
self.max_tool_calls = max_tool_calls

Un resumen rápido de estos atributos:

Una de las características clave de nuestro chatbot es su capacidad para realizar streaming de las respuestas. Esto se consigue mediante una combinación de programación asíncrona y server-sent events (SSE). Echemos un vistazo al método principal run (utilizado en el endpoint chat):

def run(self, message):
self.sse_stream = SSEStream()
asyncio.create_task(self._handle_conversation_task(message))
return self.sse_stream

Cuando llega un nuevo mensaje del usuario, el asistente crea un objeto SSEStream que se encarga de crear una cola y gestionar el envío de la respuesta. A continuación, crea una tarea asíncrona con asyncio que gestiona toda la lógica de la conversación y devuelve inmediatamente el sse_stream. Esto permite comenzar a transmitir la respuesta al cliente en cuanto se generan los primeros fragmentos de texto, en lugar de esperar a que se complete todo el proceso.

En lugar de hacer await a self._handle_conversation_task(message) en el método run, lo lanzamos como una tarea concurrente. Así, la gestión de la conversación y el streaming de la respuesta pueden ejecutarse simultáneamente.

El método _handle_conversation_task ejecuta la lógica de la conversación, gestiona posibles errores y se asegura de que el sse_stream se cierre al final:

async def _handle_conversation_task(self, message):
try:
await self._run_conversation_step(message)
except Exception as e:
print(f'Error: {str(e)}')
# Other error handling
finally:
await self.sse_stream.close()

La función _run_conversation_step es donde implementamos RAG, realizamos búsquedas en la base de conocimiento y usamos el LLM para generar respuestas al usuario. Es probablemente la parte más importante del chatbot y vamos a analizarla en detalle en la siguiente sección.

#Implementando RAG con herramientas y respuestas estructuradas

En mi última entrada de blog, "Structured Outputs de OpenAI para RAG y extracción de datos", exploré la nueva funcionalidad de Structured Outputs de OpenAI, que permite asegurar que las respuestas del modelo cumplan un esquema JSON concreto. El artículo muestra cómo definir los esquemas usando Pydantic y algunas funciones útiles proporcionadas por el SDK de Python de OpenAI. También incluye algunos ejemplos de cómo usar respuestas estructuradas en aplicaciones RAG, con y sin streaming.

Nuestro chatbot de IA utiliza estas mismas técnicas, así que si quieres profundizar más, te recomiendo leer el artículo.

El proceso RAG se integra en el flujo de la conversación a través del método _run_conversation_step de la clase RAGAssistant:

async def _run_conversation_step(self, message):
user_db_message = {'role': 'user', 'content': message, 'created': int(time())}
chat_messages = await get_chat_messages(self.rdb, self.chat_id, last_n=self.history_size)
chat_messages.append({'role': 'user', 'content': message})
assistant_message = await self._generate_chat_response(
system_message=self.main_system_message,
chat_messages=chat_messages,
tools=self.tools_schema
)
tool_calls = assistant_message.tool_calls

if tool_calls:
chat_messages.append(assistant_message)
assistant_message = await self._handle_tool_calls(tool_calls, chat_messages)

assistant_db_message = {
'role': 'assistant',
'content': assistant_message.content,
'tool_calls': [
{'name': tc.function.name, 'arguments': tc.function.arguments} for tc in tool_calls
],
'created': int(time())
}
await add_chat_messages(self.rdb, self.chat_id, [user_db_message, assistant_db_message])

Un desglose de lo que hace esta función:

  1. Crea un objeto con el mensaje recibido del usuario y una marca temporal para el almacenamiento en la base de datos.
  2. Obtiene los últimos (self.history_size) mensajes del chat de la base de datos para la sesión actual (con ID chat_id) y añade el nuevo mensaje del usuario.
  3. Llama al LLM (GPT-4 de OpenAI) con el mensaje de sistema, las herramientas disponibles y el historial del chat para generar la respuesta del asistente. El LLM decide entonces si necesita utilizar una herramienta para responder adecuadamente a la pregunta del usuario. En nuestro caso solo estamos proporcionando la herramienta QueryKnowledgeBaseTool, que permite realizar búsquedas en la base de conocimiento.
  4. Si la respuesta del asistente contiene llamadas a herramientas, utiliza las herramientas y genera una nueva respuesta del asistente incorporando los resultados de las herramientas. En nuestro chatbot, esto significa responder a las preguntas del usuario con la información relevante obtenida de la base de conocimiento.
  5. Crea un objeto con el mensaje final del asistente y añade tanto el mensaje inicial del usuario como este mensaje del asistente al historial del chat en la base de datos.

Si tienes curiosidad, este es el prompt de sistema que estamos usando:

MAIN_SYSTEM_PROMPT = """
You are a knowledgeable assistant specialized in answering questions about new technology trends, their applications in various sectors and their broader impacts.

You have access to the 'QueryKnowledgeBaseTool,' which includes technology reports from the world's leading institutions.
Use this tool to query the knowledge base and answer user questions.

Do not rely on prior knowledge or make answers up.
Always use the provided 'QueryKnowledgeBaseTool' to ensure your answers are grounded in the most up-to-date and accurate information available.

If a user's question seems unrelated, try to find a relevant technology angle.
Only if the question is completely completely outside the scope of technology, kindly remind the user of your specialization.
"""

El método _generate_chat_response llama al LLM y gestiona el streaming de la respuesta:

async def _generate_chat_response(self, system_message, chat_messages, **kwargs):
messages = [system_message, *chat_messages]
async with chat_stream(messages=messages, **kwargs) as stream:
async for event in stream:
if event.type == 'content.delta':
await self.sse_stream.send(event.delta)

final_completion = await stream.get_final_completion()
assistant_message = final_completion.choices[0].message
return assistant_message

Este método comprueba cada fragmento en el stream de respuesta generado por el LLM. Si es de tipo 'content.delta' (nuevo fragmento de texto), lo envíamos al cliente en tiempo real usando el objeto sse_stream. Una vez finalizada la generación, llamamos al método get_final_completion para obtener la respuesta completa, incluyendo las llamadas a herramientas que el modelo haya decidido utilizar.

La función chat_stream básicamente encapsula la función client.beta.chat.completions.stream() proporcionada por el SDK de Python de OpenAI. Si quieres saber más sobre los detalles del streaming con respuestas estructuradas, consulta esta sección de mi último artículo.

Como mencionamos anteriormente, proporcionamos una única herramienta al asistente para consultar la base de conocimiento. Definimos la herramienta con un modelo de Pydantic:

from pydantic import BaseModel, Field

class QueryKnowledgeBaseTool(BaseModel):
"""Query the knowledge base to answer user questions about new technology trends, their applications and broader impacts."""
query_input: str = Field(description='The natural language query input string. The query input should be clear and standalone.')

async def __call__(self, rdb):
query_vector = await get_embedding(self.query_input)
chunks = await search_vector_db(rdb, query_vector)
formatted_sources = [f'SOURCE: {c['doc_name']}\n"""\n{c['text']}\n"""' for c in chunks]
return f"\n\n---\n\n".join(formatted_sources) + f"\n\n---"

La herramienta QueryKnowledgeBaseTool incluye un método __call__ para hacerla invocable. Una vez instanciada con la consulta a realizar (query_input) en lenguaje natural, la podemos llamar para consultar la base de conocimiento y obtener los fragmentos más relevantes para esa consulta. En concreto, la herramienta hace lo siguiente:

  1. Convierte la consulta en un vector embedding que codifica su significado.
  2. Busca en la base de datos vectorial Redis para recuperar los 10 fragmentos semánticamente más similares.
  3. Devuelve los fragmentos relevantes formateados e incluyendo para cada uno el nombre del documento para poder citar la fuente.

Si queremos que el LLM llame a nuestra herramienta, necesitamos proporcionar un esquema JSON que describa lo que hace la herramienta y qué parámetros se requieren para llamarla, según el formato especificado por OpenAI. La función pydantic_function_tool del SDK de OpenAI (que usamos en la inicialización del asistente) convierte automáticamente cualquier modelo de Pydantic en el esquema JSON requerido:

self.tools_schema = [pydantic_function_tool(QueryKnowledgeBaseTool)]

Finalmente, veamos qué hace el método _handle_tool_calls:

async def _handle_tool_calls(self, tool_calls, chat_messages):
for tool_call in tool_calls[:self.max_tool_calls]:
kb_tool = tool_call.function.parsed_arguments
kb_result = await kb_tool(self.rdb)
chat_messages.append(
{'role': 'tool', 'tool_call_id': tool_call.id, 'content': kb_result}
)
return await self._generate_chat_response(
system_message=self.rag_system_message,
chat_messages=chat_messages,
)

Un detalle clave aquí es que, como estamos usando la funcionalidad de respuestas estructuradas y usando Pydantic para definir la herramienta, la librería de OpenAI automáticamente convierte cualquier llamada a herramientas en instancias de nuestro modelo QueryKnowledgeBaseTool. Esto ocurre gracias a la función client.beta.chat.completions.stream() mencionada anteriormente.

Podemos encontrar esta instancia de QueryKnowledgeBaseTool, con el parámetro query_input incluido, en tool_call.function.parsed_arguments. Y como la hemos hecho "invocable" con __call__, basta con llamarla para realizar la consulta a la base de conocimiento.

A continuación añadimos un nuevo "mensaje de herramienta" al chat que contiene el resultado de la herramienta (los 10 fragmentos relevantes formateados) y llamamos al método _generate_chat_response de nuevo para generar la respuesta final con esta nueva información. El LLM puede entonces usar este resultado de la herramienta como contexto para responder a la pregunta del usuario. Esta es la idea central de la técnica de Retrieval-Augmented Generation.

Observa también que estamos usando un mensaje de sistema diferente para esta nueva generación con el LLM (self.rag_system_message), más centrado en el proceso RAG:

RAG_SYSTEM_PROMPT = """
You are a knowledgeable assistant specialized in answering questions about new technology trends, their applications in various sectors and their broader impacts.
Use the sources provided by the 'QueryKnowledgeBaseTool' to answer the user's question. You must only use the facts from the sources in your answer.

Make sure to reference and include relevant excerpts from the sources to support your answers.
When providing an answer, mention the specific report from which the information was retrieved (e.g., "According to the [Report Name], ...").
Your answers must be accurate and grounded on truth.

If the information needed to answer a question is not available in the sources, say that you don't have enough information and share any relevant facts you find.
"""

Este enfoque es muy interesante para construir chatbots y aplicaciones basadas en LLM más complejas. En lugar de un único LLM con un único mensaje de sistema y herramientas, puedes combinar varios agentes con diferentes prompts de sistema y distintas herramientas, cada uno especializado en una tarea concreta.

#Streaming de respuestas con Server-Sent Events

Normalmente, es el cliente el que debe enviar una petición para recibir datos del servidor. Sin embargo, en nuestro chatbot de IA, queremos que el servidor envíe datos de forma asíncrona al cliente, enviando cada fragmento de texto a medida que esté disponible. Esto hace que la aplicación sea más rápida y ofrezca una mejor experiencia de usuario.

Un posible enfoque es usar WebSockets, que crean una conexión bidireccional entre el cliente y el servidor. Sin embargo, para nuestro caso de uso, hay un enfoque más simple que funciona igual de bien y puede implementarse directamente usando HTTP: Server-Sent Events (SSE).

Con Server-Sent Events, el servidor puede enviar datos de forma asíncrona al cliente una vez establecida la conexión HTTP, que es justo lo que necesitamos. Para implementarlo en Python con FastAPI, utilizamos la librería sse-starlette.

Implementar SSE en FastAPI es tan simple como proporcionar al EventSourceResponse de sse-starlette un generador asíncrono que será la fuente de los datos a transmitir:

from sse_starlette.sse import EventSourceResponse

async def sse_endpoint():
return EventSourceResponse(generator)

En nuestro chatbot, usamos una clase SSEStream que simplifica el proceso de enviar datos al stream desde distintas partes de la aplicación:

import asyncio
from sse_starlette import ServerSentEvent

class SSEStream:
def __init__(self) -> None:
self._queue = asyncio.Queue()
self._stream_end = object()

def __aiter__(self):
return self

async def __anext__(self):
data = await self._queue.get()
if data is self._stream_end:
raise StopAsyncIteration
return ServerSentEvent(data=data)

async def send(self, data):
await self._queue.put(data)

async def close(self):
await self._queue.put(self._stream_end)

Estos son los puntos clave de la implementación:

Con todo esto en mente, ahora debería estar más claro cómo se integra el streaming en nuestra aplicación. Resumiendo lo que hemos visto en secciones anteriores:

  1. Creamos el stream en el método run de la clase RAGAssistant: self.sse_stream = SSEStream().
  2. Ejecutamos _handle_conversation_task de forma concurrente para procesar la lógica de la conversación al mismo tiempo que hacemos streaming de la respuesta.
  3. Devolvemos el stream al endpoint chat de FastAPI para que empiece inmediatamente el streaming al cliente con return EventSourceResponse(sse_stream).
  4. En el método _generate_chat_response de RAGAssistant, cuando recibimos nuevos fragmentos de la respuesta del LLM, los enviamos al cliente en tiempo real mediante await self.sse_stream.send(event.delta).

#Conclusión

Hemos explorado en detalle los bloques y el código necesario para crear un chatbot con IA alimentado por tus propios datos y listo para producción. En el próximo artículo, construiremos la interfaz de usuario del chatbot en React. Puedes ver una demo del chatbot de tendencias tecnológicas en este enlace.

Si quieres profundizar más, todo el código está disponible en el repositorio de GitHub. En el README encontrarás instrucciones para instalar y ejecutar tanto la aplicación full-stack como una versión local para hacer pruebas desde el terminal. Puedes personalizar fácilmente el sistema con tus propias fuentes de datos y montar chatbots para otros casos de uso.

También es importante señalar que la implementación RAG actual tiene algunas limitaciones. Los casos de uso más avanzados pueden requerir técnicas adicionales como: búsqueda híbrida (combinando búsqueda vectorial y por palabras clave), procesamiento y descomposición de consultas, filtrado con metadatos, re-ranking y RAG recursivo.

Si tienes alguna pregunta, comentario, o te gustaría implementar aplicaciones similares basadas en IA en tu empresa, puedes escribirme a guillermo@codeawake.com.