13. Design Ad Click Event Aggregation
In this chapter, we explore how to design an ad click event aggregation system at Facebook or Google scale.
Before we dive into technical design, let’s learn about the core concepts of online advertising to better understand this topic. One core benefit of online advertising is its measurability, as quantified by real-time data.
Digital advertising has a core process called Real-Time Bidding (RTB), in which digital advertising inventory is bought and sold.
Data accuracy is also very important. Ad click event aggregation plays a critical role in measuring the effectiveness of online advertising, which essentially impacts how much money advertisers pay. Based on the click aggregation results, campaign managers can control the budget or adjust bidding strategies, such as changing targeted audience groups, keywords, etc. The key metrics used in online advertising, including click-through rate (CTR) [1] and conversion rate (CVR) [2], depend on aggregated ad click data.
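For reference, both metrics are simple ratios over aggregated counts. A minimal sketch in Python (field names are illustrative, not part of any spec):

# CTR and CVR as ratios over aggregated counts (field names are illustrative)
def ctr(clicks: int, impressions: int) -> float:
    """Click-through rate: fraction of ad impressions that resulted in a click."""
    return clicks / impressions if impressions else 0.0

def cvr(conversions: int, clicks: int) -> float:
    """Conversion rate: fraction of ad clicks that resulted in a conversion."""
    return conversions / clicks if clicks else 0.0

print(ctr(100, 10_000), cvr(5, 100))  # 0.01, 0.05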
Step 1 - Understand the Problem and Establish Design Scope
Functional requirements
- Aggregate the number of clicks of ad_id in the last M minutes.
- Return the top 100 most clicked ad_ids every minute.
- Support aggregation filtering by different attributes.
- Dataset volume is at Facebook or Google scale (see the back-of-envelope estimation section below for detailed system scale requirements).
Non-functional requirements
- Correctness of the aggregation result is important, as the data is used for RTB and ads billing.
- Properly handle delayed or duplicate events.
- Robustness. The system should be resilient to partial failures.
- Latency requirement. End-to-end latency should be a few minutes at most.
Back-of-the-envelope estimation
Let’s do an estimation to understand the scale of the system and the potential challenges we will need to address. (A short sketch of this arithmetic follows the list.)
- 1 billion DAU.
- Assume on average each user clicks 1 ad per day. That’s 1 billion ad click events per day.
- Ad click QPS = 10^9 events / 10^5 seconds in a day = 10,000.
- Assume peak ad click QPS is 5 times the average number. Peak QPS = 50,000 QPS.
- Assume a single ad click event occupies 0.1 KB of storage. The daily storage requirement is: 0.1 KB * 1 billion = 100 GB. The monthly storage requirement is about 3 TB.
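As a sanity check, the arithmetic above can be reproduced in a few lines of Python (10^5 seconds per day is the usual estimation rounding of 86,400):

# Back-of-the-envelope numbers for the ad click pipeline
DAU = 1_000_000_000        # 1 billion daily active users
seconds_per_day = 100_000  # ~86,400, rounded to 10^5 for estimation

events_per_day = DAU * 1                          # assume 1 click per user per day
avg_qps = events_per_day / seconds_per_day        # 10,000
peak_qps = 5 * avg_qps                            # 50,000
daily_storage_gb = events_per_day * 0.1 / 1e6     # 0.1 KB per event -> 100 GB
monthly_storage_tb = daily_storage_gb * 30 / 1e3  # ~3 TB

print(avg_qps, peak_qps, daily_storage_gb, monthly_storage_tb)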
Step 2 - Query API Design
In our case, a client is the dashboard user (data scientist, product manager, advertiser, etc.) who runs queries against the aggregation service.
Let’s review the functional requirements so we can better design the APIs:
- Aggregate the number of clicks of ad_id in the last M minutes.
- Return the top N most clicked ad_ids in the last M minutes.
- Support aggregation filtering by different attributes.
We only need two APIs to support those three use cases because filtering (the last requirement) can be supported by adding query parameters to the requests.
API 1: Aggregate the number of clicks of ad_id in the last M minutes.
API | Detail |
---|---|
GET /v1/ads/{:ad_id}/aggregated_count | Return aggregated event count for a given ad_id |
Request parameters are:
Field | Description | Type |
---|---|---|
from | Start minute (default is now minus 1 minute) | long |
to | End minute (default is now) | long |
filter | An identifier for different filtering strategies. For example, filter = 001 filters out non-US clicks | long |
Response:
Field | Description | Type |
---|---|---|
ad_id | The identifier of the ad | string |
count | The aggregated count between the start and end minutes | long |
API 2: Return top N most clicked ad_ids in the last M minutes.
API | Detail |
---|---|
GET /v1/ads/popular_ads | Return top N most clicked ads in the last M minutes |
Request parameters are:
Field | Description | Type |
---|---|---|
count | Top N most clicked ads | integer |
window | The aggregation window size (M) in minutes | integer |
filter | An identifier for different filtering strategies | long |
Response:
Field | Description | Type |
---|---|---|
ad_ids | A list of the most clicked ads | array |
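To make the request/response contracts above concrete, here is a hedged usage sketch with the requests library; the host name and parameter values are placeholders, not part of the spec:

# Hypothetical client calls against the two query APIs (host and values are placeholders)
import requests

BASE = "https://ads.example.com"

# API 1: aggregated click count for ad001 over an explicit minute range
resp = requests.get(
    f"{BASE}/v1/ads/ad001/aggregated_count",
    params={"from": 202101010000, "to": 202101010005, "filter": 12},
)
print(resp.json())  # e.g. {"ad_id": "ad001", "count": 300}

# API 2: top 3 most clicked ads in the last 10 minutes
resp = requests.get(
    f"{BASE}/v1/ads/popular_ads",
    params={"count": 3, "window": 10, "filter": 12},
)
print(resp.json())  # e.g. {"ad_ids": ["ad001", "ad002", "ad003"]}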
Data Model
There are two types of data in the system: raw data and aggregated data.
Raw data
The following shows what the raw data looks like in log files:
[AdClickEvent] ad001, 2021-01-01 00:00:01, user 1, 207.148.22.22, USA
The table below lists the data fields in a structured way. Data is scattered across different application servers.
ad_id | click_timestamp | user | ip | country |
---|---|---|---|---|
ad001 | 2021-01-01 00:00:01 | user1 | 207.148.22.22 | USA |
ad001 | 2021-01-01 00:00:02 | user1 | 207.148.22.22 | USA |
ad002 | 2021-01-01 00:00:02 | user2 | 209.153.56.11 | USA |
Aggregated data
Assume that ad click events are aggregated every minute. Table 8 shows the aggregated result.
ad_id | click_minute | count |
---|---|---|
ad001 | 202101010000 | 5 |
ad001 | 202101010001 | 7 |
To support ad filtering, we add an additional field called filter_id to the table. Records with the same ad_id and click_minute are grouped by filter_id, as shown in Table 9, and filters are defined in Table 10.
Table 9 Aggregated data with filters
ad_id | click_minute | filter_id | count |
---|---|---|---|
ad001 | 202101010000 | 0012 | 2 |
ad001 | 202101010000 | 0023 | 3 |
ad001 | 202101010001 | 0012 | 1 |
ad001 | 202101010001 | 0023 | 6 |
Table 10 Filter table
filter_id | region | IP | user_id |
---|---|---|---|
0012 | US | * | * |
0023 | * | 123.1.2.3 | * |
Table 11 Support top N most clicked ads in the last M minutes
Field | Type | Description |
---|---|---|
window_size | integer | The aggregation window size (M) in minutes |
update_time_minute | timestamp | Last updated timestamp (in 1-minute granularity) |
most_clicked_ads | array | List of ad IDs in JSON format. |
Comparison 对比
Table 12 Raw data vs aggregated data
 | Raw data only | Aggregated data only |
---|---|---|
Pros | Full data set; supports data filtering and recalculation | Smaller data set; fast queries |
Cons | Huge data storage; slow queries | Data loss. This is derived data; for example, 10 entries might be aggregated to 1 entry |
Our recommendation is to store both raw and aggregated data:
- If something goes wrong, we can use the raw data for debugging. If the aggregated data is corrupted due to a bad bug, we can recalculate the aggregated data from the raw data after the bug is fixed.
- Aggregated data should be stored as well. The raw data set is huge, so we run read queries against the aggregated data.
- Raw data serves as backup data. We usually don’t need to query raw data unless recalculation is needed. Old raw data can be moved to cold storage to reduce costs.
- Aggregated data serves as active data. It is tuned for query performance.
Step 3 - High-Level Design
In real-time big data [8] processing, data usually flows into and out of the processing system as unbounded data streams. The aggregation service works in the same way; the input is the raw data (unbounded data streams), and the output is the aggregated results (see Figure 2).
Asynchronous processing
A common solution is to adopt a message queue (Kafka) to decouple producers and consumers. This makes the whole process asynchronous, and producers/consumers can be scaled independently.
Putting everything we have discussed together, we come up with the high-level design as shown in Figure 3. Log watcher, aggregation service, and database are decoupled by two message queues. The database writer polls data from the message queue, transforms the data into the database format, and writes it to the database.
We need the second message queue, like Kafka, to achieve end-to-end exactly-once semantics (atomic commit).
// ad click event data in first message queue
{
"topic": "ad_click_event",
"partition": 0,
"offset": 1,
"key": "user001",
"value": {
"ad_id": "12345",
"click_timestamp": "2023-10-24T12:00:00Z",
"user_id": "user001",
"ip": "192.168.1.1",
"country": "CN"
}
}
// Ad click counts aggregated at per-minute granularity.
{
"topic": "ad_click_aggregated",
"partition": 0,
"offset": 2,
"key": "12345",
"value": {
"click_minute": "2023-10-24T12:00",
"count": 10
}
}
// Top N most clicked ads aggregated at per-minute granularity.
{
"topic": "top_ads_aggregated",
"partition": 0,
"offset": 3,
"key": "2023-10-24T12:00",
"value": {
"update_time_minute": "2023-10-24T12:00",
"most_clicked_ads": ["12345", "67890"]
}
}
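To show how raw events enter the first queue, here is a minimal producer sketch using the kafka-python client; the broker address is an assumption and error handling is omitted:

# Minimal producer sketch for the ad_click_event queue (kafka-python; broker assumed)
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# One raw click event, matching the ad_click_event message format above;
# keying by user_id keeps one user's events in one partition
producer.send(
    "ad_click_event",
    key="user001",
    value={
        "ad_id": "12345",
        "click_timestamp": "2023-10-24T12:00:00Z",
        "user_id": "user001",
        "ip": "192.168.1.1",
        "country": "CN",
    },
)
producer.flush()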
Aggregation Service
The MapReduce framework is a good option to aggregate ad click events. The directed acyclic graph (DAG) is a good model for it [9]. The key to the DAG model is to break down the system into small computing units, like the Map/Aggregate/Reduce nodes.
Main use cases
Now that we understand how MapReduce works at the high level, let’s take a look at how it can be utilized to support the main use cases:
- Aggregate the number of clicks of ad_id in the last M minutes.
- Return top N most clicked ad_ids in the last M minutes.
- Data filtering.
Use case 1: aggregate the number of clicks
As shown in the figure, input events are partitioned by ad_id (ad_id % 3) in Map nodes and are then aggregated by Aggregate nodes.
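A toy, in-process sketch of this map/aggregate split (3 partitions; a real deployment runs these as separate nodes):

# Toy sketch of use case 1: partition by ad_id, then count per (ad_id, click_minute)
from collections import Counter, defaultdict

events = [  # (ad_id, click_minute) pairs, made up for illustration
    ("ad001", "202101010000"), ("ad001", "202101010000"),
    ("ad002", "202101010000"), ("ad001", "202101010001"),
]

NUM_PARTITIONS = 3

# Map step: route each event to a partition keyed on ad_id (the ad_id % 3 idea)
partitions = defaultdict(list)
for ad_id, minute in events:
    partitions[hash(ad_id) % NUM_PARTITIONS].append((ad_id, minute))

# Aggregate step: each partition independently counts clicks per (ad_id, minute)
for p, part in sorted(partitions.items()):
    print(p, dict(Counter(part)))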
Use case 2: return top N most clicked ads
- The figure shows a simplified design of getting the top 3 most clicked ads, which can be extended to top N.
- Input events are mapped using ad_id, and each Aggregate node maintains a heap data structure to efficiently get the top 3 ads within the node.
- In the last step, the Reduce node reduces the 9 ads (top 3 from each Aggregate node) to the top 3 most clicked ads every minute. (A sketch of these two steps follows the list.)
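A minimal sketch of the heap-based Aggregate and Reduce steps using heapq; the per-node tallies are made-up inputs:

# Toy sketch of use case 2: local top 3 per Aggregate node, then a final Reduce
import heapq

# Made-up (ad_id -> count) tallies from 3 Aggregate nodes; because events are
# partitioned by ad_id, each ad's full count lives on exactly one node
node_counts = [
    {"ad001": 7, "ad002": 3, "ad003": 9, "ad004": 1},
    {"ad005": 8, "ad006": 2},
    {"ad007": 5, "ad008": 10, "ad009": 6},
]

# Aggregate step: each node keeps only its local top 3 via a heap
local_top3 = [heapq.nlargest(3, counts.items(), key=lambda kv: kv[1])
              for counts in node_counts]

# Reduce step: pick the global top 3 among the (up to) 9 local winners
candidates = [item for top in local_top3 for item in top]
print(heapq.nlargest(3, candidates, key=lambda kv: kv[1]))
# [('ad008', 10), ('ad003', 9), ('ad005', 8)]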
Use case 3: data filtering
To support data filtering like "show me the aggregated click count for ad001 within the USA only", we can pre-define filtering criteria and aggregate based on them. For example, the aggregation results look like this for ad001 and ad002:
ad_id | click_minute | country | count |
---|---|---|---|
ad001 | 202101010001 | USA | 100 |
ad001 | 202101010001 | GPB | 200 |
ad001 | 202101010001 | others | 3000 |
ad002 | 202101010001 | USA | 10 |
ad002 | 202101010001 | GPB | 25 |
ad002 | 202101010001 | others | 12 |
This technique is called the star schema [11], which is widely used in data warehouses. The filtering fields are called dimensions. This approach has the following benefits:
- It is simple to understand and build.
- The current aggregation service can be reused to create more dimensions in the star schema. No additional component is needed.
- Accessing data based on filtering criteria is fast because the result is pre-calculated.
A limitation with this approach is that it creates many more buckets and records, especially when we have a lot of filtering criteria.
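A toy sketch of pre-aggregating along one dimension (country), which is what makes filtered reads cheap, and also why buckets multiply as dimensions are added:

# Toy sketch of use case 3: pre-aggregate counts per (ad_id, click_minute, country)
from collections import Counter

events = [  # (ad_id, click_minute, country), made up for illustration
    ("ad001", "202101010001", "USA"),
    ("ad001", "202101010001", "USA"),
    ("ad001", "202101010001", "GPB"),
    ("ad002", "202101010001", "others"),
]

# One pre-computed bucket per dimension combination; a filtered query such as
# "clicks for ad001 within the USA" becomes a single key lookup
buckets = Counter(events)
print(buckets[("ad001", "202101010001", "USA")])  # 2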
Fact Constellation Schema (aka Galaxy Schema)
Fact Table
The fact table is the core of a data warehouse or data mart. It contains the performance measures, or facts, of a business process (such as sales amount or inventory quantity). It also contains foreign keys to dimension tables, which point to the dimensions associated with the facts (such as time, product, and location).
A fact table typically contains two types of columns (a small sketch follows the list):
- Facts: numeric performance measures, such as total amount or quantity.
- Foreign keys: pointers to the associated dimension tables, allowing data to be aggregated and analyzed across multiple dimensions.
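A minimal illustrative sketch of a fact row with the two column types; all names are hypothetical:

# Hypothetical fact table row: numeric facts plus foreign keys into dimension tables
from dataclasses import dataclass

@dataclass
class AdClickFact:
    # Foreign keys: pointers into dimension tables (time, ad, location)
    time_key: int      # e.g. a dim_time row for 2021-01-01 00:00
    ad_key: int        # e.g. a dim_ad row for ad001
    location_key: int  # e.g. a dim_location row for USA
    # Facts: numeric performance measures
    click_count: int

print(AdClickFact(time_key=202101010000, ad_key=1, location_key=840, click_count=5))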