Part 4: Can Apache Flink be the solution?

Apache Flink isn't the solution for duplications and JOINs on ClickHouse.

Written by

Armend Avdijaj

Mar 28, 2025


Let's say you decide that Flink is the solution you want to use. I will respect your bold decision and even show you a step-by-step guide to set up and perform JOINs/deduplication before ingesting into ClickHouse. At the end of this post, I will go into the details of the expected maintenance effort and provide a conclusion. Be warned: a Flink implementation can get scary.



Step 1: Connect Flink to Kafka

Decisions to Make:

  • Kafka Broker & Topic Configuration: Define Kafka brokers, topic names, and partitions.

  • Data Format: Choose between JSON, Avro, or Protobuf. In the example, we choose JSON.

  • Consumer Group & Offset Strategy: Decide whether to start from the latest or earliest message. In the example, we decided on the earliest message.

  • Parallelism: Determine how many Flink tasks will consume from Kafka partitions. In the example we decided on 4 parallel tasks consuming from Kafka.

Code Example:

  • 1. Add Dependencies (Flink & Kafka Connectors)

    In your Flink project, add the necessary dependencies:

    xml
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.16.0</version>
    </dependency>
    
    
  • 2. Create a Kafka Source

    java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    public class KafkaFlinkParallelismExample {
        public static void main(String[] args) throws Exception {
            // 1️⃣ Create Flink environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2️⃣ Set parallelism (Decides how many tasks will read from Kafka)
            env.setParallelism(4);  // This means 4 parallel tasks will run
    
            // 3️⃣ Define Kafka source
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("user-events")
                .setGroupId("flink-consumer-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    
            // 4️⃣ Create DataStream from Kafka source
            DataStream<String> kafkaStream = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .setParallelism(4);  // Parallelism is explicitly set for this task
    
            // 5️⃣ Print received data (for debugging)
            kafkaStream.print();
    
            // 6️⃣ Execute the Flink job
            env.execute("Kafka to Flink with Parallelism");
        }
    }
  • About Parallelism

    Parallelism in Flink (env.setParallelism(n))

    • Determines the number of concurrent Flink tasks that will process Kafka data.

    • Example: env.setParallelism(4) → 4 parallel tasks consuming from Kafka.

    • If Kafka has 6 partitions and Flink has 4 tasks, some tasks will handle multiple partitions.

    Parallelism per Operator (.setParallelism(n))

    • If not set, the job-wide parallelism is used (env.setParallelism(n)).

    • Example: kafkaStream.setParallelism(2) → Only this Kafka consumer runs with 2 parallel instances.

    Choosing the Right Parallelism

    • Flink parallelism = Kafka partitions: Best performance, each task reads from exactly one partition.

    • Flink parallelism < Kafka partitions: Some tasks handle multiple partitions (could cause lag).

    • Flink parallelism > Kafka partitions: Some tasks stay idle, wasting resources.

  • 3. Alternative: Set Up the Kafka Source with the Legacy FlinkKafkaConsumer

    java
    // Legacy connector API (deprecated in newer Flink releases); requires
    // java.util.Properties, org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer,
    // and org.apache.flink.api.common.serialization.SimpleStringSchema.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
    kafkaProps.setProperty("group.id", "flink-consumer-group");
    kafkaProps.setProperty("auto.offset.reset", "earliest");
    
    FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
        "clickhouse-stream",
        new SimpleStringSchema(),
        kafkaProps
    );
    
    DataStream<String> kafkaStream = env.addSource(kafkaSource);
  • 4. Deserialize and Convert Data into a Flink Format

    Assuming JSON messages:

    java
    // Note: for production, reuse a single ObjectMapper (e.g., in a RichMapFunction)
    // instead of creating a new one per record.
    DataStream<MyEvent> eventStream = kafkaStream
        .map(json -> new ObjectMapper().readValue(json, MyEvent.class))
        .returns(MyEvent.class);
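
  • About the MyEvent class

    The MyEvent POJO is not shown in this post. Below is a minimal sketch of what it could look like, assuming the fields event_id, data, and timestamp that the ClickHouse sink in Step 4 reads via getters; adjust it to your actual event schema.

    java
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    // Minimal POJO sketch (assumed fields; not part of the original example)
    public class MyEvent {
        @JsonProperty("event_id")
        private String eventId;   // unique key, later used for deduplication
        private String data;      // payload
        private long timestamp;   // event time in epoch millis
    
        public MyEvent() {}       // no-arg constructor required by Jackson and Flink's POJO rules
    
        public String getEventId() { return eventId; }
        public String getData() { return data; }
        public long getTimestamp() { return timestamp; }
    
        public void setEventId(String eventId) { this.eventId = eventId; }
        public void setData(String data) { this.data = data; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }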

Step 2: Deduplication in Flink

Decisions to Make:

  • Define a Unique Key: Choose a field (e.g., event_id) for deduplication.

  • Time Window Strategy: Define how long to store seen records.

  • State Management: Use RocksDB or Heap Memory to track seen events.

Code Example:

  • Implementation with a window strategy for deduplication and RocksDB:

    java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.time.Duration;
    
    public class FlinkDeduplicationWithRocksDB {
        public static void main(String[] args) throws Exception {
            // 1. Create Flink environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4); // Set parallelism to match Kafka partitions
    
            // 2. Enable RocksDB as the state backend
            env.setStateBackend(new EmbeddedRocksDBStateBackend());
            env.enableCheckpointing(60000); // Checkpoint every 60 seconds
            env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
            // 3. Kafka Source
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("user-events")
                .setGroupId("flink-deduplication-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    
            DataStream<String> kafkaStream = env
                .fromSource(kafkaSource, WatermarkStrategy
                    .forBoundedOutOfOrderness(Duration.ofSeconds(10)), "Kafka Source") // 4. Event Time Strategy
                .setParallelism(4);
    
            // 5. Deduplication using RocksDB State
            DataStream<String> deduplicatedStream = kafkaStream
                .keyBy(event -> extractUserId(event)) // Keyed by user_id
                .process(new DeduplicationFunction(Duration.ofHours(24))); // 6. Time Window Strategy (24 hours)
    
            // 7. Print and Execute
            deduplicatedStream.print();
            env.execute("Kafka Deduplication with RocksDB");
        }
    
        // 👇 Extract user ID from JSON
        private static String extractUserId(String event) {
            return event.split(",")[0].replace("{\"user_id\":\"", "").replace("\"", "");
        }
    
        // 8. Deduplication Function using RocksDB
        public static class DeduplicationFunction extends KeyedProcessFunction<String, String, String> {
            private transient MapState<String, Long> seenEvents;
            private final Duration timeWindow;
    
            public DeduplicationFunction(Duration timeWindow) {
                this.timeWindow = timeWindow;
            }
    
            @Override
            public void open(Configuration parameters) {
                MapStateDescriptor<String, Long> descriptor =
                    new MapStateDescriptor<>("seenEvents", String.class, Long.class);
                seenEvents = getRuntimeContext().getMapState(descriptor);
            }
    
            @Override
            public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
                long eventTimestamp = System.currentTimeMillis(); // Extract from event in real case
                Long lastSeen = seenEvents.get(event);
    
                if (lastSeen == null || eventTimestamp - lastSeen > timeWindow.toMillis()) {
                    out.collect(event);
                    seenEvents.put(event, eventTimestamp); // Store event timestamp in RocksDB
                }
            }
        }
    }

Where is the Time Window Strategy Set?

  • 1. Event Time Window (Handling Late Events)

    java
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))

    Handles out-of-order events by allowing a 10s delay.

  • 2. Deduplication Window (24 Hours)

    java
    new DeduplicationFunction(Duration.ofHours(24))
    • Keeps track of events for 24 hours in RocksDB.

    • If the same event arrives within 24 hours, it’s ignored.

  • Why Use RocksDB?

    Scalability: Handles millions of events efficiently.

    Persistence: Avoids losing state after failures.

    Checkpointing Support: Enables fault tolerance.

    Efficient Lookups: Uses RocksDB MapState for fast key lookups.

  • Key Considerations

    Choosing the Deduplication Window:

    • Short window (e.g., 1 min) → Less state storage, but duplicates might pass.

    • Long window (e.g., 24 hrs) → More storage, but better deduplication.

  • State Size & Cleanup:

    • Too large? Use TTL settings to expire old state (see the sketch after this list).

    • Disk issues? Ensure enough storage for RocksDB.

  • Scaling Considerations:

    • Set parallelism to match Kafka partitions (env.setParallelism(n)).

    • Ensure sufficient disk space for RocksDB if handling millions of events.
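
  • Expiring Old State Automatically (sketch)

    One way to handle the TTL point above is Flink's state TTL. The snippet below is a minimal sketch, not part of the example above: it shows how the open() method of DeduplicationFunction could attach a 24-hour TTL to the seenEvents state so RocksDB expires old entries instead of growing forever.

    java
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    
    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))                                 // keep entries for 24 hours
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh TTL on every write
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(1000)                        // purge expired entries during compaction
            .build();
    
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("seenEvents", String.class, Long.class);
        descriptor.enableTimeToLive(ttlConfig);
    
        seenEvents = getRuntimeContext().getMapState(descriptor);
    }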

Step 3: Running Joins in Flink

Decisions to Make:

  • Join Type: Inner, Left, Right, or Outer.

  • Time Window for Streaming Joins: Define how long to retain data for joins.

  • State Management: Ensure efficient storage of joined records.

Code Example:

  • Joining Two Kafka Streams in Flink

    java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.JoinedStreams;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.util.Collector;
    import java.time.Duration;
    
    // ✅ Data Model
    class Transaction {
        public String userId;
        public double amount;
    
        public Transaction(String userId, double amount) {
            this.userId = userId;
            this.amount = amount;
        }
    }
    
    class User {
        public String userId;
        public String name;
        public int age;
    
        public User(String userId, String name, int age) {
            this.userId = userId;
            this.name = name;
            this.age = age;
        }
    }
    
    class EnrichedTransaction {
        public String userId;
        public double amount;
        public String name;
        public int age;
    
        public EnrichedTransaction(String userId, double amount, String name, int age) {
            this.userId = userId;
            this.amount = amount;
            this.name = name;
            this.age = age;
        }
    
        @Override
        public String toString() {
            return "EnrichedTransaction{" +
                    "userId='" + userId + '\'' +
                    ", amount=" + amount +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
    
    public class FlinkKafkaJoinExample {
        public static void main(String[] args) throws Exception {
            // Step 1: Setup Flink Environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
    
            // Step 2: Define Kafka Sources for Transactions and Users
            KafkaSource<String> transactionSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("transactions-topic")
                .setGroupId("flink-join-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    
            KafkaSource<String> userSource = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker:9092")
                .setTopics("users-topic")
                .setGroupId("flink-join-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    
            // Step 3: Create Kafka Data Streams
            DataStream<String> rawTransactions = env.fromSource(transactionSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Transaction Source");
            DataStream<String> rawUsers = env.fromSource(userSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "User Source");
    
            // Step 4: Parse JSON and Extract Transaction Data
            DataStream<Transaction> transactions = rawTransactions.map(event -> {
                // Assume JSON structure: {"user_id": "123", "amount": 100.5}
                String[] parts = event.replace("{", "").replace("}", "").replace("\"", "").split(",");
                return new Transaction(parts[0].split(":")[1], Double.parseDouble(parts[1].split(":")[1]));
            });
    
            // Step 5: Parse JSON and Extract User Data
            DataStream<User> users = rawUsers.map(event -> {
                // Assume JSON structure: {"user_id": "123", "name": "John Doe", "age": 30}
                String[] parts = event.replace("{", "").replace("}", "").replace("\"", "").split(",");
                return new User(parts[0].split(":")[1], parts[1].split(":")[1], Integer.parseInt(parts[2].split(":")[1]));
            });
    
            // Step 6: Key Both Streams by user_id (Required for Joining)
            KeyedStream<Transaction, String> keyedTransactions = transactions.keyBy(t -> t.userId);
            KeyedStream<User, String> keyedUsers = users.keyBy(u -> u.userId);
    
            // Step 7: Join Streams Using an Interval Join (5-minute Window)
            DataStream<EnrichedTransaction> enrichedTransactions = keyedTransactions
                .intervalJoin(keyedUsers)
                .between(Duration.ofMinutes(-5), Duration.ofMinutes(5)) // Time range to match events
                .process(new ProcessJoinFunction<Transaction, User, EnrichedTransaction>() {
                    @Override
                    public void processElement(Transaction transaction, User user, Context ctx, Collector<EnrichedTransaction> out) {
                        out.collect(new EnrichedTransaction(transaction.userId, transaction.amount, user.name, user.age));
                    }
                });
    
            // Step 8: Print Enriched Transactions
            enrichedTransactions.print();
    
            // Step 9: Execute Flink Job
            env.execute("Kafka Stream Join Example");
        }
    }
  • Example Input & Output

    Kafka Transactions Topic (transactions-topic)

    json
    {"user_id": "123", "amount": 150.75}
    {"user_id": "456", "amount": 99.99}

    Kafka Users Topic (users-topic)

    json
    {"user_id": "123", "name": "Alice", "age": 28}
    {"user_id": "456", "name": "Bob", "age": 35}

    Flink Output (Enriched Transactions)

    EnrichedTransaction{userId='123', amount=150.75, name='Alice', age=28}
    EnrichedTransaction{userId='456', amount=99.99, name='Bob', age=35}

Key Considerations

  • Event Time vs. Processing Time

    • WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) ensures late events are handled correctly.

  • Choosing the Join Window

    • between(Duration.ofMinutes(-5), Duration.ofMinutes(5)): Matches events that occurred within ±5 minutes.

    • Smaller windows reduce state size but may miss late events.

  • Scaling Considerations

    • Parallelism should match Kafka partitions for efficiency.

    • If data is large, use RocksDB as a state backend.

Summary

This example shows how to join two Kafka streams in Flink:

  • Transactions stream (purchases) + Users stream (user details).

  • Interval Join (user_id) with a 5-minute time window.

  • Outputs enriched transactions containing amount, name, and age.

Step 4: Batching and Writing to ClickHouse with a Custom Connector

Decisions to Make:

  • Batch Size: Optimize batch size to balance latency and throughput.

  • Insert Strategy: Use ClickHouse’s INSERT INTO ... VALUES or INSERT INTO ... SELECT.

  • Error Handling: Implement retry mechanisms (a simple retry sketch follows the code examples below).

Implementation example:

  • 1. Create a Flink Sink to ClickHouse

    java
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Timestamp;
    import java.util.List;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    public class ClickHouseSink extends RichSinkFunction<List<MyEvent>> {
        private transient Connection connection;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:clickhouse://clickhouse-server:8123/default");
        }
    
        @Override
        public void invoke(List<MyEvent> batch, Context context) throws Exception {
            String sql = "INSERT INTO events (event_id, data, timestamp) VALUES (?, ?, ?)";
            try (PreparedStatement stmt = connection.prepareStatement(sql)) {
                for (MyEvent event : batch) {
                    stmt.setString(1, event.getEventId());
                    stmt.setString(2, event.getData());
                    stmt.setTimestamp(3, new Timestamp(event.getTimestamp()));
                    stmt.addBatch();
                }
                stmt.executeBatch();
            }
        }
    
        @Override
        public void close() throws Exception {
            if (connection != null) connection.close();
        }
    }
  • 2. Use Flink’s windowAll for Efficient Batching

    java
    // Requires org.apache.flink.streaming.api.functions.windowing.AllWindowFunction.
    // Note: windowAll is a non-keyed operation, so this batching step runs with parallelism 1.
    deduplicatedStream
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .apply(new AllWindowFunction<MyEvent, List<MyEvent>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<MyEvent> events, Collector<List<MyEvent>> out) {
                List<MyEvent> batch = new ArrayList<>();
                for (MyEvent event : events) batch.add(event);
                out.collect(batch);
            }
        })
        .addSink(new ClickHouseSink());
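
  • 3. A Simple Retry Sketch

    The sink above fails the whole Flink task on the first SQL error. A minimal sketch of what retries could look like inside ClickHouseSink, assuming a fixed number of attempts and a naive linear backoff (hypothetical values; production setups usually also need idempotent inserts, e.g., via a ReplacingMergeTree table, so that retries and checkpoint replays do not create duplicates):

    java
    // Hypothetical helper inside ClickHouseSink; call it instead of stmt.executeBatch().
    // Requires: java.sql.PreparedStatement, java.sql.SQLException
    private static final int MAX_RETRIES = 3;
    private static final long BACKOFF_MS = 1000;
    
    private void executeBatchWithRetry(PreparedStatement stmt) throws Exception {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                stmt.executeBatch();
                return; // success
            } catch (SQLException e) {
                if (attempt == MAX_RETRIES) {
                    // give up and let Flink fail the task; it restarts from the last checkpoint
                    throw e;
                }
                Thread.sleep(BACKOFF_MS * attempt); // naive linear backoff before retrying
            }
        }
    }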

Overview of maintenance efforts for Flink

After setting up Flink for stream processing into ClickHouse, you must handle ongoing maintenance to keep the pipeline running smoothly and performing well. Here are the typical areas of maintenance:

Kafka Consumer Management:

Consumer lag (how far Flink's consumption trails behind the latest Kafka offsets) can become an issue. If Flink processes too slowly, the backlog grows and, in the worst case, data can be lost or duplicated. Usually, you need to monitor the lag closely so you can react early enough.

Flink Job Stability & Failures:

Flink jobs can always fail, and data can get lost or duplicated as a result. This usually happens when checkpointing is configured incorrectly. Checkpointing periodically takes snapshots of the job's state so the system can recover from the last snapshot after a failure. As a user, you need to decide, for example, where the checkpoints are stored (local filesystem, distributed filesystem, object storage, etc.), as sketched below.
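
The snippet below is a minimal sketch of the checkpointing and restart settings you would typically tune for stability. The storage path and all numbers are placeholders; in production, checkpoint storage is usually durable (S3, HDFS, etc.) rather than a local directory.

    java
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(60_000);                                                 // checkpoint every 60 seconds
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints"); // placeholder path
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);                 // avoid back-to-back checkpoints
    env.getCheckpointConfig().setCheckpointTimeout(120_000);                         // fail checkpoints that take too long
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10))); // restart failed jobs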

Debugging Challenges for Python Users in Flink:

One major challenge for Python users working with Flink is that debugging often requires working in Java. While Flink provides a PyFlink API, much of its underlying execution framework, including error messages, logs, and internal failures, is Java-based. This means that when a job fails or a checkpoint issue arises, Python users often need to dive into Java stack traces and Flink's internal mechanics to troubleshoot the problem. This creates a steep learning curve, requiring users to understand the Java runtime, memory management, and error handling, even if they primarily work in Python. As a result, debugging and maintaining Flink jobs can be complex and time-consuming for teams that rely on Python for data processing.

Deduplication Handling:

It is highly recommended to clean up old deduplication entries so that memory and disk are not exhausted. Event timestamps are another recurring maintenance topic: if they are extracted or assigned incorrectly, duplicates can slip through and state can grow quickly. One possible cleanup approach is sketched below.
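
As an alternative to state TTL, cleanup can be done with timers. The snippet below is a sketch of how the DeduplicationFunction from Step 2 could register a processing-time timer per key and drop entries older than the 24-hour window when the timer fires (the 24-hour value is the same placeholder as above).

    java
    // Sketch: timer-based cleanup inside the KeyedProcessFunction from Step 2.
    // Requires: java.time.Duration, java.util.Iterator, java.util.Map
    @Override
    public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        if (seenEvents.get(event) == null) {
            out.collect(event);
            seenEvents.put(event, now);
            // schedule a cleanup pass 24 hours later
            ctx.timerService().registerProcessingTimeTimer(now + Duration.ofHours(24).toMillis());
        }
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // remove all entries for this key that are older than the deduplication window
        long cutoff = timestamp - Duration.ofHours(24).toMillis();
        Iterator<Map.Entry<String, Long>> it = seenEvents.iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < cutoff) {
                it.remove();
            }
        }
    }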

JOIN Performance Tuning:

It is always recommended to monitor JOIN performance. Especially with stateful JOINs, memory can be exhausted quite quickly. A common mitigation is a broadcast join when one side is small: the small table is broadcast to all task nodes so each task can join locally, which keeps performance high. A compact sketch follows below.
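
The snippet below is a compact sketch of a broadcast join using Flink's broadcast state, reusing the Transaction, User, and EnrichedTransaction classes from Step 3. It assumes the users stream is small enough for every parallel task to hold a full copy in state.

    java
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    // Descriptor for the broadcast state holding the small "users" table
    final MapStateDescriptor<String, User> usersDescriptor =
        new MapStateDescriptor<>("users-broadcast", String.class, User.class);
    
    // Every parallel task receives a full copy of the users stream
    BroadcastStream<User> broadcastUsers = users.broadcast(usersDescriptor);
    
    DataStream<EnrichedTransaction> enriched = transactions
        .connect(broadcastUsers)
        .process(new BroadcastProcessFunction<Transaction, User, EnrichedTransaction>() {
            @Override
            public void processElement(Transaction tx, ReadOnlyContext ctx,
                                       Collector<EnrichedTransaction> out) throws Exception {
                // look up the user locally from broadcast state; no shuffle needed
                User user = ctx.getBroadcastState(usersDescriptor).get(tx.userId);
                if (user != null) {
                    out.collect(new EnrichedTransaction(tx.userId, tx.amount, user.name, user.age));
                }
            }
    
            @Override
            public void processBroadcastElement(User user, Context ctx,
                                                Collector<EnrichedTransaction> out) throws Exception {
                // update the broadcast copy whenever a user record arrives
                ctx.getBroadcastState(usersDescriptor).put(user.userId, user);
            }
        });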

ClickHouse Write Performance:

When monitoring write performance, several issues can impact latency and throughput:

  • If batches are too small → high insert overhead.

  • If partitions are incorrect → slow queries.

  • If ClickHouse is under high load → backpressure in Flink.

It is always recommended to monitor disk usage and to keep an eye on the background merges of MergeTree tables.

Resource Monitoring & Scaling:

Always ensure enough memory is available for Flink jobs to avoid building up a big backlog and exhausting the system. The same goes for the Kafka producer side, which shouldn't be overloaded.

Summary of Flink usage for Stream Processing of Kafka Events to ClickHouse

You can get very far with that setup, but as you can see, there are several challenges that you would need to master.

  1. The initial setup takes a lot of effort. If you are not experienced with Flink, you will have a long ramp-up time.

  2. Certain decisions, such as batch sizes, windowing strategy, parallelism, etc., must be made and can greatly impact the pipeline's performance. Getting these configurations wrong is a risky investment.

  3. Fine-tuning is a task that will always come up depending on how dynamic your data volume is. The effort can become very high.

  4. Maintaining the setup can become a big effort as you must monitor each piece of the system (memory usage, latency, duplications, etc.) to ensure it runs smoothly. I have seen several teams spending hours daily debugging the system.

OK, so now you understand that Flink comes with its own challenges. The question is: how does GlassFlow solve duplications and JOINs in a better way? Check out our next article to learn the details of our approach.

Try GlassFlow Open Source for ClickHouse on GitHub!

Next part:
Part 5: How GlassFlow will solve Duplications and JOINs for ClickHouse
