Syntaxe de base
Création d’un stream sur un topic Kafka existant :
CREATE [OR REPLACE] [SOURCE] STREAM [IF NOT EXISTS] stream_name
( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
WITH ( property_name = expression [, ...] );SOURCE : indique que le stream lit depuis un topic existant sans y écrire.
IF NOT EXISTS : évite erreur si le stream existe déjà.
Exemple complet
Stream pour données de capteurs IoT :
CREATE STREAM iot_stream_data (
sensor_id VARCHAR KEY,
ville VARCHAR,
temperature DOUBLE,
ts VARCHAR
) WITH (
KAFKA_TOPIC = 'iot-sensor-data',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'ts',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSX'
);sensor_id est la clé du message Kafka.
Les données sont au format JSON dans le topic iot-sensor-data.
Le champ ts est utilisé comme timestamp avec format ISO 8601.
Types de colonnes
Spécification du rôle des colonnes :
KEY : colonne utilisée comme clé de partition Kafka.
HEADERS : métadonnées du message.
HEADER(key) : header spécifique du message.
Sans spécification : colonne de valeur standard.
Exemple avec headers
CREATE STREAM orders_with_headers (
order_id VARCHAR KEY,
customer_id VARCHAR,
amount DOUBLE,
source VARCHAR HEADER('source')
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'AVRO'
);