# Source code for pyflink.datastream.formats.csv

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
from typing import Optional, TYPE_CHECKING

from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory, \
    SerializationSchema, DeserializationSchema
from pyflink.common.typeinfo import TypeInformation
from pyflink.datastream.connectors.file_system import StreamFormat
from pyflink.java_gateway import get_gateway

if TYPE_CHECKING:
    from pyflink.table.types import DataType, RowType, NumericType

__all__ = [
    'CsvSchema',
    'CsvSchemaBuilder',
    'CsvReaderFormat',
    'CsvBulkWriters',
    'CsvRowDeserializationSchema',
    'CsvRowSerializationSchema'
]


class CsvSchema(object):
    """
    Schema information for a csv file; a thin Python wrapper around the Java
    ``com.fasterxml.jackson.dataformat.csv.CsvSchema`` class.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_schema, row_type: 'RowType'):
        # The wrapped Java CsvSchema and the Flink row type describing its columns.
        self._j_schema = j_schema
        self._row_type = row_type
        # Lazily populated elsewhere; kept None until needed.
        self._type_info = None

    @staticmethod
    def builder() -> 'CsvSchemaBuilder':
        """
        Returns a :class:`CsvSchemaBuilder`.
        """
        return CsvSchemaBuilder()

    def size(self):
        # Column count is delegated to the underlying Java schema.
        return self._j_schema.size()

    def __len__(self):
        return self.size()

    def __str__(self):
        return self._j_schema.toString()

    def __repr__(self):
        return self.__str__()
class CsvSchemaBuilder(object):
    """
    CsvSchemaBuilder is for building a :class:`~CsvSchema`, corresponding to Java
    ``com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder`` class.

    All ``add_*``/``set_*``/``disable_*`` methods return the builder itself so
    calls can be chained fluently.

    .. versionadded:: 1.16.0
    """

    def __init__(self):
        jvm = get_gateway().jvm
        self._j_schema_builder = jvm.org.apache.flink.shaded.jackson2.com.fasterxml.jackson \
            .dataformat.csv.CsvSchema.builder()
        # Python-side mirror of the columns added to the Java builder; used to
        # construct the row DataType in :meth:`build`.
        self._fields = []

    def build(self) -> 'CsvSchema':
        """
        Build the :class:`~CsvSchema`.
        """
        from pyflink.table.types import DataTypes
        return CsvSchema(self._j_schema_builder.build(), DataTypes.ROW(self._fields))

    def add_array_column(self,
                         name: str,
                         separator: str = ';',
                         element_type: Optional['DataType'] = None) \
            -> 'CsvSchemaBuilder':
        """
        Add an array column to schema, the type of elements could be specified via
        ``element_type``, which should be primitive types.

        :param name: Name of the column.
        :param separator: Text separator of array elements, default to ``;``.
        :param element_type: DataType of array elements, default to ``DataTypes.STRING()``.
        """
        from pyflink.table.types import DataTypes
        if element_type is None:
            element_type = DataTypes.STRING()
        self._j_schema_builder.addArrayColumn(name, separator)
        self._fields.append(DataTypes.FIELD(name, DataTypes.ARRAY(element_type)))
        return self

    def add_boolean_column(self, name: str) -> 'CsvSchemaBuilder':
        """
        Add a boolean column to schema, with type as ``DataTypes.BOOLEAN()``.

        :param name: Name of the column.
        """
        from pyflink.table.types import DataTypes
        self._j_schema_builder.addBooleanColumn(name)
        self._fields.append(DataTypes.FIELD(name, DataTypes.BOOLEAN()))
        return self

    def add_number_column(self,
                          name: str,
                          number_type: Optional['NumericType'] = None) \
            -> 'CsvSchemaBuilder':
        """
        Add a number column to schema, the type of number could be specified via
        ``number_type``.

        :param name: Name of the column.
        :param number_type: DataType of the number, default to ``DataTypes.BIGINT()``.
        """
        from pyflink.table.types import DataTypes
        if number_type is None:
            number_type = DataTypes.BIGINT()
        # NOTE(review): number_type is only recorded on the Python side; the Java
        # builder always registers a generic number column — presumably the exact
        # numeric type is applied during DataType conversion. Confirm upstream.
        self._j_schema_builder.addNumberColumn(name)
        self._fields.append(DataTypes.FIELD(name, number_type))
        return self

    def add_string_column(self, name: str) -> 'CsvSchemaBuilder':
        """
        Add a string column to schema, with type as ``DataTypes.STRING()``.

        :param name: Name of the column.
        """
        from pyflink.table.types import DataTypes
        self._j_schema_builder.addColumn(name)
        self._fields.append(DataTypes.FIELD(name, DataTypes.STRING()))
        return self

    def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
        """
        Add all columns in ``schema`` to current schema.

        :param schema: Another :class:`CsvSchema`.
        """
        self._j_schema_builder.addColumnsFrom(schema._j_schema)
        for field in schema._row_type:
            self._fields.append(field)
        return self

    def clear_columns(self) -> 'CsvSchemaBuilder':
        """
        Delete all columns in the schema.
        """
        self._j_schema_builder.clearColumns()
        self._fields.clear()
        return self

    def set_allow_comments(self, allow: bool = True) -> 'CsvSchemaBuilder':
        """
        Allow using ``#`` prefixed comments in csv file.
        """
        self._j_schema_builder.setAllowComments(allow)
        return self

    def set_any_property_name(self, name: str) -> 'CsvSchemaBuilder':
        self._j_schema_builder.setAnyPropertyName(name)
        return self

    def disable_array_element_separator(self) -> 'CsvSchemaBuilder':
        """
        Set array element separator to ``""``.
        """
        self._j_schema_builder.disableArrayElementSeparator()
        return self

    def remove_array_element_separator(self, index: int) -> 'CsvSchemaBuilder':
        """
        Set array element separator of a column specified by ``index`` to ``""``.
        """
        self._j_schema_builder.removeArrayElementSeparator(index)
        return self

    def set_array_element_separator(self, separator: str) -> 'CsvSchemaBuilder':
        """
        Set global array element separator, default to ``;``.
        """
        self._j_schema_builder.setArrayElementSeparator(separator)
        return self

    def set_column_separator(self, char: str) -> 'CsvSchemaBuilder':
        """
        Set column separator, ``char`` should be a single char, default to ``,``.

        :raise ValueError: If ``char`` is not exactly one character long.
        """
        if len(char) != 1:
            raise ValueError('Column separator must be a single char, got {}'.format(char))
        self._j_schema_builder.setColumnSeparator(char)
        return self

    def disable_escape_char(self) -> 'CsvSchemaBuilder':
        """
        Disable escaping in csv file.
        """
        self._j_schema_builder.disableEscapeChar()
        return self

    def set_escape_char(self, char: str) -> 'CsvSchemaBuilder':
        """
        Set escape char, ``char`` should be a single char, default to no-escaping.

        :raise ValueError: If ``char`` is not exactly one character long.
        """
        if len(char) != 1:
            raise ValueError('Escape char must be a single char, got {}'.format(char))
        self._j_schema_builder.setEscapeChar(char)
        return self

    def set_line_separator(self, separator: str) -> 'CsvSchemaBuilder':
        """
        Set line separator, default to ``\\n``. This is only configurable for writing,
        for reading, ``\\n``, ``\\r``, ``\\r\\n`` are recognized.
        """
        self._j_schema_builder.setLineSeparator(separator)
        return self

    def set_null_value(self, null_value: str) -> 'CsvSchemaBuilder':
        """
        Set literal for null value, default to empty sequence.
        """
        self._j_schema_builder.setNullValue(null_value)
        # Fix: return the builder so this setter chains like every other setter
        # (previously it implicitly returned None, breaking fluent chains).
        return self

    def disable_quote_char(self) -> 'CsvSchemaBuilder':
        """
        Disable quote char.
        """
        self._j_schema_builder.disableQuoteChar()
        return self

    def set_quote_char(self, char: str) -> 'CsvSchemaBuilder':
        """
        Set quote char, default to ``"``.

        :raise ValueError: If ``char`` is not exactly one character long.
        """
        if len(char) != 1:
            raise ValueError('Quote char must be a single char, got {}'.format(char))
        self._j_schema_builder.setQuoteChar(char)
        return self

    def set_skip_first_data_row(self, skip: bool = True) -> 'CsvSchemaBuilder':
        """
        Set whether to skip the first row of csv file.
        """
        self._j_schema_builder.setSkipFirstDataRow(skip)
        return self

    def set_strict_headers(self, strict: bool = True) -> 'CsvSchemaBuilder':
        """
        Set whether to use strict headers, which check column names in the header are
        consistent with the schema.
        """
        self._j_schema_builder.setStrictHeaders(strict)
        return self

    def set_use_header(self, use: bool = True) -> 'CsvSchemaBuilder':
        """
        Set whether to read header.
        """
        self._j_schema_builder.setUseHeader(use)
        return self

    def size(self):
        # Number of columns added so far (Python-side view).
        return len(self._fields)

    def __len__(self):
        return self.size()
class CsvReaderFormat(StreamFormat):
    """
    A :class:`~StreamFormat` for reading csv files, configured from a
    :class:`CsvSchema`.

    Example:
    ::

        >>> schema = CsvSchema.builder() \\
        ...     .add_number_column('id', number_type=DataTypes.INT()) \\
        ...     .add_string_column('name') \\
        ...     .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
        ...     .set_column_separator('|') \\
        ...     .set_escape_char('\\\\') \\
        ...     .set_use_header() \\
        ...     .set_strict_headers() \\
        ...     .build()
        >>> source = FileSource.for_record_stream_format(
        ...     CsvReaderFormat.for_schema(schema), CSV_FILE_PATH).build()
        >>> ds = env.from_source(source, WatermarkStrategy.no_watermarks(), 'csv-source')
        >>> # the type of records is Types.ROW_NAMED(['id', 'name', 'list'],
        >>> # [Types.INT(), Types.STRING(), Types.LIST(Types.STRING())])

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_csv_format):
        # Wraps the Java-side CsvReaderFormat object.
        super().__init__(j_csv_format)

    @staticmethod
    def for_schema(schema: 'CsvSchema') -> 'CsvReaderFormat':
        """
        Builds a :class:`CsvReaderFormat` using `CsvSchema`.
        """
        from pyflink.table.types import _to_java_data_type
        # Hand both the jackson schema and the equivalent Flink DataType to the
        # Java helper, which builds the actual reader format.
        j_utils = get_gateway().jvm.org.apache.flink.formats.csv.PythonCsvUtils
        j_csv_format = j_utils.createCsvReaderFormat(
            schema._j_schema,
            _to_java_data_type(schema._row_type))
        return CsvReaderFormat(j_csv_format)
class CsvBulkWriters(object):
    """
    Entry point for building a :class:`~pyflink.common.serialization.BulkWriterFactory`
    that writes Rows with a predefined CSV schema to partitioned files in a bulk
    fashion.

    Example:
    ::

        >>> schema = CsvSchema.builder() \\
        ...     .add_number_column('id', number_type=DataTypes.INT()) \\
        ...     .add_string_column('name') \\
        ...     .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
        ...     .set_column_separator('|') \\
        ...     .build()
        >>> sink = FileSink.for_bulk_format(
        ...     OUTPUT_DIR, CsvBulkWriters.for_schema(schema)).build()
        >>> ds.sink_to(sink)

    .. versionadded:: 1.16.0
    """

    @staticmethod
    def for_schema(schema: 'CsvSchema') -> 'BulkWriterFactory':
        """
        Creates a :class:`~pyflink.common.serialization.BulkWriterFactory` for writing
        records to files in CSV format.
        """
        from pyflink.table.types import _to_java_data_type
        j_utils = get_gateway().jvm.org.apache.flink.formats.csv.PythonCsvUtils
        j_factory = j_utils.createCsvBulkWriterFactory(
            schema._j_schema,
            _to_java_data_type(schema._row_type))
        # Bundle the Java factory with the schema's row type so the Python side
        # knows how rows are laid out.
        return RowDataBulkWriterFactory(j_factory, schema._row_type)
class CsvRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from CSV to Flink types. A byte[] message is parsed as a
    JsonNode and converted to a Row; deserialization failures are forwarded as
    wrapped IOException.
    """

    def __init__(self, j_deserialization_schema):
        super(CsvRowDeserializationSchema, self).__init__(
            j_deserialization_schema=j_deserialization_schema)

    class Builder(object):
        """
        A builder for creating a CsvRowDeserializationSchema.

        Each setter rebinds the wrapped Java builder and returns ``self`` so calls
        can be chained.
        """

        def __init__(self, type_info: TypeInformation):
            if type_info is None:
                raise TypeError("Type information must not be None")
            jvm = get_gateway().jvm
            self._j_builder = jvm \
                .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
                    type_info.get_java_type_info())

        def set_field_delimiter(self, delimiter: str):
            updated = self._j_builder.setFieldDelimiter(delimiter)
            self._j_builder = updated
            return self

        def set_allow_comments(self, allow_comments: bool):
            updated = self._j_builder.setAllowComments(allow_comments)
            self._j_builder = updated
            return self

        def set_array_element_delimiter(self, delimiter: str):
            updated = self._j_builder.setArrayElementDelimiter(delimiter)
            self._j_builder = updated
            return self

        def set_quote_character(self, c: str):
            updated = self._j_builder.setQuoteCharacter(c)
            self._j_builder = updated
            return self

        def set_escape_character(self, c: str):
            updated = self._j_builder.setEscapeCharacter(c)
            self._j_builder = updated
            return self

        def set_null_literal(self, null_literal: str):
            updated = self._j_builder.setNullLiteral(null_literal)
            self._j_builder = updated
            return self

        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
            updated = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
            self._j_builder = updated
            return self

        def build(self):
            # Materialize the Java schema and wrap it in the Python class.
            j_schema = self._j_builder.build()
            return CsvRowDeserializationSchema(j_deserialization_schema=j_schema)
class CsvRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes an object of Flink types into CSV bytes.
    The input row is turned into an ObjectNode and rendered as byte[]; the result
    can be read back with CsvRowDeserializationSchema.
    """

    def __init__(self, j_csv_row_serialization_schema):
        super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)

    class Builder(object):
        """
        A builder for creating a CsvRowSerializationSchema.

        Each setter rebinds the wrapped Java builder and returns ``self`` so calls
        can be chained.
        """

        def __init__(self, type_info: TypeInformation):
            if type_info is None:
                raise TypeError("Type information must not be None")
            jvm = get_gateway().jvm
            self._j_builder = jvm \
                .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
                    type_info.get_java_type_info())

        def set_field_delimiter(self, c: str):
            updated = self._j_builder.setFieldDelimiter(c)
            self._j_builder = updated
            return self

        def set_line_delimiter(self, delimiter: str):
            updated = self._j_builder.setLineDelimiter(delimiter)
            self._j_builder = updated
            return self

        def set_array_element_delimiter(self, delimiter: str):
            updated = self._j_builder.setArrayElementDelimiter(delimiter)
            self._j_builder = updated
            return self

        def disable_quote_character(self):
            updated = self._j_builder.disableQuoteCharacter()
            self._j_builder = updated
            return self

        def set_quote_character(self, c: str):
            updated = self._j_builder.setQuoteCharacter(c)
            self._j_builder = updated
            return self

        def set_escape_character(self, c: str):
            updated = self._j_builder.setEscapeCharacter(c)
            self._j_builder = updated
            return self

        def set_null_literal(self, s: str):
            updated = self._j_builder.setNullLiteral(s)
            self._j_builder = updated
            return self

        def build(self):
            # Materialize the Java schema and wrap it in the Python class.
            j_schema = self._j_builder.build()
            return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_schema)