%flink.ssql
-- Recreate the source table from scratch so this paragraph can be re-run.
DROP TABLE IF EXISTS game_table;

-- Streaming source: JSON records read from the Kinesis stream
-- 'korea-kinesis-stream' in us-east-1, starting at the LATEST position.
-- event_time is the event-time attribute; its watermark trails the
-- timestamp by 5 seconds.
CREATE TABLE game_table (
    `event_time`     TIMESTAMP(3),
    `title`          VARCHAR(4),
    `ip`             VARCHAR(16),
    `buffer`         DOUBLE,
    `network_input`  DOUBLE,
    `network_output` DOUBLE,
    WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
)
WITH (
    'connector'           = 'kinesis',
    'stream'              = 'korea-kinesis-stream',
    'aws.region'          = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format'              = 'json'
);
SQL
복사
%flink.pyflink
# Checkpointing options applied to the stream table environment:
# a checkpoint every minute, in EXACTLY_ONCE mode.
# NOTE(review): presumably required so the filesystem sink can finalize
# its output files on checkpoint -- confirm against the Flink docs.
checkpoint_settings = (
    ("execution.checkpointing.interval", "1min"),
    ("execution.checkpointing.mode", "EXACTLY_ONCE"),
)
for option_key, option_value in checkpoint_settings:
    st_env.get_config().get_configuration().set_string(option_key, option_value)
SQL
복사
%flink.ssql(type=update)
-- Recreate the sink table from scratch so this paragraph can be re-run.
DROP TABLE IF EXISTS game_sink_table_s3;

-- CSV sink on S3 (s3a://korea-s3-bucket/output/), partitioned by
-- title / datetime / hour. A success file is written when a partition
-- is committed.
--
-- `title` is VARCHAR(4) to match game_table.title, the column that feeds
-- it; it was previously VARCHAR(3), narrower than its input.
--
-- NOTE(review): no 'sink.partition-commit.trigger' / '.delay' is set, so the
-- connector defaults decide when the success file appears. If downstream
-- readers rely on it to mean "hour complete", confirm the defaults are what
-- you want.
CREATE TABLE game_sink_table_s3 (
    `event_time`    TIMESTAMP(3),
    `buffer`        DOUBLE,
    `ip`            VARCHAR(64),
    `network_input` DOUBLE,
    `title`         VARCHAR(4),
    `datetime`      STRING,
    `hour`          STRING
)
PARTITIONED BY (`title`, `datetime`, `hour`)
WITH (
    'connector' = 'filesystem',
    'path' = 's3a://korea-s3-bucket/output/',
    'format' = 'csv',
    'sink.partition-commit.policy.kind' = 'success-file'
);
SQL
복사
%flink.ssql(type=update)
-- Alternative paragraph header with explicit parallelism:
--   %flink.ssql(type=update, parallelism=4)
--
-- Per-minute roll-up of high-buffer events (buffer > 90), one row per
-- (1-minute tumbling window, title, ip), written to game_sink_table_s3.
-- Select-list order must match the sink's column order:
--   event_time, buffer, ip, network_input, title, datetime, hour
--
-- Changes from the previous version:
--   * event_time is no longer a GROUP BY key. Grouping by the raw timestamp
--     put every distinct event in its own group, so the tumbling window and
--     AVG(buffer) aggregated nothing.
--   * network_input is averaged per window instead of being a grouping key;
--     grouping by a raw DOUBLE measurement fragmented the groups the same way.
--   * The partition strings are derived from the window start, which lies in
--     the same date and hour as every event_time inside the 1-minute window
--     (the window end can already belong to the next hour).
--   * The leftover alias avg_price is renamed to avg_buffer.
INSERT INTO game_sink_table_s3
SELECT
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end_time,
    AVG(buffer) AS avg_buffer,
    ip,
    AVG(network_input) AS avg_network_input,
    title,
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd') AS dt,
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'HH') AS ht
FROM game_table
WHERE buffer > 90
GROUP BY
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    title,
    ip;
SQL
복사


