env.enableCheckpointing(60_000);
env.setStateBackend((StateBackend) new FsStateBackend("/tmp/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038
Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.
flink run -s /tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
$ export HADOOP_CONF_DIR=/path/to/hadoop/conf
$ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
Submitted application application_1545534487726_0001
bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726
abstract class FlinkKafkaConsumerBase implements CheckpointedFunction { public void initializeState(FunctionInitializationContext context) { OperatorStateStore stateStore = context.getOperatorStateStore(); this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>( OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}))); if (context.isRestored()) { for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) { restoredState.put(kafkaOffset.f0, kafkaOffset.f1); } } } public void snapshotState(FunctionSnapshotContext context) { unionOffsetStates.clear(); for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) { unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue())); } } }
2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase
Consumer subtask 0 will start reading 2 partitions with offsets in restored state:
{KafkaTopicPartition{topic=‘flink_test‘, partition=1}=725,
KafkaTopicPartition{topic=‘flink_test‘, partition=0}=721}
HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) { this.tempFile = checkNotNull(recoverable.tempFile()); truncate(fs, tempFile, recoverable.offset()); out = fs.append(tempFile); }
原文:https://www.cnblogs.com/mn-lily/p/14607916.html