Skip to main content

Big Data & Pipeline Overivew

1. 常见的大数据组件与工具

Hadoop

  • Big Data Framework 大数据框架
  • Apache Hadoop is an open-source framework for distributed storage and processing of large sets of data using a cluster of commodity hardware. It's known for its Hadoop Distributed File System (HDFS) and its processing engine (MapReduce). Hadoop enables businesses to quickly gain insights from massive data sets by distributing the data and processing across multiple machines. 一个开源框架,用于使用商用硬件集群分布式存储和处理大量数据。它以其 Hadoop 分布式文件系统 (HDFS) 及其处理引擎 (MapReduce) 而闻名。 Hadoop 使企业能够通过在多台机器上分布数据和处理来快速从海量数据集中获得洞察。

Hive

  • Data Warehousing 数据仓库
  • Apache Hive is a data warehousing project built on top of Hadoop, providing a SQL-like language known as HiveQL. Hive allows professionals who are familiar with SQL to run queries on large-scale data stored in Hadoop HDFS. It facilitates reading, writing, and managing large datasets residing in distributed storage using SQL-like syntax, making the transition from SQL to big data smooth. 一个构建在 Hadoop 之上的数据仓库项目,提供一种类似 SQL 的语言,称为 HiveQL。 Hive 允许熟悉 SQL 的专业人员对 Hadoop HDFS 中存储的大规模数据运行查询。它有助于使用类似 SQL 的语法读取、写入和管理驻留在分布式存储中的大型数据集,从而使从 SQL 到大数据的过渡顺利。

Google Big Query

  • Fully-managed, Serverless Data Warehouse 完全托管的无服务器数据仓库
  • Google BigQuery is an OLAP system similar to Spark and Hive. It supports both stream and batch data analysis. Fully Managed by Google, there's no need to worry about system operations and maintenance. Google BigQuery 是一个OLAP系统,支持流数据和批数据分析,类似于Spark和Hive,但完全由Google托管,无需担心系统和运维。

Read Kafka Data to BigQuery

import com.google.cloud.bigquery.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaToBigQuery {

private static final String BOOTSTRAP_SERVERS = "your_kafka_servers";
private static final String TOPIC = "your_topic_name";
private static final String GROUP_ID = "your_consumer_group";
private static final String DATASET_NAME = "your_dataset_name";
private static final String TABLE_NAME = "your_table_name";

public static void main(String[] args) {

// Kafka setup
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(TOPIC));

// BigQuery setup
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableId tableId = TableId.of(DATASET_NAME, TABLE_NAME);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

records.forEach(record -> {
InsertAllRequest.Builder builder = InsertAllRequest.newBuilder(tableId);

// Convert Kafka record value to a BigQuery row
builder.addRow(ImmutableMap.of(
"column_name_1", record.key(),
"column_name_2", record.value()
));

InsertAllResponse response = bigquery.insertAll(builder.build());

if (response.hasErrors()) {
// Handle errors here...
System.out.println("Error inserting rows to BigQuery");
}
});
}
}
}

Google Dataflow

Google Cloud Dataflow is a fully managed stream and batch data processing service on Google Cloud. Built on Apache Beam SDK, it's a unified model for defining both batch and stream processing tasks. Automatic scaling and no cluster management are among its key features. Google Cloud Dataflow 是 Google Cloud 上的一个完全托管的流和批数据处理服务。基于 Apache Beam SDK,它提供了一个统一的模型,用于定义批处理和流处理任务。自动扩展和无需集群管理是其主要特点。

Demo question: 编写 GCP Dataflow 以从两个订阅(subscription)接收信息。订阅 A 会发送带有文档类型属性 document type: online 的消息。订阅 B 发送的文档类型为 Club。该管道将两个有效载荷写入不同的存储桶,并输出文件名。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.storage.GcsIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class GCPDataflowDemo {

public static void main(String[] args) {
// 创建管道
Pipeline p = Pipeline.create();

// 从订阅 A 接收消息,并根据属性将其写入一个存储桶
p.apply("Read from Sub A", PubsubIO.readStrings().fromSubscription("projects/your_project/subscriptions/subA"))
.apply("Filter Online", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains("document type: online")) {
c.output(c.element());
}
}
}))
.apply("Write to Bucket for Online", GcsIO.write().to("gs://your_bucket_for_online/path"));

// 从订阅 B 接收消息,并根据属性将其写入另一个存储桶
p.apply("Read from Sub B", PubsubIO.readStrings().fromSubscription("projects/your_project/subscriptions/subB"))
.apply("Filter Club", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().contains("document type: Club")) {
c.output(c.element());
}
}
}))
.apply("Write to Bucket for Club", GcsIO.write().to("gs://your_bucket_for_club/path"));

// 运行管道
p.run().waitUntilFinish();
}
}

Ansible

  • Automation Tool 自动化工具
  • Ansible is widely used for automating IT tasks such as configuration management, application deployment, and middleware provisioning. In big data environments, it can automate the deployment and management of clusters. 自动化集群的部署和管理。

Apache Airflow

  • Workflow Management Platform 工作流管理平台
  • Apache Airflow is designed to programmatically author, schedule, and monitor workflows. It allows users to define workflows as Directed Acyclic Graphs (DAGs) to organize and schedule tasks. 旨在以编程方式编写、安排和监控工作流程。它允许用户将工作流程定义为有向无环图 (DAG) 来组织和安排任务。

Apache Beam

  • Data Processing Engine 数据处理引擎
  • Apache Beam provides a unified programming model for defining data processing workflows and supports various data processing engines (e.g., Apache Flink, Apache Spark) and data storage systems. 提供一个用于定义数据处理工作流的编程模型,并支持多种数据处理引擎(如 Apache Flink、Apache Spark 等)和多种数据存储系统。

Apache Spark

  • Big Data Computing Framework 大数据计算框架
  • Apache Spark is a unified analytics engine for large-scale data processing. It provides APIs for batch processing, stream processing, machine learning, and graph computation. 用于大规模数据处理的统一分析引擎。它提供了用于数据批处理、流处理、机器学习和图计算的 API。
  • Batch/Stream Processing Framework 流批一体化处理框架
  • Apache Flink is a framework for processing unbounded and bounded data streams. It supports event-time processing and state management, providing exactly-once processing semantics in case of failures. 一个处理无界和有界数据流的框架。它支持事件时处理和状态管理,在发生故障时提供一次性处理语义。

Apache Storm

  • Real-time Computation System 实时计算框架
  • Apache Storm is a computation system for processing data streams in real-time. It can be used for real-time analytics, online machine learning, continuous computation, and more. 一个用于实时数据流处理的计算系统。它可以用于实时分析、在线机器学习、连续计算等。

2. Case - Logging Collector Pipeline 日志采集器

Step 1. Log Collection 日志收集

  • Log Generation: Our backend Applications generate logs during running.
  • Send to Kafka: We have a service that will publish logs asynchronously to a specific Kafka topic

Step 2. Log Processing 日志处理

Consume from Kafka & Data Processing

We use Flink to consume stream logs from Kafka, process the logs, which might involve cleaning, aggregating, or other transformations.

  • a. Define Kafka Data Source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka server address
properties.setProperty("group.id", "my-consumer-group"); // Consumer group ID

DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties));
  • b. Data Processing Operations 数据处理操作: We perform various data processing operations on the data consumed from Kafka in Flink, such as map, filter, window, etc. 对数据进行各种数据处理操作
DataStream<MyData> processedStream = stream
.map(new MapFunction<String, MyData>() {
@Override
public MyData map(String value) {
// Implement your data transformation logic
return new MyData(value);
}
})
.filter(new FilterFunction<MyData>() {
@Override
public boolean filter(MyData value) {
// Implement your filtering logic
return value != null;
}
});
  • c. Define Sink 定义数据池: We define one or more sinks to write the processed data to the target storage system. 定义一个或多个 Sink,将处理后的数据写入到目标存储系统。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder<MyData> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<MyData>() {
public IndexRequest createIndexRequest(MyData element) {
Map<String, String> json = new HashMap<>();
json.put("data_field", String.valueOf(element.getDataField()));
// ... other field mappings
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}

@Override
public void process(MyData element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);

esSinkBuilder.setBulkFlushMaxActions(1); // trigger a flush for every element
// ... other configurations

myDataStream.addSink(esSinkBuilder.build());

Step 4. Log Storage 数据存储

Choose Storage Solution: Processed log data usually needs to be stored in one or more target systems for further analysis and querying. Possible storage solutions include:

  • Data Warehouse 数据仓库: Like Google BigQuery or Amazon Redshift, for storing and analyzing large-scale structured data.
  • Data Lake 数据湖: Like Amazon S3 or Hadoop HDFS, for storing large-scale semi-structured or unstructured data.
  • Time-Series Database 时序数据库: Like InfluxDB, for storing and querying time-series data.
  • Search and Analysis Engine 搜索与分析引擎: Like Elasticsearch, for storing, searching, and real-time log data analysis.

Step 5. Log indexing & Query in ES 日志索引与查询

  • Configure the config/kibana.yml file
elasticsearch.hosts: ["http://your_elasticsearch_server_address:9200"]
  • Then, we can start Kibana
bin/kibana
  • Set up Index Pattern 设定索引模式
    • 输入你在 Elasticsearch 中用于存储数据的索引的名称或模式
    • Specify the pattern matching your Elasticsearch indices and follow the setup steps.

Case - ETL Data pipeline

Extract-Transform-Load 提取-转换-加载

Case - Real-time data pipeline