Highly Scalable Data Ingestion Architecture for ML and Marketing Intelligence
Leveraging AWS Ecosystem and Data Crawling for Scalable and Adaptive Data Pipelines
Today’s article is written by our guest, Rares Istoc, a veteran with over 7 years of experience building scalable software and data engineering systems in the industry.
→ Here is his 🔗 LinkedIn.
Machine learning without data is like a chef without ingredients - all the skills but nothing to cook.
These days, everything revolves around data, from personalized ads to streaming recommendations. Data drives decisions in business, healthcare, and sports. Without it, apps would be clueless, smart devices would be dumb, and predictions would be nothing more than guesses. In this digital age, data is the lifeblood of innovation and efficiency.
Ok, but why another article about data ingestion?
There are many ways to build data ingestion pipelines, and with all the new tools created over the last decade, selecting the best ones can be challenging. The answer often depends on your project’s specific needs.
In this article, you’ll explore an end-to-end solution for marketing intelligence. Using AWS’s ecosystem, you can create a scalable data-ingestion pipeline for data crawling and integrate it into various analytical processes like sales, competitor analysis, market analysis, and customer insights.
I’ll also present the challenges encountered while building this solution. Finding a complete working solution is tough, with most answers scattered across the Internet. You can access the full solution code on 🔗 GitHub.
IMPORTANT NOTE: Before diving into this solution, you must be aware of the legal implications of ingesting data from some data sources, like social media pages, so we can make sure nobody goes to jail. Please read the terms and conditions of each major platform; these will restrict you from crawling user profiles and private pages.
Table of Contents:
Architecture Overview
Implementation
Challenges & Pitfalls
Local Testing
Deployment
1. Architecture Overview
This is what we are about to build:
Here are some non-functional requirements I’ve aimed to achieve with this architecture:
Scalability: The solution should process many pages simultaneously and make it easy to add more pages as the workload grows.
Maintainability & Adaptability: Each component is designed for easy modification and expansion without significant development time.
Components Overview:
• Scheduler: Triggers crawler lambdas for each page link.
• Crawler: Extracts various posts and information from the page link. If unfamiliar with crawling, look it up before proceeding. Details will follow in the implementation part.
• Database: MongoDB is used for our data lake storage, housing posts for later use. It excels at handling semi-structured data.
The complete flow: the scheduler triggers a crawler lambda for each page, sending the page name and link. The crawler extracts posts from the past week, storing the raw content, creation date, link, and name. The scheduler waits for all lambdas to finish, aggregates the posts from the database, and sends them to ChatGPT using prompt templates to generate reports.
2. Implementation
In this section, I’ll provide a detailed overview of the main components, breaking them down with code samples and explanations.
2.1. Scheduler
I won’t focus much on the reporting part, though you can find it here along with all the code shared in this article. The main focus is the scheduling part, the entry point of the system where the flow starts and is orchestrated:
import json
import time
from datetime import datetime, timedelta

import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

from src.constants import PAGE_LINK
from src.db import database
from src.utils import monitor

# generate_profiles_report belongs to the reporting part of the repository;
# its import is omitted in this snippet.

logger = Logger(service="decodingml/scheduler")

_client = boto3.client("lambda")


def lambda_handler(event, context: LambdaContext):
    correlation_ids = []

    # Scatter: trigger one crawler lambda per page link without blocking.
    for link in PAGE_LINK:
        response = _client.invoke(
            FunctionName="lambda",
            InvocationType="Event",
            Payload=json.dumps({"link": link}),
        )
        logger.info(f"Triggered crawler for: {link}")

        correlation_ids.append(response["ResponseMetadata"]["RequestId"])

    logger.info(f"Monitoring: {len(correlation_ids)} crawler processes")

    # Gather: poll until every triggered crawler has finished.
    while True:
        time.sleep(15)
        completed = monitor(correlation_ids)

        correlation_ids = [c for c in correlation_ids if c not in completed]

        if not correlation_ids:
            break

        logger.info(f"Still waiting for {len(correlation_ids)} crawlers to complete")

    # Collect every post stored during the last seven days.
    now = datetime.now()
    posts = list(
        database.profiles.find(
            {
                "date": {"$gte": (now - timedelta(days=7)), "$lte": now},
            }
        )
    )

    logger.info(f"Gathered {len(posts)} posts")

    if not posts:
        logger.info("Cannot generate report, no new posts available")
        return

    reports = generate_profiles_report(posts)

    logger.info("Generated new report!")
The scheduler acts as a scatterer: it iterates over a list of page links and invokes a crawler asynchronously for each one, with the InvocationType parameter set to Event so that the scheduler never blocks on a single page. It stores each lambda’s correlation ID in a list and polls every 15 seconds until all lambdas have finished; adjust the interval to your crawler’s average completion time. Finally, it fetches all posts crawled in the last seven days and passes them to the report generation phase.
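One thing the polling loop above does not handle is a crawler that never reports back, and a Lambda function can run for at most 15 minutes. The following is a minimal sketch of the same wait loop with a deadline. The function name wait_for_crawlers, the timeout parameter, and the injectable sleep and clock arguments are assumptions for illustration and are not part of the original code.

```python
import time
from typing import Callable, Iterable, List


def wait_for_crawlers(
    correlation_ids: Iterable[str],
    monitor_fn: Callable[[List[str]], List[str]],
    poll_interval: float = 15.0,
    timeout: float = 600.0,  # assumed budget, kept below Lambda's 15-minute limit
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> List[str]:
    """Poll until every crawler finishes or the deadline passes.

    Returns the correlation IDs that were still pending at the deadline;
    an empty list means every crawler finished in time.
    """
    pending = list(correlation_ids)
    deadline = clock() + timeout

    while pending and clock() < deadline:
        sleep(poll_interval)
        # monitor_fn plays the role of the article's monitor() helper.
        completed = set(monitor_fn(pending))
        pending = [c for c in pending if c not in completed]

    return pending
```

In the scheduler, this could replace the while True loop: call wait_for_crawlers(correlation_ids, monitor) and log whatever IDs come back as timed out before aggregating the posts.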
2.2. Crawler
Here I’ll break down the actual crawling process:
import abc
import os
from datetime import datetime, timedelta
from itertools import takewhile, dropwhile
from typing import List, Dict, Any
from urllib.parse import urlparse

import instaloader


# Shared interface; other modules import it from src.crawlers.base.
class BaseAbstractCrawler(abc.ABC):

    @abc.abstractmethod
    def extract(self, **kwargs) -> List[Dict[str, Any]]: ...


class InstagramCrawler(BaseAbstractCrawler):

    def __init__(self, link: str, proxy=None):
        self.link = link
        self.loader = instaloader.Instaloader()
        # Crawl window: the last seven days.
        self._until = datetime.now()
        self._since = self._until - timedelta(days=7)
        self._proxy = proxy

    def extract(self, **kwargs) -> List[Dict[str, str | Any]]:
        parsed_url = urlparse(self.link)

        if self._proxy:
            os.environ['https_proxy'] = self._proxy.__dict__().get('http')

        # The profile name is the first path segment of the page link.
        username = parsed_url.path.strip('/').split('/')[0]
        profile = instaloader.Profile.from_username(self.loader.context, username)

        # Posts arrive newest first: skip anything newer than _until,
        # then keep posts until the first one older than _since.
        posts = takewhile(lambda p: p.date > self._since, dropwhile(lambda p: p.date > self._until, profile.get_posts()))

        return [
            {'content': post.caption, 'date': post.date, 'link': self.link}
            for post in posts
        ]
I’ve defined a main abstraction point for all crawlers, establishing a common interface that all derived crawlers must implement. Each subclass must provide its implementation for the extract() method, ensuring reusability and uniformity.
import re

from src.crawlers.base import BaseAbstractCrawler
from src.crawlers.instagram import InstagramCrawler


class CrawlerDispatcher:

    def __init__(self) -> None:
        self._crawlers = {}

    def register(self, domain: str, crawler: type[BaseAbstractCrawler]) -> None:
        # Escape the dot before "com" so it only matches a literal dot.
        self._crawlers[r"https://(www\.)?{}\.com/*".format(re.escape(domain))] = crawler

    def get_crawler(self, url: str) -> BaseAbstractCrawler:
        for pattern, crawler in self._crawlers.items():
            if re.match(pattern, url):
                # The crawler constructor expects the page link.
                return crawler(url)
        # Reached only after every registered pattern has been tried.
        raise ValueError("No crawler found for the provided link")


dispatcher = CrawlerDispatcher()
dispatcher.register('instagram', InstagramCrawler)
To select and call the right crawler automatically, I’ve built a dispatcher that picks and instantiates the correct crawler class based on the provided link. It acts as both a registry and a factory for the crawlers, all managed under a unified interface and structure.
Advantages:
• Flexibility & Scalability: Allows easy addition of new domains and specialized crawlers without modifying the existing codebase.
• Encapsulation & Modularity: The dispatcher encapsulates the logic for determining which crawler to use, making the system modular and allowing each crawler to focus on its core business logic.
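To make the registry idea concrete, here is a small self-contained sketch of registering a second crawler. The two stub classes (InstagramStubCrawler, FacebookStubCrawler) are hypothetical and return canned data, and passing the URL to the constructor is an assumption of this sketch; only the register and get_crawler shape follows the dispatcher shown earlier.

```python
import abc
import re
from typing import Any, Dict, List, Type


class BaseAbstractCrawler(abc.ABC):
    def __init__(self, link: str) -> None:
        self.link = link

    @abc.abstractmethod
    def extract(self, **kwargs) -> List[Dict[str, Any]]: ...


class InstagramStubCrawler(BaseAbstractCrawler):
    # Hypothetical stand-in for a real crawler; returns canned data.
    def extract(self, **kwargs) -> List[Dict[str, Any]]:
        return [{"content": "instagram post", "link": self.link}]


class FacebookStubCrawler(BaseAbstractCrawler):
    # Hypothetical second crawler, added without touching the dispatcher.
    def extract(self, **kwargs) -> List[Dict[str, Any]]:
        return [{"content": "facebook post", "link": self.link}]


class CrawlerDispatcher:
    def __init__(self) -> None:
        self._crawlers: Dict[str, Type[BaseAbstractCrawler]] = {}

    def register(self, domain: str, crawler: Type[BaseAbstractCrawler]) -> None:
        pattern = r"https://(www\.)?{}\.com/*".format(re.escape(domain))
        self._crawlers[pattern] = crawler

    def get_crawler(self, url: str) -> BaseAbstractCrawler:
        for pattern, crawler in self._crawlers.items():
            if re.match(pattern, url):
                return crawler(url)
        raise ValueError("No crawler found for the provided link")


dispatcher = CrawlerDispatcher()
dispatcher.register("instagram", InstagramStubCrawler)
dispatcher.register("facebook", FacebookStubCrawler)

crawler = dispatcher.get_crawler("https://www.instagram.com/mcdonalds")
print(type(crawler).__name__, crawler.extract())
```

Supporting a new domain is then one new subclass plus one register call; the lambda handler that calls get_crawler stays unchanged.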
from datetime import datetime, timedelta

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext

from src.crawlers import dispatcher
from src.db import database

logger = Logger(service="decodingml/crawler")


def lambda_handler(event, context: LambdaContext):
    link = event.get('link')

    logger.info(f"Start extracting posts for {link}")

    crawler = dispatcher.get_crawler(link)

    # Tag every post with this invocation's request ID.
    posts = [{**page, 'correlation_id': context.aws_request_id} for page in crawler.extract()]

    now = datetime.now()
    # Dates of posts already stored for this page during the last week.
    # The crawler stores the page URL under the 'link' key.
    existing_posts = database.profiles.find({
        "date": {"$gte": (now - timedelta(days=7)), "$lte": now},
        "link": link
    }, projection={'date': 1})

    existing_posts = [post.get('date') for post in list(existing_posts)]

    # Keep only posts whose creation date is not in the database yet.
    posts = [post for post in posts if post.get('date') not in existing_posts]

    if not posts:
        logger.info("No new posts on page")
        return

    logger.info(f"Successfully extracted {len(posts)} posts")
    database.profiles.insert_many(posts)
    logger.info("Successfully inserted data in db")
The main entry point reads the link from the event payload, selects the correct crawler, and runs the extraction. After extraction, it checks the database for posts that are already stored to avoid duplicates and inserts only the new ones.
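The duplicate check can be isolated into a small pure function, which makes it easy to test without a database. This is a sketch under the same assumption the handler makes, namely that a post's creation date identifies it within a page; the function name filter_new_posts is hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def filter_new_posts(
    posts: Iterable[Dict[str, Any]],
    existing_dates: Iterable[datetime],
) -> List[Dict[str, Any]]:
    """Return only the posts whose 'date' is not already stored."""
    # A set gives constant-time membership checks instead of scanning a list.
    seen = set(existing_dates)
    return [post for post in posts if post.get("date") not in seen]


stored = [datetime(2024, 5, 1, 10, 0)]
crawled = [
    {"content": "old post", "date": datetime(2024, 5, 1, 10, 0)},
    {"content": "new post", "date": datetime(2024, 5, 3, 9, 30)},
]
print(filter_new_posts(crawled, stored))
```

If two posts on the same page could ever share a timestamp, a different key would be needed; that case is outside what the original handler covers.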
3. Challenges & Pitfalls
3.1. Running a headless browser instance with Selenium in the Lambda runtime environment
This caused the most headaches. The Lambda execution environment is read-only apart from the temporary /tmp directory, which complicates automatic driver installation. You therefore need to install the driver directly in the Docker image and reference it manually in Selenium’s driver options. In my case, the only driver that worked with this setup was Google’s Chrome driver.
FROM public.ecr.aws/lambda/python:3.11 as build

# Download chrome driver and browser and manually unpack them in their folders
RUN yum install -y unzip && \
    curl -Lo "/tmp/chromedriver-linux64.zip" "https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/119.0.6045.105/linux64/chromedriver-linux64.zip" && \
    curl -Lo "/tmp/chrome-linux64.zip" "https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/119.0.6045.105/linux64/chrome-linux64.zip" && \
    unzip /tmp/chromedriver-linux64.zip -d /opt/ && \
    unzip /tmp/chrome-linux64.zip -d /opt/

FROM public.ecr.aws/lambda/python:3.11

# Install the function's OS dependencies using yum
RUN yum install -y \
    atk \
    cups-libs \
    gtk3 \
    libXcomposite \
    alsa-lib \
    libXcursor \
    libXdamage \
    libXext \
    libXi \
    libXrandr \
    libXScrnSaver \
    libXtst \
    pango \
    at-spi2-atk \
    libXt \
    xorg-x11-server-Xvfb \
    xorg-x11-xauth \
    dbus-glib \
    dbus-glib-devel \
    nss \
    mesa-libgbm \
    ffmpeg \
    libxext6 \
    libssl-dev \
    libcurl4-openssl-dev \
    libpq-dev

COPY --from=build /opt/chrome-linux64 /opt/chrome
COPY --from=build /opt/chromedriver-linux64 /opt/

COPY ./pyproject.toml ./poetry.lock ./

# Install Poetry, export dependencies to requirements.txt, and install dependencies
# in the Lambda task directory, finally cleanup manifest files.
RUN python3 -m pip install --upgrade pip && pip install poetry
RUN poetry export -f requirements.txt > requirements.txt && \
    pip3 install --no-cache-dir -r requirements.txt --target "${LAMBDA_TASK_ROOT}" && \
    rm requirements.txt pyproject.toml poetry.lock

# Copy function code
COPY ./src ${LAMBDA_TASK_ROOT}/src
The main idea in this Dockerfile is that I manually downloaded the Chrome driver and browser and unpacked them in a location where Selenium can access them, something Selenium would normally handle automatically.
This is a mandatory step for the Lambda environment. Since everything is read-only, in the next code sample I’ll show you how to point Selenium to the correct driver and browser locations:
from tempfile import mkdtemp

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service


# Method of the Selenium-based crawler class.
def init_driver(self):
    options = Options()
    # Set the browser binary location manually
    options.binary_location = '/opt/chrome/chrome'
    # Run browser in headless mode
    options.add_argument('--headless=new')
    options.add_argument('--no-sandbox')
    options.add_argument('--single-process')
    options.add_argument('--window-size=1420,1080')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-popup-blocking')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-dev-tools')
    options.add_argument('--log-level=3')
    options.add_argument('--ignore-certificate-errors')
    options.add_argument("--no-zygote")
    # Point Chrome's working folders to fresh temporary directories
    options.add_argument(f"--user-data-dir={mkdtemp()}")
    options.add_argument(f"--data-path={mkdtemp()}")
    options.add_argument(f"--disk-cache-dir={mkdtemp()}")
    options.add_argument('--remote-debugging-port=9222')

    # Use the driver binary that was unpacked into the Docker image
    self._driver = webdriver.Chrome(
        service=Service("/opt/chromedriver"),
        options=options,
    )
I hardcoded the driver and browser locations to match the paths set up in the Dockerfile. Additionally, I pointed several folders (e.g., user-data-dir, disk-cache-dir) to temporary directories to prevent Selenium from creating them automatically, which would cause errors due to Lambda’s disk limitations.
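The temporary-directory trick can be sketched on its own. The helper below, chrome_scratch_args, is a hypothetical name; it builds the three folder flags from fresh directories created with tempfile.mkdtemp, which by default places them in the system temp location (/tmp on Lambda, the writable path).

```python
import tempfile
from typing import List, Optional


def chrome_scratch_args(base_dir: Optional[str] = None) -> List[str]:
    """Build Chrome flags that point its working folders at fresh temp dirs.

    base_dir is optional; when omitted, mkdtemp uses the default temp location.
    """
    flags = ("user-data-dir", "data-path", "disk-cache-dir")
    # mkdtemp creates each directory immediately and returns its absolute path.
    return [f"--{flag}={tempfile.mkdtemp(dir=base_dir)}" for flag in flags]


for arg in chrome_scratch_args():
    print(arg)
```

Inside init_driver, each returned string could be passed to options.add_argument in place of the three hand-written lines.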
3.2. Aggregate Empty Pages
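My initial monitoring algorithm was basic: it looped over the lambda invocation correlation IDs and checked the database for generated posts. However, it ran into an infinite loop whenever a page had no new posts, because nothing was ever written for that correlation ID. The version I ended up with checks the crawler’s CloudWatch logs instead: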
import datetime
import re
from typing import List

import boto3

_client = boto3.client('logs')


def monitor(correlation_ids: List[str]):
    finished = []
    # Only look at log events from the last 24 hours (epoch milliseconds).
    start_time = int((datetime.datetime.now() - datetime.timedelta(days=1)).timestamp() * 1000)

    response = _client.filter_log_events(
        logGroupName='/aws/lambda/crawler',
        startTime=start_time,
        filterPattern="REPORT RequestId"
    )

    for event in response['events']:
        # Lambda writes one REPORT line at the end of every invocation.
        match = re.search(r'REPORT RequestId: ([^\s]+)', event.get('message'))
        if match:
            correlation_id = match.group(1)
            if correlation_id in correlation_ids:
                finished.append(correlation_id)

    return finished
Here, I search the crawler’s log streams from the last day for the message that Lambda writes at the end of each invocation, which has this format: REPORT RequestId: <correlation_id>. Its presence indicates that the lambda has reached the end of its execution, so I can mark that correlation ID as finished.
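The matching step can be tried out without AWS access by running it over a few log lines. The sketch below uses made-up request IDs and sample messages shaped like Lambda's START and REPORT lines; the function name finished_request_ids is hypothetical.

```python
import re
from typing import Iterable, List

_REPORT_PATTERN = re.compile(r"REPORT RequestId: (\S+)")


def finished_request_ids(messages: Iterable[str], tracked: Iterable[str]) -> List[str]:
    """Return the tracked request IDs that have a REPORT line in the messages."""
    tracked_set = set(tracked)
    finished: List[str] = []
    for message in messages:
        match = _REPORT_PATTERN.search(message)
        if not match:
            continue
        request_id = match.group(1)
        # Ignore invocations we did not trigger and avoid duplicates.
        if request_id in tracked_set and request_id not in finished:
            finished.append(request_id)
    return finished


# Sample log lines with made-up request IDs.
sample_messages = [
    "START RequestId: aaa-111 Version: $LATEST",
    "REPORT RequestId: aaa-111\tDuration: 1200.00 ms\tBilled Duration: 1200 ms",
    "START RequestId: bbb-222 Version: $LATEST",
    "REPORT RequestId: zzz-999\tDuration: 80.00 ms\tBilled Duration: 80 ms",
]
print(finished_request_ids(sample_messages, ["aaa-111", "bbb-222"]))
```

Because a REPORT line is written whether or not the crawler found posts, this check terminates even for empty pages.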
3.3. Avoid being blocked by social media platforms
This was a petty error, the kind you could spend days on, and the solution was to look at it from a different perspective. Popular social media platforms implement many anti-bot protection mechanisms to prevent crawling, from request header analysis to rate limiting to IP blocking.
We run our browser in headless mode to mimic realistic user-browser interaction, but all our crawlers send requests from the same IP address to multiple pages at the same time, repeatedly. This screams: please block me.
To address this, I’ve used a proxy to mask my IP address and location:
import os
from typing import Optional


class ProxyConnection:

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify_ssl: bool = False
    ):
        # Fall back to environment variables when no value is passed in.
        self.host = host or os.getenv('PROXY_HOST')
        self.port = port or os.getenv('PROXY_PORT')
        self.username = username or os.getenv('PROXY_USERNAME')
        self.password = password or os.getenv('PROXY_PASSWORD')
        self.verify_ssl = verify_ssl
        self._url = f"{self.username}:{self.password}@{self.host}:{self.port}"

    def __dict__(self):
        # Proxy settings keyed by scheme.
        return {
            'https': 'https://{}'.format(self._url.replace(" ", "")),
            'http': 'http://{}'.format(self._url.replace(" ", "")),
            'no_proxy': 'localhost, 127.0.0.1',
            'verify_ssl': self.verify_ssl
        }
Paid proxies like SmartProxy offer a pool of rotating IPs and assign a different IP to each crawler, which mimics regular user behavior. Additionally, a proxy lets you pick a country without access restrictions on public pages, ensuring smooth crawling.
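One detail worth guarding against when assembling the proxy URL is a username or password containing characters such as @ or :, which would make the URL ambiguous. The sketch below percent-encodes the credentials with urllib.parse.quote. The function name build_proxy_url and the host proxy.example.com are placeholders, not part of the original code or of any provider's API.

```python
from urllib.parse import quote


def build_proxy_url(
    host: str,
    port: int,
    username: str,
    password: str,
    scheme: str = "http",
) -> str:
    """Assemble a proxy URL, percent-encoding the credentials."""
    # safe="" makes quote() escape every reserved character, including "/" and ":".
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"{scheme}://{user}:{secret}@{host}:{port}"


# Placeholder host and credentials, for illustration only.
print(build_proxy_url("proxy.example.com", 8080, "user", "p@ss:w/rd"))
```

The resulting string can be used wherever the ProxyConnection values are consumed, for example as the https_proxy environment variable set by the crawler.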
4. Local Testing
To prove this works, I wrote a makefile containing some simple commands for the crawler and the scheduler. The catch is that I’ve only managed to test the crawler locally: since the scheduler spins up crawlers, they have to be deployed on AWS already.
local-test-crawler: # Send test command on local to test the lambda
	curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" \
		-d '{"link": "https://www.instagram.com/mcdonalds"}'

local-test-scheduler: # Send test command on local to test the lambda
	curl -X POST "http://localhost:9000/2015-03-31/functions/function/invocations" -d '{}'
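The same request can be sent from Python, which is handy inside a test script. This is a minimal sketch using only the standard library; the helper names build_invocation and invoke_local are hypothetical, and the endpoint is the one from the makefile above. It assumes the function's container is already running locally on port 9000.

```python
import json
import urllib.request

LOCAL_ENDPOINT = "http://localhost:9000/2015-03-31/functions/function/invocations"


def build_invocation(payload: dict, endpoint: str = LOCAL_ENDPOINT) -> urllib.request.Request:
    """Build the POST request that carries the event payload."""
    return urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def invoke_local(payload: dict, timeout: float = 60.0) -> str:
    """Send the event to the locally running container and return the raw response."""
    with urllib.request.urlopen(build_invocation(payload), timeout=timeout) as response:
        return response.read().decode("utf-8")


# Building the request needs no running container; invoke_local() does.
request = build_invocation({"link": "https://www.instagram.com/mcdonalds"})
print(request.get_method(), request.full_url)
```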
Now, most people, when testing lambda functions locally, use the AWS Lambda RIE (Runtime Interface Emulator), which lets you test your lambda function packages in a container by emulating the Lambda execution environment on your machine. As you can see, I’ve managed to do this without installing the emulator separately, since the AWS base image already ships with it, which slightly simplified my environment.
You can use these commands to test each component. For example, if you would like to test the crawler, go into your terminal and use this command:
> make local-test-crawler
As you can see, the crawling process has started, and for this page, we’ve found three new posts in the last seven days:
5. Deployment
The deployment process is defined in our GitHub repository under the ops folder, where you can explore the whole solution written in Pulumi.
You can play with the Makefile. It contains all the necessary commands to get your infrastructure up and running.
Conclusion
In this article, we’ve explored a complete, robust, end-to-end solution for building a highly scalable data ingestion pipeline that can leverage existing data from multiple crawlable sources for various processes like ML training, data analysis, etc.
We’ve gone through specific challenges you might face and how to overcome them in this process.
🔗 Check out the code on GitHub [1] and support us with a ⭐️
Images
If not otherwise stated, all images are created by the author.