Part 4: Can Apache Flink be the solution?
Let's say you decide that Flink is the solution you want to use. I will respect your bold decision and even show you a step-by-step guide to set up and perform JOINs and deduplication before ingesting into ClickHouse. At the end of this post, I will go into the details of the expected maintenance effort and provide a conclusion. I warn you: a Flink implementation can become scary.

Step 1: Connect Flink to Kafka
Decisions to Make:
Kafka Broker & Topic Configuration: Define Kafka brokers, topic names, and partitions.
Data Format: Choose between JSON, Avro, or Protobuf. In the example, we choose JSON.
Consumer Group & Offset Strategy: Decide whether to start from the latest or earliest message. In the example, we decided on the earliest message.
Parallelism: Determine how many Flink tasks will consume from Kafka partitions. In the example, we decided on 4 parallel tasks consuming from Kafka.
Code Example:
1. Add Dependencies (Flink & Kafka Connectors)
In your Flink project, add the necessary dependencies:
xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
2. Create a Kafka Source
java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
public class KafkaFlinkParallelismExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setTopics("user-events")
.setGroupId("flink-consumer-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
.setParallelism(4);
kafkaStream.print();
env.execute("Kafka to Flink with Parallelism");
}
}
About Parallelism
Parallelism in Flink (env.setParallelism(n))
Determines the number of concurrent Flink tasks that will process Kafka data.
Example: env.setParallelism(4) → 4 parallel tasks consuming from Kafka.
If Kafka has 6 partitions and Flink has 4 tasks, some tasks will handle multiple partitions.
Parallelism per Operator (.setParallelism(n))
If not set, the job-wide parallelism is used (env.setParallelism(n)).
Example: kafkaStream.setParallelism(2) → only this Kafka source operator runs with 2 parallel instances (see the sketch below).
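As a small illustration of the difference, here is a sketch that reuses env and kafkaSource from the example above; the map operator is only there to show a per-operator override:
java
// Job-wide default: every operator runs with 4 parallel subtasks unless overridden.
env.setParallelism(4);

DataStream<String> kafkaStream = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); // inherits the default of 4

kafkaStream
    .map(String::toUpperCase) // illustrative transformation
    .setParallelism(2);       // only this map operator runs with 2 parallel instances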
Choosing the Right Parallelism
Flink parallelism = Kafka partitions: Best performance, each task reads from exactly one partition.
Flink parallelism < Kafka partitions: Some tasks handle multiple partitions (could cause lag).
Flink parallelism > Kafka partitions: Some tasks stay idle, wasting resources.
3. Alternative: Set Up the Kafka Source with the Legacy FlinkKafkaConsumer
On older Flink versions you can use the (now deprecated) FlinkKafkaConsumer API instead of KafkaSource:
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
kafkaProps.setProperty("group.id", "flink-consumer-group");
kafkaProps.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"clickhouse-stream",
new SimpleStringSchema(),
kafkaProps
);
DataStream<String> kafkaStream = env.addSource(kafkaSource);
4. Deserialize and Convert Data into a Flink Format
Assuming JSON messages:
java
DataStream<MyEvent> eventStream = kafkaStream
.map(json -> new ObjectMapper().readValue(json, MyEvent.class))
.returns(MyEvent.class);
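The examples here and in Step 4 reference a MyEvent class that is not defined in this post; a minimal sketch of what such a POJO could look like (the field names are assumptions based on how MyEvent is used in the ClickHouse sink later):
java
// Hypothetical event POJO; the fields mirror how MyEvent is used in the ClickHouse sink in Step 4.
public class MyEvent {
    private String eventId;
    private String data;
    private long timestamp;

    public MyEvent() {} // no-arg constructor so Flink and Jackson can instantiate it

    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}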
Step 2: Deduplication in Flink
Decisions to Make:
Define a Unique Key: Choose a field (e.g., event_id) for deduplication.
Time Window Strategy: Define how long to store seen records.
State Management: Use RocksDB or Heap Memory to track seen events.
Code Example:
Implementation of deduplication with a time-window strategy and RocksDB as the state backend:
java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class FlinkDeduplicationWithRocksDB {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // keep deduplication state in RocksDB on disk
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setTopics("user-events")
.setGroupId("flink-deduplication-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env
.fromSource(kafkaSource, WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(10)), "Kafka Source")
.setParallelism(4);
DataStream<String> deduplicatedStream = kafkaStream
.keyBy(event -> extractUserId(event))
.process(new DeduplicationFunction(Duration.ofHours(24)));
deduplicatedStream.print();
env.execute("Kafka Deduplication with RocksDB");
}
private static String extractUserId(String event) {
// Naive field extraction for demo purposes; a real job would parse the JSON properly.
return event.split(",")[0].replace("{\"user_id\":\"", "").replace("\"", "");
}
public static class DeduplicationFunction extends KeyedProcessFunction<String, String, String> {
private transient MapState<String, Long> seenEvents;
private final Duration timeWindow;
public DeduplicationFunction(Duration timeWindow) {
this.timeWindow = timeWindow;
}
@Override
public void open(Configuration parameters) {
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<>("seenEvents", String.class, Long.class);
seenEvents = getRuntimeContext().getMapState(descriptor);
}
@Override
public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
long eventTimestamp = System.currentTimeMillis();
Long lastSeen = seenEvents.get(event);
if (lastSeen == null || eventTimestamp - lastSeen > timeWindow.toMillis()) {
out.collect(event);
seenEvents.put(event, eventTimestamp);
}
}
}
}
Where is the Time Window Strategy Set?
1. Event Time Window (Handling Late Events)
java
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
Handles out-of-order events by allowing a 10s delay.
2. Deduplication Window (24 Hours)
java
new DeduplicationFunction(Duration.ofHours(24))
Keeps track of events for 24 hours in RocksDB.
If the same event arrives within 24 hours, it’s ignored.
Why Use RocksDB?
Scalability: Handles millions of events efficiently.
Persistence: Avoids losing state after failures.
Checkpointing Support: Enables fault tolerance.
Efficient Lookups: Uses RocksDB MapState for fast key lookups.
Key Considerations
Choosing the Deduplication Window:
Short window (e.g., 1 min) → Less state storage, but duplicates might pass.
Long window (e.g., 24 hrs) → More storage, but better deduplication.
State Size & Cleanup:
Too large? Use state TTL settings to expire old entries (see the sketch after this list).
Disk issues? Ensure enough storage for RocksDB.
Scaling Considerations:
Set parallelism to match Kafka partitions (env.setParallelism(n)).
Ensure sufficient disk space for RocksDB if handling millions of events.
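A minimal sketch of such a TTL configuration for the deduplication state above (the cleanup settings are assumptions, not values from the original example); the descriptor setup would go into DeduplicationFunction#open:
java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Expire deduplication entries 24 hours after they were last written,
// so the RocksDB state does not grow without bound.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000) // purge expired entries during RocksDB compaction
    .build();

MapStateDescriptor<String, Long> descriptor =
    new MapStateDescriptor<>("seenEvents", String.class, Long.class);
descriptor.enableTimeToLive(ttlConfig);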
Step 3: Running Joins in Flink
Decisions to Make:
Join Type: Inner, Left, Right, or Outer.
Time Window for Streaming Joins: Define how long to retain data for joins.
State Management: Ensure efficient storage of joined records.
Code Example:
Joining Two Kafka Streams in Flink
java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
class Transaction {
public String userId;
public double amount;
public Transaction(String userId, double amount) {
this.userId = userId;
this.amount = amount;
}
}
class User {
public String userId;
public String name;
public int age;
public User(String userId, String name, int age) {
this.userId = userId;
this.name = name;
this.age = age;
}
}
class EnrichedTransaction {
public String userId;
public double amount;
public String name;
public int age;
public EnrichedTransaction(String userId, double amount, String name, int age) {
this.userId = userId;
this.amount = amount;
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "EnrichedTransaction{" +
"userId='" + userId + '\'' +
", amount=" + amount +
", name='" + name + '\'' +
", age=" + age +
'}';
}
}
public class FlinkKafkaJoinExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
KafkaSource<String> transactionSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setTopics("transactions-topic")
.setGroupId("flink-join-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
KafkaSource<String> userSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setTopics("users-topic")
.setGroupId("flink-join-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> rawTransactions = env.fromSource(transactionSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Transaction Source");
DataStream<String> rawUsers = env.fromSource(userSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "User Source");
DataStream<Transaction> transactions = rawTransactions.map(event -> {
// Naive JSON parsing for demo purposes; a real job would use a JSON library such as Jackson.
String[] parts = event.replace("{", "").replace("}", "").replace("\"", "").split(",");
return new Transaction(parts[0].split(":")[1].trim(), Double.parseDouble(parts[1].split(":")[1].trim()));
});
DataStream<User> users = rawUsers.map(event -> {
String[] parts = event.replace("{", "").replace("}", "").replace("\"", "").split(",");
return new User(parts[0].split(":")[1].trim(), parts[1].split(":")[1].trim(), Integer.parseInt(parts[2].split(":")[1].trim()));
});
KeyedStream<Transaction, String> keyedTransactions = transactions.keyBy(t -> t.userId);
KeyedStream<User, String> keyedUsers = users.keyBy(u -> u.userId);
DataStream<EnrichedTransaction> enrichedTransactions = keyedTransactions
.intervalJoin(keyedUsers)
.between(Time.minutes(-5), Time.minutes(5))
.process(new ProcessJoinFunction<Transaction, User, EnrichedTransaction>() {
@Override
public void processElement(Transaction transaction, User user, Context ctx, Collector<EnrichedTransaction> out) {
out.collect(new EnrichedTransaction(transaction.userId, transaction.amount, user.name, user.age));
}
});
enrichedTransactions.print();
env.execute("Kafka Stream Join Example");
}
}
Example Input & Output
Kafka Transactions Topic (transactions-topic)
Kafka Users Topic (users-topic)
json
{"user_id": "123", "name": "Alice", "age": 28}
{"user_id": "456", "name": "Bob", "age": 35}Flink Output (Enriched Transactions)
Key Considerations
Event Time vs. Processing Time
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) ensures late events are handled correctly.
Choosing the Join Window
between(Time.minutes(-5), Time.minutes(5)): Matches events that occurred within ±5 minutes of each other.
Smaller windows reduce state size but may miss late events.
Scaling Considerations
Parallelism should match Kafka partitions for efficiency.
If data is large, use RocksDB as a state backend.
Summary
This example shows how to join two Kafka streams in Flink:
Transactions stream (purchases) + Users stream (user details).
An interval join on user_id with a ±5-minute time window.
Outputs enriched transactions containing amount, name, and age.
Step 4: Batching and Writing to ClickHouse with a Custom Connector
Decisions to Make:
Batch Size: Optimize batch size to balance latency and throughput.
Insert Strategy: Use ClickHouse’s INSERT INTO ... VALUES or INSERT INTO ... SELECT.
Error Handling: Implement retry mechanisms.
Implementation example:
1. Create a Flink Sink to ClickHouse
java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;
public class ClickHouseSink extends RichSinkFunction<List<MyEvent>> {
private transient Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:clickhouse://clickhouse-server:8123/default");
}
@Override
public void invoke(List<MyEvent> batch, Context context) throws Exception {
String sql = "INSERT INTO events (event_id, data, timestamp) VALUES (?, ?, ?)";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
for (MyEvent event : batch) {
stmt.setString(1, event.getEventId());
stmt.setString(2, event.getData());
stmt.setTimestamp(3, new Timestamp(event.getTimestamp()));
stmt.addBatch();
}
stmt.executeBatch();
}
}
@Override
public void close() throws Exception {
if (connection != null) connection.close();
}
}
2. Use Flink’s windowAll for Efficient Batching
java
deduplicatedStream
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<MyEvent, List<MyEvent>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<MyEvent> events, Collector<List<MyEvent>> out) {
List<MyEvent> batch = new ArrayList<>();
for (MyEvent event : events) batch.add(event);
out.collect(batch);
}
})
.addSink(new ClickHouseSink());
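The Error Handling decision above is not covered by this sink. Below is a minimal sketch of a retry helper that could live inside ClickHouseSink (the retry count, the backoff, and the extra java.sql.SQLException import are assumptions added for illustration); inside invoke, stmt.executeBatch() would then be replaced by executeBatchWithRetry(stmt, 3):
java
// Hypothetical retry wrapper around the JDBC batch insert; tune retries and backoff for your setup.
private void executeBatchWithRetry(PreparedStatement stmt, int maxRetries) throws Exception {
    for (int attempt = 1; ; attempt++) {
        try {
            stmt.executeBatch();
            return; // success, nothing more to do
        } catch (SQLException e) {
            if (attempt >= maxRetries) {
                throw e; // give up and let Flink fail and restart the job from the last checkpoint
            }
            Thread.sleep(1000L * attempt); // simple linear backoff before retrying
        }
    }
}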
Overview of maintenance efforts for Flink
After setting up Flink for stream processing into ClickHouse, you must handle ongoing maintenance to keep the pipeline running smoothly and performing well. Here are the typical areas of maintenance:
Kafka Consumer Management:
Consumer lag (the delay between when a message is written to Kafka and when Flink consumes it) can become an issue. Data can be lost or duplicated, especially when Flink processes messages too slowly. You usually need to monitor the lag closely so you can react early enough.
Flink Job Stability & Failures:
Flink jobs can always fail, and data can get lost or duplicated; you usually see this when the checkpointing settings are configured incorrectly. Checkpointing works by periodically taking snapshots of the job state so that the system can recover from that point. As a user, you need to decide, for example, where the checkpoints are stored (filesystem, database, etc.).
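A minimal sketch of that decision in code, assuming an S3 bucket as durable checkpoint storage (the bucket path and the 60-second interval are illustrative):
java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Take a checkpoint every 60 seconds and keep it in durable storage so a failed job can recover.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints"); // hypothetical bucket
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints if the job is cancelled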
Debugging Challenges for Python Users in Flink:
One major challenge for Python users working with Flink is that debugging often requires working in Java. While Flink provides the PyFlink API, much of the underlying execution framework, including error messages, logs, and internal failures, is Java-based. This means that when a job fails or a checkpoint issue arises, Python users often need to dig into Java stack traces and Flink's internals to troubleshoot the problem. This creates a steep learning curve, requiring users to understand the Java runtime, memory management, and error handling, even if they primarily work in Python. As a result, debugging and maintaining Flink jobs can be complex and time-consuming for teams that rely on Python for data processing.
Deduplication Handling:
It is highly recommended to clean up old deduplication entries so that memory is not exhausted. Event timestamps can also become a maintenance task: if they are set up incorrectly, the tracked duplicates can grow quickly.
JOIN Performance Tuning:
It is always recommended to monitor JOIN performance. Especially with stateful JOINs, memory can be exhausted quite quickly. A common approach is to use broadcast joins for small datasets: the smaller table is broadcast to every task node so that each task can join against it locally, which keeps performance high.
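A minimal sketch of that broadcast pattern, reusing the Transaction, User, and EnrichedTransaction classes from Step 3 (the descriptor name and stream variables are assumptions):
java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Broadcast the (small) user stream to every task and keep it in broadcast state,
// so each transaction can be enriched locally without shuffling the large stream.
MapStateDescriptor<String, User> usersDescriptor =
    new MapStateDescriptor<>("users-by-id", String.class, User.class);
BroadcastStream<User> broadcastUsers = users.broadcast(usersDescriptor);

DataStream<EnrichedTransaction> enriched = transactions
    .connect(broadcastUsers)
    .process(new BroadcastProcessFunction<Transaction, User, EnrichedTransaction>() {
        @Override
        public void processElement(Transaction tx, ReadOnlyContext ctx, Collector<EnrichedTransaction> out) throws Exception {
            User user = ctx.getBroadcastState(usersDescriptor).get(tx.userId); // local lookup, no shuffle
            if (user != null) {
                out.collect(new EnrichedTransaction(tx.userId, tx.amount, user.name, user.age));
            }
        }
        @Override
        public void processBroadcastElement(User user, Context ctx, Collector<EnrichedTransaction> out) throws Exception {
            ctx.getBroadcastState(usersDescriptor).put(user.userId, user); // update the replicated user table
        }
    });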
ClickHouse Write Performance:
When monitoring write performance, several things can impact latency:
If batches are too small → high insert overhead.
If partitions are incorrect → slow queries.
If ClickHouse is under high load → backpressure in Flink.
It is always recommended to monitor disk usage and make sure MergeTree table parts are merged (compacted) regularly.
Resource Monitoring & Scaling:
Always making sure that enough memory is available for your Flink jobs helps you avoid building up a large backlog and exhausting the system. The same goes for the Kafka producer side, which shouldn't be overloaded.
Summary of Flink usage for Stream Processing of Kafka Events to ClickHouse
You can get very far with that setup, but as you can see, there are several challenges that you would need to master.
The initial setup takes a lot of effort. If you are not experienced with Flink, you will have a long ramp-up time.
Certain decisions, such as batch sizes, windowing strategy, and parallelism, must be made up front and can greatly impact the pipeline's performance. Setting the wrong configuration can become a risky investment.
Fine-tuning is a recurring task, and depending on how dynamic your data volume is, the effort can become very high.
Maintaining the setup can become a big effort, as you must monitor each piece of the system (memory usage, latency, duplications, etc.) to ensure it runs smoothly. I have seen several teams spend hours every day debugging the system.
OK, now you understand that Flink comes with its own challenges. The question is: how does GlassFlow solve duplications and JOINs in a better way? Check out our next article to learn the details of our approach.
Try GlassFlow Open Source for ClickHouse on GitHub!
Next part:
Part 5: How GlassFlow will solve Duplications and JOINs for ClickHouse