Syntaxe de base

Création d’un stream sur un topic Kafka existant :

CREATE [OR REPLACE] [SOURCE] STREAM [IF NOT EXISTS] stream_name
  ( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
WITH ( property_name = expression [, ...] );

SOURCE : indique que le stream lit depuis un topic existant sans y écrire.

IF NOT EXISTS : évite erreur si le stream existe déjà.

Exemple complet

Stream pour données de capteurs IoT :

CREATE STREAM iot_stream_data (
    sensor_id VARCHAR KEY,
    ville VARCHAR,
    temperature DOUBLE,
    ts VARCHAR
) WITH (
    KAFKA_TOPIC = 'iot-sensor-data',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'ts',
    TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSX'
);

sensor_id est la clé du message Kafka.

Les données sont au format JSON dans le topic iot-sensor-data.

Le champ ts est utilisé comme timestamp avec format ISO 8601.

Types de colonnes

Spécification du rôle des colonnes :

KEY : colonne utilisée comme clé de partition Kafka.

HEADERS : métadonnées du message.

HEADER(key) : header spécifique du message.

Sans spécification : colonne de valeur standard.

Exemple avec headers

CREATE STREAM orders_with_headers (
    order_id VARCHAR KEY,
    customer_id VARCHAR,
    amount DOUBLE,
    source VARCHAR HEADER('source')
) WITH (
    KAFKA_TOPIC = 'orders',
    VALUE_FORMAT = 'AVRO'
);