Introduction
In the previous article, we discussed how to capture fresh data through database logs. At TapData, our engine’s abstraction diverges from that of many other stream processing frameworks. In our system:
- There’s no distinction between batch and streaming data — all of it is treated uniformly as a single object type, referred to as a Record.
- All data originates from a single concept: DataSource.
- All data transformation happens in a unified stage: DataStage.
This design treats all data as a continuous stream, regardless of its source or update frequency. The result is a truly unified framework where batch and streaming are fully integrated—not just in naming, but in computation logic as well.
So here’s the question: What kind of data structure is needed to support this unified model?
Designing a Unified Structure
The first challenge is expressing batch and stream data consistently. We treat existing batch data as if it were freshly inserted streaming data. Since streaming data already includes inserts, updates, and deletes, it can be viewed as a superset of batch data.
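Conceptually, an existing batch row can simply be emitted as an insert-style record. Here is a minimal, hypothetical sketch (the op value "i" and the field values are illustrative; the full structure is introduced below):
{
  "op": "i", // an existing batch row replayed as an insert
  "ts": 1465491461815,
  "after": {
    "_id": 12345,
    "uid": 12345,
    "name": "tapdata"
  },
  "source": {
    "connector": "mongodb",
    "snapshot": true, // read from existing data, not from the change log
    "db": "system",
    "table": "user"
  }
}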
Let’s build the structure from scratch and walk through each element step-by-step.
Here’s an example:
{
  "op": "u", // an update operation
  "ts": 1465491461815, // time of the operation
  "offset": "123456", // position of the operation in the stream
  "before": { // the value before the update
    "_id": 12345,
    "uid": 12345,
    "name": "tapdata"
  },
  "after": { // the value after the update
    "_id": 12345,
    "uid": 12345,
    "name": "tap",
    "nick": "dfs"
  },
  "patch": { // the contents of the update operation
    "$set": {
      "name": "tap",
      "nick": "dfs"
    }
  },
  "key": { // the record's unique identifier; {} if none exists
    "_id": 12345
  },
  "source": { // properties of the data source
    "connector": "mongodb",
    "name": "fulfillment",
    "snapshot": true,
    "db": "system",
    "table": "user"
  }
}
Field-by-Field Explanation
after – The Fresh Value
This represents the new state of the data:
- For inserts: the newly written value.
- For updates: the value after the change.
- For deletes: usually an empty {} (see the delete sketch below).
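For instance, a delete event might look like the following sketch (the op value "d" and the field values are illustrative): before carries the last known state, while after is empty.
{
  "op": "d", // a delete operation
  "ts": 1465491461999,
  "before": {
    "_id": 12345,
    "uid": 12345,
    "name": "tap",
    "nick": "dfs"
  },
  "after": {},
  "key": {
    "_id": 12345
  }
}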
before – The Old Value
This is the state before the operation. It’s not always essential for replication, but it is crucial in stream processing. For example, if you’re computing a running sum based on a field, you only need the delta between new and old values—after - before—rather than recomputing from scratch each time.
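As a minimal sketch (not TapData's actual API; the record layout follows the structure above, and the amount field is hypothetical), a running sum can be maintained incrementally like this:
# Maintain a running sum of the "amount" field using only the delta
# carried by each change record, instead of rescanning the data.
def apply_record(running_sum: float, record: dict) -> float:
    before = record.get("before") or {}
    after = record.get("after") or {}
    old_value = before.get("amount", 0)
    new_value = after.get("amount", 0)
    # insert: old_value is 0; delete: new_value is 0; update: a true delta
    return running_sum + (new_value - old_value)

total = 0.0
for record in [
    {"op": "i", "before": {}, "after": {"amount": 10}},
    {"op": "u", "before": {"amount": 10}, "after": {"amount": 25}},
    {"op": "d", "before": {"amount": 25}, "after": {}},
]:
    total = apply_record(total, record)
print(total)  # 0.0, because the inserted value was later deleted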
op – Operation Type
Marks the kind of operation:
- Insert
- Update
- Delete
While technically derivable from the presence of before/after, storing this flag makes the data easier to interpret.
patch – The Operation Details
Represents what specifically changed (e.g., which fields were set or removed). This can optimize sync logic and enhance readability. It can be inferred from before and after, but recording it explicitly improves clarity.
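As an illustration (a simplified sketch, not TapData's actual implementation), a flat $set/$unset-style patch can be derived from before and after like this:
# Derive a simple $set/$unset style patch from the before/after images.
# Nested documents are compared as whole values for brevity.
def derive_patch(before: dict, after: dict) -> dict:
    set_ops = {k: v for k, v in after.items() if before.get(k) != v}
    unset_ops = {k: "" for k in before if k not in after}
    patch = {}
    if set_ops:
        patch["$set"] = set_ops
    if unset_ops:
        patch["$unset"] = unset_ops
    return patch

before = {"_id": 12345, "uid": 12345, "name": "tapdata"}
after = {"_id": 12345, "uid": 12345, "name": "tap", "nick": "dfs"}
print(derive_patch(before, after))
# {'$set': {'name': 'tap', 'nick': 'dfs'}}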
key – Unique Identifier
Identifies the record being modified. It is usually the primary key; if none is available, a unique index or the full before value can be used as a fallback.
source – Origin Information
Metadata about the data source:
- Connector type
- Source name
- Database and table
- Whether the record came from a snapshot or a change event
Example:
{
"source": {
"connector": "mongodb",
"name": "fulfillment",
"ts": 1558965508000,
"snapshot": false,
"db": "inventory",
"table": "customers"
}
}
Additional Structural Elements
ts – Timestamp
Represents the time the operation occurred (in milliseconds). Useful for data recovery, rewind, and debugging.
offset – Stream Position
A unique marker of where the event occurred in the data stream. More precise than timestamps for event ordering, especially when many operations happen within the same millisecond.
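A small sketch of why this matters (illustrative only; real offsets are source-specific tokens, treated here as plain integers): two events in the same millisecond can still be ordered deterministically.
events = [
    {"ts": 1465491461815, "offset": 123457, "op": "u"},
    {"ts": 1465491461815, "offset": 123456, "op": "i"},
]
# Sorting by timestamp alone cannot break the tie; the offset can.
ordered = sorted(events, key=lambda e: (e["ts"], e["offset"]))
print([e["op"] for e in ordered])  # ['i', 'u']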
type – Data or Schema Event
Indicates whether this record is:
- A data event (type = dml)
- A schema change event (type = ddl)
TapData separates schema changes (DDL) into independent events rather than embedding them in data records. This reduces redundancy and simplifies data processing.
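As a purely illustrative sketch (not TapData's actual wire format; the ddl field name is hypothetical), a schema change might travel through the stream as its own record:
{
  "type": "ddl", // a schema change event; data events use type = dml
  "ts": 1558965600000,
  "ddl": "ALTER TABLE user DROP COLUMN nick", // the captured statement, illustrative
  "source": {
    "connector": "mysql",
    "name": "fulfillment",
    "db": "inventory",
    "table": "user"
  }
}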
Implementation Challenges
Even with a well-defined structure, implementing real-time streaming across heterogeneous databases poses several challenges:
Missing Values
In theory:
- Inserts should contain after
- Updates should contain both before and after
- Deletes should contain before
However, in practice (e.g., MongoDB), change logs are optimized for replication—not for capturing full change history. Tools like Debezium explain this clearly:
In MongoDB’s oplog, update events do not contain the before or after states of the changed document. Consequently, it is not possible for a Debezium connector to provide this information. However, a Debezium connector provides a document’s starting state in create and read events. Downstream consumers of the stream can reconstruct document state by keeping the latest state for each document and comparing the state in a new event with the saved state. Debezium connectors are not able to keep this state.
Since native database logs often lack key contextual information, it can be difficult for downstream consumers of the logs to reconstruct a complete view of the data change. As Debezium clearly states: “It is not possible for a Debezium connector to provide this information.”
Leaving aside the limitations of traditional CDC frameworks, from a stream processing perspective, if the engine is able to retain the previous value during synchronization and emit it along with the update event, then it becomes possible to reconstruct the full before-and-after change record.
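A minimal sketch of that idea (illustrative, not TapData's actual engine code): keep the latest known state per key in a cache and use it to fill in the missing before image when an update arrives.
# Fill in missing "before" images by caching the latest known state per key.
# A real engine would back this with an external store for large datasets.
state_cache: dict = {}

def enrich(record: dict) -> dict:
    key = tuple(sorted(record["key"].items()))
    if record["op"] == "u" and not record.get("before"):
        record["before"] = state_cache.get(key, {})
    if record["op"] in ("i", "u"):
        state_cache[key] = record["after"]  # remember the newest state
    else:
        state_cache.pop(key, None)          # forget deleted records
    return record

events = [
    {"op": "i", "key": {"_id": 1}, "after": {"_id": 1, "name": "tapdata"}},
    {"op": "u", "key": {"_id": 1}, "after": {"_id": 1, "name": "tap"}},
]
for e in events:
    print(enrich(e))
# The update event now carries before = {'_id': 1, 'name': 'tapdata'}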
Inconsistent Data Types
Different systems use different precision or formats. For example:
- Oracle timestamps can carry up to 9 digits of fractional seconds (nanosecond precision)
- MongoDB dates carry only 3 digits (millisecond precision)
If you attempt a real-time join across such sources, mismatched types will prevent any match from succeeding.
That’s why standardizing data types is crucial for downstream processing.
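A minimal sketch of the normalization idea, assuming both sides are converted to epoch milliseconds before the join key is compared (the values shown are illustrative):
from datetime import datetime, timezone

# Normalize heterogeneous timestamp values to epoch milliseconds
# so that join keys from different sources can actually match.
def to_epoch_millis(value) -> int:
    if isinstance(value, datetime):      # a driver-provided datetime object
        return int(value.timestamp() * 1000)
    if isinstance(value, (int, float)):  # assume epoch seconds or epoch millis
        return int(value if value > 1e12 else value * 1000)
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")

oracle_side = datetime(2019, 5, 27, 13, 58, 28, 123456, tzinfo=timezone.utc)  # microsecond precision
mongo_side = 1558965508123                                                     # epoch milliseconds

print(to_epoch_millis(oracle_side) == to_epoch_millis(mongo_side))  # True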
Inconsistent Schema Models
Different databases have radically different structures:
- Namespace Depth: Some (like Elasticsearch) are flat; others (like Oracle) use 3-level hierarchies (database, schema, table). See the sketch below.
- Schema Flexibility: SQL systems are rigid; NoSQL systems like MongoDB or Redis are semi-structured or unstructured.
- Indexing Capabilities: These vary widely—some only support B-tree indexes; others support geospatial, full-text, or graph indexes.
It’s nearly impossible to enforce a universal abstraction—but defining clear compatibility boundaries is achievable.
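A sketch of how namespace depth might be normalized (illustrative only; real connectors carry richer metadata): every source is mapped onto a uniform (database, table) pair.
# Map source namespaces of different depths onto a uniform (database, table) pair.
def normalize_namespace(source: dict) -> tuple:
    connector = source["connector"]
    if connector == "elasticsearch":   # flat: only an index name
        return ("default", source["index"])
    if connector == "oracle":          # schema-qualified table
        return (source["schema"], source["table"])
    return (source.get("db", "default"), source["table"])

print(normalize_namespace({"connector": "elasticsearch", "index": "customers"}))
print(normalize_namespace({"connector": "oracle", "schema": "INVENTORY", "table": "CUSTOMERS"}))
print(normalize_namespace({"connector": "mongodb", "db": "inventory", "table": "customers"}))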
TapData's Solution
To support this unified model, TapData normalizes output across all data sources:
- For systems like MySQL with complete row logs, we directly parse and convert.
- For databases lacking full log info, we combine in-memory and external caching to reconstruct a complete data stream.
- The setup process is streamlined, and data normalization is handled automatically by the platform.
Data Types
TapData abstracts and adapts data types across platforms—ensuring compatibility during both read and write. It also supports extensibility for custom types.
Schema Changes
DDL changes are translated and synchronized across databases. For example, a column deletion in MySQL becomes an $unset operation in MongoDB—maintaining semantic consistency across heterogeneous systems.
Once standardized, data from dozens of different systems becomes a unified, clean stream—queued up and ready for real-time computation.
One Last Question
So what’s next? In the upcoming article, we’ll explore what kinds of operations a real-time engine should perform—and how to do them efficiently.
Stay tuned at https://tapdata.io.


