This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.

JSON Format

Format: Serialization Schema Format: Deserialization Schema

The JSON format allows to read and write JSON data based on an JSON schema. Currently, the JSON schema is derived from table schema.

Dependencies

In order to setup the JSON format, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependency SQL Client JAR
flink-json Built-in

How to create a table with JSON format

Here is an example to create a table using Kafka connector and JSON format.

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

Format Options

Option Required Default Type Description
format
required (none) String Specify what format to use, here should be 'json'.
json.fail-on-missing-field
optional false Boolean Whether to fail if a field is missing or not.
json.ignore-parse-errors
optional false Boolean Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.
json.timestamp-format.standard
optional 'SQL' String Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
  • Option 'SQL' will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.
  • Option 'ISO-8601'will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.

Data Type Mapping

Currently, the JSON schema is always derived from table schema. Explicitly defining an JSON schema is not supported yet.

Flink JSON format uses jackson databind API to parse and generate JSON string.

The following table lists the type mapping from Flink type to JSON type.

Flink SQL type JSON type
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object