Search

Kinesis Data Analytics Studio

%flink.ssql
-- Source table: JSON game-telemetry events read from the Kinesis stream
-- 'korea-kinesis-stream' in us-east-1, starting at the newest records (LATEST).
-- event_time is the event-time attribute; the watermark allows events to
-- arrive up to 5 seconds out of order before windows over them are closed.
DROP TABLE IF EXISTS game_table;

CREATE TABLE game_table (
    event_time     TIMESTAMP(3),
    title          VARCHAR(4),
    -- VARCHAR(64) to match game_sink_table_s3.ip; VARCHAR(16) cannot hold a
    -- full IPv6 address.
    ip             VARCHAR(64),
    buffer         DOUBLE,
    network_input  DOUBLE,
    network_output DOUBLE,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'           = 'kinesis',
    'stream'              = 'korea-kinesis-stream',
    'aws.region'          = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format'              = 'json'
);
SQL
복사
%flink.pyflink st_env.get_config().get_configuration().set_string( "execution.checkpointing.interval", "1min" ) st_env.get_config().get_configuration().set_string( "execution.checkpointing.mode", "EXACTLY_ONCE" )
Python (PyFlink)
복사
%flink.ssql(type=update)
-- Sink table: CSV files written under s3a://korea-s3-bucket/output/,
-- partitioned by title, date (`datetime`, yyyy-MM-dd) and hour (`hour`, HH).
-- With the 'success-file' commit policy a success marker file is written into
-- each partition directory when that partition is committed.
-- NOTE(review): a streaming filesystem sink finalizes files on checkpoints, so
-- checkpointing must be enabled in the session for output to appear.
DROP TABLE IF EXISTS game_sink_table_s3;

CREATE TABLE game_sink_table_s3 (
    `event_time`    TIMESTAMP(3),
    `buffer`        DOUBLE,
    `ip`            VARCHAR(64),
    `network_input` DOUBLE,
    -- VARCHAR(4) to match game_table.title. A narrower type here could
    -- truncate titles (and therefore the title partition path) when sink
    -- length enforcement is active.
    `title`         VARCHAR(4),
    `datetime`      STRING,
    `hour`          STRING
) PARTITIONED BY (`title`, `datetime`, `hour`) WITH (
    'connector'                         = 'filesystem',
    'path'                              = 's3a://korea-s3-bucket/output/',
    'format'                            = 'csv',
    'sink.partition-commit.policy.kind' = 'success-file'
);
SQL
복사
%flink.ssql(type=update)
-- To run with higher parallelism, start the paragraph with
-- %flink.ssql(type=update, parallelism=4) instead.
--
-- For every 1-minute tumbling event-time window, emit one row per (title, ip)
-- holding the average buffer and average network_input of the events in that
-- window whose buffer is above 90.
-- INSERT without a column list maps columns by position, so the SELECT list
-- follows the sink's column order and is aliased to the sink's column names.
-- Only window properties, aggregates and grouping keys are selected; grouping
-- by the raw event_time or by network_input would split every window into
-- near per-event groups and defeat the aggregation.
INSERT INTO game_sink_table_s3
SELECT
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time,
    AVG(buffer)                                 AS buffer,
    ip,
    AVG(network_input)                          AS network_input,
    title,
    -- Partition by window start so each window's rows land in the date/hour
    -- in which its events occurred (the last window of an hour ends on the
    -- next hour).
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd') AS `datetime`,
    DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'HH')         AS `hour`
FROM game_table
WHERE buffer > 90
GROUP BY
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    title,
    ip;
SQL
복사