How to build a real-time News Search Engine using Vector DBs
A hands-on guide to implementing a live news aggregating streaming pipeline with Apache Kafka, Bytewax, and Upstash Vector Database.
In this mini-course, you’ll learn how to build a news search engine using Kafka, Bytewax, Vector DBs, Streamlit, and Pydantic - all in Python.
Before diving in, note that everything in this article has full code support on the Decoding ML Articles GitHub Repository.
Table of Contents
1. Prerequisites
1.1 Creating a new Upstash Kafka Cluster
1.2 Creating a new Upstash Vector Index
1.3 Registering to 2 live News APIs
2. Data Gathering
3. The Ingestion Pipeline
3.1 Consume messages from Kafka
3.2 Implement a Bytewax dataflow
3.3 Refine, Format, Chunk, and Embed Articles
3.4 Compose Vectors and upsert to VectorDB
4. User Interface
5. Starting Pipelines
6. Conclusion
1. Prerequisites
Before implementing anything, we have to make sure we can access each service, meaning we’ll have to set up the following:
A new Upstash Kafka Cluster
A new Upstash Vector Index
Registering to News APIs
First, we’ll have to register to Upstash, and after logging in, a dashboard like this will appear:
1.1 Creating a new Upstash Kafka Cluster
From the top bar, select Kafka and create a new cluster using the + Create Cluster button.
Once you’re done, you’ll see this view:
The next step after creating a cluster is to define a topic to which we can produce (send) and consume (get) messages.
Under the Topics tab, select Create Topic and you’ll see this view:
Now copy the endpoint, username, and password, and paste them into your .env file.
After that, go to Topics and copy your Kafka topic name.
This is how your .env file should look so far:
UPSTASH_KAFKA_UNAME="[USERNAME HERE]"
UPSTASH_KAFKA_PASS="[PASSWORD HERE]"
UPSTASH_KAFKA_ENDPOINT="[ENDPOINT HERE]"
UPSTASH_KAFKA_TOPIC="[TOPIC NAME HERE]"
1.2 Creating a new Upstash Vector Index
Now, let’s go ahead and create a new Vector Database. From the top bar, select Vector and then + Create Index. You’ll see this view:
Under the Set up by a model field, select sentence-transformers/all-MiniLM-L6-v2, since that’s the model we’ll use to generate embeddings for our news articles, and choose cosine as the vector-distance metric.
Copy the Index Name, Endpoint, and Token and paste them into your .env file.
This is how your .env file should look so far:
UPSTASH_KAFKA_UNAME="[USERNAME HERE]"
UPSTASH_KAFKA_PASS="[PASSWORD HERE]"
UPSTASH_KAFKA_ENDPOINT="[ENDPOINT HERE]"
UPSTASH_KAFKA_TOPIC="[TOPIC NAME HERE]"
UPSTASH_VECTOR_ENDPOINT="[VECTOR ENDPOINT HERE]"
UPSTASH_VECTOR_TOPIC="[VECTOR NAME HERE]"
UPSTASH_VECTOR_KEY="[VECTOR TOKEN HERE]"
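The code later in this article reads these values through a settings object (for example settings.UPSTASH_KAFKA_ENDPOINT). The repo's settings module isn't shown here, so the following is only a minimal standard-library sketch: Settings and load_settings are hypothetical names, and it assumes the .env values have already been loaded into the process environment.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    # Field names mirror the .env keys listed above.
    UPSTASH_KAFKA_UNAME: str
    UPSTASH_KAFKA_PASS: str
    UPSTASH_KAFKA_ENDPOINT: str
    UPSTASH_KAFKA_TOPIC: str
    UPSTASH_VECTOR_ENDPOINT: str
    UPSTASH_VECTOR_TOPIC: str
    UPSTASH_VECTOR_KEY: str


REQUIRED_KEYS = tuple(f.name for f in fields(Settings))


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from a mapping, failing early if any key is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")
    return Settings(**{key: env[key] for key in REQUIRED_KEYS})
```

Failing at startup with the list of missing keys is usually easier to debug than an authentication error from Kafka or the vector index later on.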
1.3 Registering to News APIs
We will be using the following APIs from which to fetch articles:
🔗 NewsAPI - provides a free developer plan where we can call their API 100 times a day.
🔗 NewsData - provides a free plan with 200 credits/day. Each credit equals 10 articles, so we can fetch a total of 2000 articles per day.
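To stay inside these free tiers, a fetch interval can be derived from the daily quota. The sketch below assumes requests are spread evenly over 24 hours and that one NewsData credit corresponds to one request; the function name is illustrative and not part of the project.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def min_seconds_between_calls(calls_per_day: int) -> float:
    """Smallest even spacing between calls that respects a daily quota."""
    if calls_per_day <= 0:
        raise ValueError("calls_per_day must be positive")
    return SECONDS_PER_DAY / calls_per_day


newsapi_interval = min_seconds_between_calls(100)   # 864.0 s, about 14.4 minutes
newsdata_interval = min_seconds_between_calls(200)  # 432.0 s, about 7.2 minutes
newsdata_articles_per_day = 200 * 10                # 2000 articles
```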
The next step is to register on both platforms — don’t worry, it’s as straightforward as possible.
After you’ve registered to NewsAPI, head over to /account, where you’ll see an API_KEY field. Copy it and paste it into your .env file at NEWSAPI_KEY.
After you’ve registered to NewsData, head over to /api-key, copy the API KEY, and paste it into your .env file at NEWSDATAIO_KEY.
Here’s what a payload looks like from both APIs:
2. Data Gathering
Understanding how we produce messages to our Kafka Cluster:
Key considerations from the implementation:
We’re spawning as many KafkaProducerThread instances as there are fetch sources.
We wrap all these threads under the KafkaProducerSwarm class.
We share a single KafkaProducer instance across all threads, which will communicate with our cluster.
We’re following the Singleton Design Pattern [5]: we can scale to N fetching threads while still keeping a single KafkaProducer instance, which should give higher throughput and use fewer resources than creating one producer per thread.
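The threading layout described above can be sketched with the standard library alone. This is not the repo's code: FakeProducer is a stand-in for a real Kafka producer client, the fetch functions are placeholders, the constructor signatures are assumptions, and each thread does a single fetch pass rather than polling on a schedule.

```python
import json
import threading
from typing import Callable, Dict, List, Tuple


class FakeProducer:
    """Stand-in for a Kafka producer; records messages instead of sending them."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.sent: List[Tuple[str, bytes]] = []

    def send(self, topic: str, value: bytes) -> None:
        with self._lock:  # a shared producer must be safe to call from many threads
            self.sent.append((topic, value))


class KafkaProducerThread(threading.Thread):
    """One thread per fetch source; every thread uses the same producer."""

    def __init__(self, producer: FakeProducer, topic: str,
                 fetch: Callable[[], List[Dict]]) -> None:
        super().__init__(daemon=True)
        self._producer = producer
        self._topic = topic
        self._fetch = fetch

    def run(self) -> None:
        for article in self._fetch():
            self._producer.send(self._topic, json.dumps(article).encode("utf-8"))


class KafkaProducerSwarm:
    """Owns the threads and hands each one the single shared producer."""

    def __init__(self, producer: FakeProducer, topic: str,
                 fetch_functions: List[Callable[[], List[Dict]]]) -> None:
        self._threads = [KafkaProducerThread(producer, topic, f) for f in fetch_functions]

    def start(self) -> None:
        for thread in self._threads:
            thread.start()

    def join(self) -> None:
        for thread in self._threads:
            thread.join()


shared_producer = FakeProducer()
swarm = KafkaProducerSwarm(
    shared_producer,
    "news",
    [lambda: [{"title": "a"}], lambda: [{"title": "b"}, {"title": "c"}]],
)
swarm.start()
swarm.join()
```

Sharing one producer only works if the client is safe to call from several threads. kafka-python documents its KafkaProducer as thread safe; for any other client, check its documentation first.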
Throughout the implementations discussed above, we’re using the following Pydantic models:
NewsDataIOModel : wraps and formats a raw payload from the NewsData API.
NewsAPIModel: wraps and formats a raw payload from the NewsAPI API.
CommonDocument: establishes a common format between the different News formats mentioned above.
RefinedDocument: filters the common format by grouping helpful fields under metadata.
ChunkedDocument: chunks the text and ensures lineage between chunk_id and document_id.
EmbeddedDocument: embeds chunks ensuring lineage between chunk_id and document_id.
Pydantic is a powerful library for validating data models.
Here’s an article that covers everything you need to know: Pydantic.
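To make the role of these models concrete, here is a minimal sketch of one source model being validated and converted to a common format. The field selection is an assumption based on NewsAPI's public article payload, the to_common method and the hashed article_id are illustrative, and the repo's real models are likely richer. The code sticks to features available in both Pydantic v1 and v2.

```python
import hashlib
from typing import Any, Dict, Optional

from pydantic import BaseModel, ValidationError


class CommonDocument(BaseModel):
    """Source-independent article format (fields are illustrative)."""
    article_id: str
    title: str
    url: str
    published_at: str
    source_name: str
    content: Optional[str] = None


class NewsAPIModel(BaseModel):
    """Subset of a NewsAPI article payload."""
    title: str
    url: str
    publishedAt: str
    source: Dict[str, Any]
    content: Optional[str] = None

    def to_common(self) -> CommonDocument:
        # Derive a stable id from the URL so re-fetched articles map to the same id.
        article_id = hashlib.sha256(self.url.encode("utf-8")).hexdigest()
        return CommonDocument(
            article_id=article_id,
            title=self.title,
            url=self.url,
            published_at=self.publishedAt,
            source_name=str(self.source.get("name", "unknown")),
            content=self.content,
        )


raw_payload = {
    "title": "Example headline",
    "url": "https://example.com/article",
    "publishedAt": "2024-01-01T00:00:00Z",
    "source": {"id": None, "name": "Example News"},
}
common_doc = NewsAPIModel(**raw_payload).to_common()
```

A payload missing a required field, such as NewsAPIModel(title="only a title"), raises ValidationError, so malformed articles are rejected before they reach Kafka.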
At the root of our project, the Makefile has a run_producers target:
...
run_producers:
	@echo "$(GREEN) [RUNNING] Ingestion Pipeline Kafka Producers $(RESET)"
	@bash -c "poetry run python -m src.producer"
...
This will start the KafkaProducerSwarm and its threads, which fetch articles from the news APIs, format them, and push them to our Kafka Cluster.
From the logs, we can see that both producer threads have sent 5 messages each. To check that the messages have reached the cluster, go to Upstash Console → Kafka Cluster → Messages. You should see a view like this:
3. The Ingestion Pipeline
After we’ve validated that we’ve got messages on our Kafka Topic, we have to implement the “consumer” pipeline, meaning we’ll:
Read messages from our Kafka Topic
Parse, format, chunk, and generate embeddings
Generate Vector objects and upsert them to Upstash Vector Index.
Here, we’re using Bytewax to define a DataFlow that chains these steps in the correct order.
Let’s dive straight into the implementation and explain key concepts!
3.1 Defining the Kafka Source as Input for our Bytewax Flow.
# consumer.py
...
from bytewax.connectors.kafka import KafkaSinkMessage, KafkaSource


def build_kafka_stream_client():
    """
    Build a Kafka stream client to read messages from the Upstash Kafka topic using the Bytewax KafkaSource connector.
    """
    kafka_config = {
        "bootstrap.servers": settings.UPSTASH_KAFKA_ENDPOINT,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": settings.UPSTASH_KAFKA_UNAME,
        "sasl.password": settings.UPSTASH_KAFKA_PASS,
        "auto.offset.reset": "earliest",  # Start reading at the earliest message
    }
    kafka_input = KafkaSource(
        topics=[settings.UPSTASH_KAFKA_TOPIC],
        brokers=[settings.UPSTASH_KAFKA_ENDPOINT],
        add_config=kafka_config,
    )
    logger.info("KafkaSource client created successfully.")
    return kafka_input
...
Key points from this implementation:
build_kafka_stream_client : creates an instance of a KafkaConsumer using the predefined Bytewax KafkaSource connector.
process_message : a callback that will process the message from our Kafka Topic.
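The process_message callback is not shown in the snippet above. As a rough sketch of what such a callback could do, the version below decodes a message's JSON body and returns a list, which fits op.flat_map because a bad message can simply yield nothing. It assumes the incoming object exposes the raw bytes as a value attribute and returns plain dicts; the repo's version presumably builds CommonDocument objects instead.

```python
import json
from types import SimpleNamespace
from typing import Any, Dict, List


def process_message(message: Any) -> List[Dict[str, Any]]:
    """Decode one Kafka message into zero or more article dicts."""
    raw = getattr(message, "value", None)
    if raw is None:
        return []
    try:
        payload = json.loads(raw)  # accepts bytes or str
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return []
    if isinstance(payload, dict):
        return [payload]
    if isinstance(payload, list):
        return [item for item in payload if isinstance(item, dict)]
    return []


# SimpleNamespace stands in for a message object coming from the Kafka source.
good = process_message(SimpleNamespace(key=None, value=b'{"title": "hello"}'))
bad = process_message(SimpleNamespace(key=None, value=b"not json"))
```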
3.2 Defining the Upstash Vector (Index) as the output of our Bytewax flow.
# vector.py
...
from typing import List, Optional

from bytewax.outputs import DynamicSink, StatelessSinkPartition
from upstash_vector import Index, Vector


class UpstashVectorSink(StatelessSinkPartition):
    def __init__(
        self,
        client: Index,
        collection_name: Optional[str] = None,
    ):
        self._client = client
        self._collection_name = collection_name
        self._upsert_batch_size = settings.UPSTASH_VECTOR_UPSERT_BATCH_SIZE

    def write_batch(self, documents: List[EmbeddedDocument]):
        vectors = [
            Vector(id=doc.doc_id, vector=doc.embeddings, metadata=doc.metadata)
            for doc in documents
        ]
        # Batch upsert for efficiency
        for i in range(0, len(vectors), self._upsert_batch_size):
            batch_vectors = vectors[i : i + self._upsert_batch_size]
            try:
                self._client.upsert(vectors=batch_vectors)
            except Exception as e:
                # Log and continue so one failed batch does not stop the flow
                logger.error(f"Caught an exception during batch upsert {e}")
...
Key points from this implementation:
UpstashVectorOutput : implements the Bytewax DynamicSink abstraction, which is designed to route data to various destinations. In our case, it wraps the Upstash Vector Index client connection.
UpstashVectorSink : the StatelessSinkPartition built by our DynamicSink. It handles upserting vectors to our Vector Database.
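The batching logic inside write_batch can be isolated and exercised without a live index. In the sketch below, RecordingClient is a hypothetical stand-in for the index client, and iter_batches and upsert_in_batches are illustrative helper names. Unlike the original, which only logs failures, this variant also returns how many batches failed.

```python
from typing import Any, Iterator, List, Sequence


def iter_batches(items: Sequence[Any], batch_size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


class RecordingClient:
    """Hypothetical stand-in for the index client; it only records upsert calls."""

    def __init__(self) -> None:
        self.calls: List[List[Any]] = []

    def upsert(self, vectors: Sequence[Any]) -> None:
        self.calls.append(list(vectors))


def upsert_in_batches(client: Any, vectors: Sequence[Any], batch_size: int) -> int:
    """Upsert in batches and return the number of batches that failed."""
    failed = 0
    for batch in iter_batches(vectors, batch_size):
        try:
            client.upsert(vectors=batch)
        except Exception:
            failed += 1  # keep going so one bad batch does not block the rest
    return failed


client = RecordingClient()
failed_batches = upsert_in_batches(client, ["v1", "v2", "v3", "v4", "v5"], batch_size=2)
batch_sizes = [len(call) for call in client.calls]  # [2, 2, 1]
```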
3.3 Building the rest of Bytewax Flow
Here’s the full implementation of our DataFlow that streams messages from Upstash Kafka Topic, cleans, refines, chunks, embeds, and upserts vectors to Upstash Vector Index.
# flow.py
...
def build(
    model_cache_dir: Optional[Path] = None,
) -> Dataflow:
    model = TextEmbedder(cache_dir=model_cache_dir)
    dataflow = Dataflow(flow_id="news-to-upstash")
    stream = op.input(
        step_id="kafka_input",
        flow=dataflow,
        source=_build_input(),
    )
    stream = op.flat_map("map_kinp", stream, process_message)
    stream = op.map("refine", stream, RefinedDocument.from_common)
    stream = op.flat_map(
        "chunkenize",
        stream,
        lambda refined_doc: ChunkedDocument.from_refined(refined_doc, model),
    )
    stream = op.map(
        "embed",
        stream,
        lambda chunked_doc: EmbeddedDocument.from_chunked(chunked_doc, model),
    )
    stream = op.output("output", stream, _build_output())
    logger.info("Successfully created bytewax dataflow.")
    logger.info(
        "\tStages: Kafka Input -> Map -> Refine -> Chunkenize -> Embed -> Upsert"
    )
    return dataflow


def _build_input() -> KafkaSource:
    return build_kafka_stream_client()


def _build_output() -> DynamicSink:
    return UpstashVectorOutput()
Key points from this implementation:
A TextEmbedder instance, a singleton wrapper over our embedding model sentence-transformers/all-MiniLM-L6-v2, which we’ll use to compute embeddings from the article’s text.
A stream variable is used to define and control the Bytewax DataFlow.
Various debugging steps across different stages of the DataFlow using Bytewax’s op.inspect operator.
A _build_input() method that wraps the KafkaSource client, for simplicity.
A _build_output() method that wraps the UpstashVector client, for simplicity.
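The chunkenize step relies on ChunkedDocument.from_refined, which is not shown. To illustrate the idea of chunking with lineage, here is a simplified sketch that splits on whitespace with a word overlap and gives every chunk an id while keeping the parent document_id. The word-based splitting, the Chunk dataclass, and the hashing scheme are assumptions made for illustration; the repo's version takes the embedding model as an argument and may well split on tokens instead.

```python
import hashlib
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Chunk:
    document_id: str  # lineage: which article this chunk came from
    chunk_id: str     # unique per chunk
    text: str


def chunk_text(document_id: str, text: str,
               max_words: int = 50, overlap: int = 10) -> List[Chunk]:
    """Split text into overlapping word windows that keep their parent id."""
    if max_words <= 0 or not 0 <= overlap < max_words:
        raise ValueError("need max_words > 0 and 0 <= overlap < max_words")
    words = text.split()
    step = max_words - overlap
    chunks: List[Chunk] = []
    for start in range(0, len(words), step):
        piece = " ".join(words[start : start + max_words])
        digest = hashlib.sha256(f"{document_id}:{start}:{piece}".encode("utf-8"))
        chunks.append(Chunk(document_id, digest.hexdigest(), piece))
        if start + max_words >= len(words):
            break  # the last window already reached the end of the text
    return chunks


sample_text = " ".join(f"w{i}" for i in range(12))
sample_chunks = chunk_text("doc-1", sample_text, max_words=5, overlap=1)
```

With 12 words, a window of 5, and an overlap of 1, this produces three chunks starting at words 0, 4, and 8, all carrying document_id "doc-1".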
4. User Interface
The UI is a basic Streamlit application that has the following functionalities:
A text search bar
A div section that populates cards with the fetched articles from our vector database.
A card contains the following data fields:
Date Published
Similarity Score
Article Image
A SeeMore button that, once clicked, takes you to the original article URL.
Here’s an example:
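As a rough sketch of how search hits could be turned into the cards described above, the helper below maps result objects to plain dictionaries that a Streamlit layout could render. It assumes each hit exposes score and metadata attributes, and that the metadata keys are title, published_at, image_url, and url. Those key names and the to_cards helper are hypothetical; the project's UI code is not shown in this article.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def to_cards(results: Iterable[Any], min_score: float = 0.0) -> List[Dict[str, Any]]:
    """Convert query hits into card dicts, best match first."""
    cards: List[Dict[str, Any]] = []
    for hit in results:
        if hit.score < min_score:
            continue  # drop weak matches
        meta = hit.metadata or {}
        cards.append(
            {
                "title": meta.get("title", "Untitled"),
                "published": meta.get("published_at", "unknown"),
                "score": round(hit.score, 3),
                "image": meta.get("image_url"),
                "url": meta.get("url"),
            }
        )
    return sorted(cards, key=lambda card: card["score"], reverse=True)


# SimpleNamespace objects stand in for results returned by a vector query.
hits = [
    SimpleNamespace(score=0.41, metadata={"title": "B", "url": "https://example.com/b"}),
    SimpleNamespace(score=0.87, metadata={"title": "A", "url": "https://example.com/a"}),
    SimpleNamespace(score=0.05, metadata=None),
]
cards = to_cards(hits, min_score=0.1)
```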
5. Starting Pipelines
Let’s check the key commands from the Makefile attached to the project’s repo:
RED := \033[0;31m
BLUE := \033[0;34m
GREEN := \033[0;32m
YELLOW := \033[0;33m
RESET := \033[0m
ENV_NAME := "py39upstash"

install:
	@echo "$(GREEN) [CONDA] Creating [$(ENV_NAME)] python env $(RESET)"
	conda create --name $(ENV_NAME) python=3.9 -y
	@echo "Activating the environment..."
	@bash -c "source $$(conda info --base)/etc/profile.d/conda.sh && conda activate $(ENV_NAME) \
		&& pip install poetry \
		&& poetry env use $$(which python)"
	@echo "Installing Packages"
	@echo "Changing to pyproject.toml location..."
	@bash -c " PYTHON_KEYRING_BACKEND=keyring.backends.fail.Keyring poetry install"

test:
	@echo "$(GREEN) [TESTING] Running UnitTests $(RESET)"
	@bash -c "poetry run pytest tests/"

run_producers:
	@echo "$(GREEN) [RUNNING] Producers $(RESET)"
	@bash -c "poetry run python -m src.producer"

run_pipeline:
	@echo "$(GREEN) [RUNNING] Bytewax Pipeline $(RESET)"
	@bash -c "RUST_BACKTRACE=1 poetry run python -m bytewax.run src/start:flow"

clean_vdb:
	@echo "$(RED) [CLEANING] Upstash Vector DB $(RESET)"
	@bash -c "poetry run python -m src.helpers clean_vectordb"

run_ui:
	@echo "$(GREEN) [RUNNING] Streamlit UI interface $(RESET)"
	@bash -c "poetry run streamlit run ui.py"
Run these in the following order:
1. make install # prepares and activates the env
2. make run_producers # runs the data gathering pipeline
3. make run_pipeline # runs the bytewax ingestion pipeline
4. make run_ui # starts the UI (at localhost:8080 here; Streamlit's default port is 8501 if none is configured)
6. Conclusion
Congratulations!
Let’s recap what you’ve learned from this mini-course:
How to integrate serverless Kafka and Vector services from Upstash
How to validate data exchange models using Pydantic
When to use the Singleton pattern
How to define a complex Bytewax dataflow
How to chunk, format, and embed payloads and upsert them to Upstash Vector
How to implement a quick and nice-looking Streamlit UI
System design, and how to plan decoupled “services” that don’t rely on each other
This is a shorter version. Check out the full-fledged version of the article on our Medium publication.