Flink Connector
This is the official Apache Flink Sink Connector supported by ClickHouse. It is built on Flink's `AsyncSinkBase` and the official ClickHouse Java client.
The connector supports Apache Flink's DataStream API. Table API support is planned for a future release.
Requirements
- Java 11+ (for Flink 1.17+) or 17+ (for Flink 2.0+)
- Apache Flink 1.17+
Flink Version Compatibility Matrix
The connector ships as two artifacts: one for Flink 1.17–1.20 and one for Flink 2.0+. Choose the artifact that matches your Flink version:
| Flink Version | Artifact | ClickHouse Java Client Version | Required Java |
|---|---|---|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
Note: the connector has not been tested against Flink versions earlier than 1.17.
Installation & Setup
Import as a Dependency
For Flink 2.0+
- Maven
- Gradle
- SBT
For Flink 1.17+
- Maven
- Gradle
- SBT
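As an illustrative sketch only: the artifactIds come from the compatibility matrix above, but the `groupId` and version placeholder shown here are assumptions — check Maven Central for the actual coordinates.

```xml
<!-- Hypothetical coordinates: verify the groupId and version on Maven Central. -->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>${connector.version}</version>
</dependency>
```

For Flink 1.17–1.20, use the `flink-connector-clickhouse-1.17` artifact instead. The equivalent Gradle and SBT coordinates follow the same group/artifact/version triple.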
Download the binary
The binary JAR follows this naming pattern:

`flink-connector-clickhouse-{flink_version}-{stable_version}.jar`

where:
- `flink_version` is one of `2.0.0` or `1.17`
- `stable_version` is a stable artifact release version
You can find all available released JAR files in the Maven Central Repository.
Using the DataStream API
Snippet
Configure ClickHouseClient
Suppose you want to insert raw CSV data as-is:
- Create an `ElementConverter`.
- Create the sink and set the format using `setClickHouseFormat`.
- Finally, connect your `DataStream` to the sink.
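The conversion step can be illustrated without the connector itself. The sketch below shows the kind of record-to-CSV logic an `ElementConverter` body would contain; the class name and field layout are hypothetical, and only the CSV quoting rules are standard:

```java
// Sketch of the record-to-CSV conversion an ElementConverter might perform.
// The Event fields (id, name, value) are hypothetical.
public class CsvLineBuilder {

    // Quote a field if it contains a comma, a quote, or a newline,
    // doubling any embedded quotes (RFC 4180 style).
    static String quote(String field) {
        if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
            return "\"" + field.replace("\"", "\"\"") + "\"";
        }
        return field;
    }

    // Build one CSV line from the record's fields.
    static String toCsvLine(long id, String name, double value) {
        return id + "," + quote(name) + "," + value;
    }

    public static void main(String[] args) {
        System.out.println(toCsvLine(1, "hello, world", 3.5)); // 1,"hello, world",3.5
    }
}
```

In the actual job, logic like this would live inside the `ElementConverter` implementation handed to the sink.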
More examples and snippets can be found in our tests.
Quick Start Example
We have created a Maven-based example project for an easy start with the ClickHouse sink. For more detailed instructions, see the Example Guide.
DataStream API Connection Options
ClickHouse Client Options
| Parameter | Description | Default Value |
|---|---|---|
| `url` | Fully qualified ClickHouse URL | N/A |
| `username` | ClickHouse database username | N/A |
| `password` | ClickHouse database password | N/A |
| `database` | ClickHouse database name | N/A |
| `table` | ClickHouse table name | N/A |
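To illustrate what "fully qualified" means for the `url` option: it should carry scheme, host, and port. The host and port below are placeholders, not real endpoints:

```java
import java.net.URI;

public class ClientOptionsExample {

    // Assemble a fully qualified ClickHouse URL from its parts.
    // All values here are placeholders for illustration.
    static String buildUrl(String scheme, String host, int port) {
        return URI.create(scheme + "://" + host + ":" + port).toString();
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("https", "my.clickhouse.host", 8443));
    }
}
```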
Sink Options
The following options come directly from Flink's AsyncSinkBase:
| Parameter | Description | Default Value |
|---|---|---|
| `maxBatchSize` | Maximum number of records inserted in a single batch | N/A |
| `maxInFlightRequests` | Maximum number of in-flight requests allowed before the sink applies backpressure | N/A |
| `maxBufferedRequests` | Maximum number of records that may be buffered in the sink before backpressure is applied | N/A |
| `maxBatchSizeInBytes` | Maximum size (in bytes) a batch may reach; all batches sent are smaller than or equal to this size | N/A |
| `maxTimeInBufferMS` | Maximum time a record may stay in the buffer before being flushed | N/A |
| `maxRecordSizeInBytes` | Maximum record size the sink accepts; larger records are automatically rejected | N/A |
Using the Table API
Table API support is planned for a future release. This section will be updated once available.
Snippet
Planned for a future release — this section will provide a usage snippet for configuring the Table API.
Quick Start Example
Planned for a future release — a complete end-to-end example will be added once Table API support becomes available.
Table API Connection Options
Planned for a future release — this section will be updated once available.
Supported data types
The table below provides a quick reference for converting data types when inserting from Flink into ClickHouse.
Inserting data from Flink into ClickHouse
| Java Type | ClickHouse Type | Supported | Serialize Method |
|---|---|---|---|
| `byte`/`Byte` | `Int8` | ✅ | `DataWriter.writeInt8` |
| `short`/`Short` | `Int16` | ✅ | `DataWriter.writeInt16` |
| `int`/`Integer` | `Int32` | ✅ | `DataWriter.writeInt32` |
| `long`/`Long` | `Int64` | ✅ | `DataWriter.writeInt64` |
| `BigInteger` | `Int128` | ✅ | `DataWriter.writeInt128` |
| `BigInteger` | `Int256` | ✅ | `DataWriter.writeInt256` |
| `short`/`Short` | `UInt8` | ✅ | `DataWriter.writeUInt8` |
| `int`/`Integer` | `UInt8` | ✅ | `DataWriter.writeUInt8` |
| `int`/`Integer` | `UInt16` | ✅ | `DataWriter.writeUInt16` |
| `long`/`Long` | `UInt32` | ✅ | `DataWriter.writeUInt32` |
| `long`/`Long` | `UInt64` | ✅ | `DataWriter.writeUInt64` |
| `BigInteger` | `UInt64` | ✅ | `DataWriter.writeUInt64` |
| `BigInteger` | `UInt128` | ✅ | `DataWriter.writeUInt128` |
| `BigInteger` | `UInt256` | ✅ | `DataWriter.writeUInt256` |
| `BigDecimal` | `Decimal` | ✅ | `DataWriter.writeDecimal` |
| `BigDecimal` | `Decimal32` | ✅ | `DataWriter.writeDecimal` |
| `BigDecimal` | `Decimal64` | ✅ | `DataWriter.writeDecimal` |
| `BigDecimal` | `Decimal128` | ✅ | `DataWriter.writeDecimal` |
| `BigDecimal` | `Decimal256` | ✅ | `DataWriter.writeDecimal` |
| `float`/`Float` | `Float` | ✅ | `DataWriter.writeFloat32` |
| `double`/`Double` | `Double` | ✅ | `DataWriter.writeFloat64` |
| `boolean`/`Boolean` | `Boolean` | ✅ | `DataWriter.writeBoolean` |
| `String` | `String` | ✅ | `DataWriter.writeString` |
| `String` | `FixedString` | ✅ | `DataWriter.writeFixedString` |
| `LocalDate` | `Date` | ✅ | `DataWriter.writeDate` |
| `LocalDate` | `Date32` | ✅ | `DataWriter.writeDate32` |
| `LocalDateTime` | `DateTime` | ✅ | `DataWriter.writeDateTime` |
| `ZonedDateTime` | `DateTime` | ✅ | `DataWriter.writeDateTime` |
| `LocalDateTime` | `DateTime64` | ✅ | `DataWriter.writeDateTime64` |
| `ZonedDateTime` | `DateTime64` | ✅ | `DataWriter.writeDateTime64` |
| `int`/`Integer` | `Time` | ❌ | N/A |
| `long`/`Long` | `Time64` | ❌ | N/A |
| `byte`/`Byte` | `Enum8` | ✅ | `DataWriter.writeInt8` |
| `int`/`Integer` | `Enum16` | ✅ | `DataWriter.writeInt16` |
| `java.util.UUID` | `UUID` | ✅ | `DataWriter.writeIntUUID` |
| `String` | `JSON` | ✅ | `DataWriter.writeJSON` |
| `Array<Type>` | `Array<Type>` | ✅ | `DataWriter.writeArray` |
| `Map<K,V>` | `Map<K,V>` | ✅ | `DataWriter.writeMap` |
| `Tuple<Type,..>` | `Tuple<T1,T2,..>` | ✅ | `DataWriter.writeTuple` |
| `Object` | `Variant` | ❌ | N/A |
Notes:
- A `ZoneId` must be provided when performing date operations.
- Precision and scale must be provided when performing decimal operations.
- In order for ClickHouse to parse a Java `String` as JSON, you need to enable `enableJsonSupportAsString` in `ClickHouseClientConfig`.
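A few of the mappings and notes above can be demonstrated with plain Java; the concrete values are arbitrary examples:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TypeMappingNotes {

    // Date/DateTime values need an explicit ZoneId on the Java side.
    static ZonedDateTime withZone(LocalDateTime local, String zoneId) {
        return local.atZone(ZoneId.of(zoneId));
    }

    // Decimal values need an explicit scale, e.g. 4 for a Decimal64(4) column.
    static BigDecimal withScale(String value, int scale) {
        return new BigDecimal(value).setScale(scale, RoundingMode.HALF_UP);
    }

    // UInt64's maximum (2^64 - 1) exceeds Java's signed long range,
    // which is why the table maps UInt64 to BigInteger as well.
    static BigInteger uint64Max() {
        return BigInteger.TWO.pow(64).subtract(BigInteger.ONE);
    }

    public static void main(String[] args) {
        System.out.println(withZone(LocalDateTime.of(2024, 1, 15, 12, 30), "UTC"));
        System.out.println(withScale("12.34567", 4)); // 12.3457
        System.out.println(uint64Max());              // 18446744073709551615
    }
}
```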
Metrics
The connector exposes the following additional metrics on top of Flink's existing metrics:
| Metric | Description | Type | Status |
|---|---|---|---|
| numBytesSend | Total number of bytes sent to ClickHouse | Counter | ✅ |
| numRecordSend | Total number of records sent to ClickHouse | Counter | ✅ |
| numRequestSubmitted | Total number of requests sent (actual number of flushes performed) | Counter | ✅ |
| numOfDroppedBatches | Total number of batches dropped due to non-retryable failures | Counter | ✅ |
| numOfDroppedRecords | Total number of records dropped due to non-retryable failures | Counter | ✅ |
| totalBatchRetries | Total number of batch retries due to retryable failures | Counter | ✅ |
| writeLatencyHistogram | Histogram of successful write latency distribution (ms) | Histogram | ✅ |
| writeFailureLatencyHistogram | Histogram of failed write latency distribution (ms) | Histogram | ✅ |
| triggeredByMaxBatchSizeCounter | Total number of flushes triggered by reaching maxBatchSize | Counter | ✅ |
| triggeredByMaxBatchSizeInBytesCounter | Total number of flushes triggered by reaching maxBatchSizeInBytes | Counter | ✅ |
| triggeredByMaxTimeInBufferMSCounter | Total number of flushes triggered by reaching maxTimeInBufferMS | Counter | ✅ |
| actualRecordsPerBatch | Histogram of actual batch size distribution | Histogram | ✅ |
| actualBytesPerBatch | Histogram of actual bytes per batch distribution | Histogram | ✅ |
Limitations
- Currently, the sink does not support exactly-once semantics.
ClickHouse Version Compatibility and Security
- All artifacts and versions of the connector are tested with all active LTS versions of ClickHouse.
- See the ClickHouse security policy for known security vulnerabilities and how to report a vulnerability.
- We recommend keeping the connector up to date so you do not miss security fixes and new improvements.
- If you have an issue with migration, please create a GitHub issue and we will respond!
Contributing and Support
If you'd like to contribute to the project or report any issues, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request. Please check the contribution guide in the repository before starting. Thank you for helping improve the ClickHouse Flink connector!