Flink’s Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.
The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.
Attention The SQL Client is in an early development phase. Even though the application is not production-ready yet, it can be a quite useful tool for prototyping and playing around with Flink SQL. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway.
This section describes how to setup and run your first Flink SQL program from the command-line.
The SQL Client is bundled in the regular Flink distribution and thus runnable out-of-the-box. It requires only a running Flink cluster where table programs can be executed. For more information about setting up a Flink cluster see the Cluster & Deployment part. If you simply want to try out the SQL Client, you can also start a local cluster with one worker using the following command:
The SQL Client scripts are also located in the binary directory of Flink. In the future, a user will have two possibilities of starting the SQL Client CLI either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment only the embedded
mode is supported. You can start the CLI by calling:
By default, the SQL Client will read its configuration from the environment file located in ./conf/sql-client-defaults.yaml
. See the configuration part for more information about the structure of environment files.
Once the CLI has been started, you can use the HELP
command to list all available SQL statements. For validating your setup and cluster connection, you can enter your first SQL query and press the Enter
key to execute it:
This query requires no table source and produces a single row result. The CLI will retrieve results from the cluster and visualize them. You can close the result view by pressing the Q
key.
The CLI supports two modes for maintaining and visualizing results.
The table mode materializes results in memory and visualizes them in a regular, paginated table representation. It can be enabled by executing the following command in the CLI:
The changelog mode does not materialize results and visualizes the result stream that is produced by a continuous query consisting of insertions (+
) and retractions (-
).
You can use the following query to see both result modes in action:
This query performs a bounded word count example.
In changelog mode, the visualized changelog should be similar to:
In table mode, the visualized result table is continuously updated until the table program ends with:
The configuration section explains how to read from table sources and configure other table program properties.
The SQL Client can be started with the following optional CLI commands. They are discussed in detail in the subsequent paragraphs.
A SQL query needs a configuration environment in which it is executed. The so-called environment files define available table sources and sinks, external catalogs, user-defined functions, and other properties required for execution and deployment.
Every environment file is a regular YAML file. An example of such a file is presented below.
This configuration:
MyTableName
that reads from a CSV file,table
result mode.Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (defaults environment file using --defaults
) as well as on a per-session basis (session environment file using --environment
). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in every session whereas the session environment file only declares a specific state retention time and parallelism. Both default and session environment files can be passed when starting the CLI application. If no default environment file has been specified, the SQL Client searches for ./conf/sql-client-defaults.yaml
in Flink’s configuration directory.
Attention Properties that have been set within a CLI session (e.g. using the SET
command) have highest precedence:
The SQL Client does not require to setup a Java project using Maven or SBT. Instead, you can pass the dependencies as regular JAR files that get submitted to the cluster. You can either specify each JAR file separately (using --jar
) or define entire library directories (using --library
). For connectors to external systems (such as Apache Kafka) and corresponding data formats (such as JSON), Flink provides ready-to-use JAR bundles. These JAR files are suffixed with sql-jar
and can be downloaded for each release from the Maven central repository.
Name | Version | Download |
---|---|---|
Filesystem | Built-in | |
Apache Kafka | 0.11 | Download |
Name | Download |
---|---|
CSV | Built-in |
JSON | Download |
Sources are defined using a set of YAML properties. Similar to a SQL CREATE TABLE
statement you define the name of the table, the final schema of the table, connector, and a data format if necessary. Additionally, you have to specify its type (source, sink, or both).
Attention Not every combination of connector and format is supported. Internally, your YAML file is translated into a set of string-based properties by which the SQL Client tries to resolve a matching table source. If a table source can be resolved also depends on the JAR files available in the classpath.
The following example shows an environment file that defines a table source reading JSON data from Apache Kafka. All properties are explained in the following subsections.
The resulting schema of the TaxiRide
table contains most of the fields of the JSON schema. Furthermore, it adds a rowtime attribute rowTime
and processing-time attribute procTime
. Both connector
and format
allow to define a property version (which is currently version 1
) for future backwards compatibility.
The schema allows for describing the final appearance of a table. It specifies the final name, final type, and the origin of a field. The origin of a field might be important if the name of the field should differ from the input format. For instance, a field name&field
should reference nameField
from an Avro format.
For each field, the following properties can be used:
The following type strings are supported for being defined in an environment file:
In order to control the event-time behavior for table sources, the SQL Client provides predefined timestamp extractors and watermark strategies. For more information about time handling in Flink and especially event-time, we recommend the general event-time section.
The following timestamp extractors are supported:
The following watermark strategies are supported:
Flink provides a set of connectors that can be defined in the environment file.
Attention Currently, connectors can only be used as table sources not sinks.
The filesystem connector allows for reading from a local or distributed filesystem. A filesystem can be defined as:
Currently, only files with CSV format can be read from a filesystem. The filesystem connector is included in Flink and does not require an additional JAR file.
The Kafka connector allows for reading from a Apache Kafka topic. It can be defined as follows:
Make sure to download the Kafka SQL JAR file and pass it to the SQL Client.
Flink provides a set of formats that can be defined in the environment file.
The CSV format allows to read comma-separated rows. Currently, this is only supported for the filesystem connector.
The CSV format is included in Flink and does not require an additional JAR file.
The JSON format allows to read JSON data that corresponds to a given format schema. The format schema can be defined either as a Flink type string, as a JSON schema, or derived from the desired table schema. A type string enables a more SQL-like definition and mapping to the corresponding SQL data types. The JSON schema allows for more complex and nested structures.
If the format schema is equal to the table schema, the schema can also be automatically derived. This allows for defining schema information only once. The names, types, and field order of the format are determined by the table’s schema. Time attributes are ignored. A from
definition in the table schema is interpreted as a field renaming in the format.
Currently, Flink supports only a subset of the JSON schema specification draft-07
. Union types (as well as allOf
, anyOf
, not
) are not supported yet. oneOf
and arrays of types are only supported for specifying nullability.
Simple references that link to a common definition in the document are supported as shown in the more complex example below:
Make sure to download the JSON SQL JAR file and pass it to the SQL Client.
The current SQL Client implementation is in a very early development stage and might change in the future as part of the bigger Flink Improvement Proposal 24 (FLIP-24). Feel free to join the discussion and open issue about bugs and features that you find useful.