根据日志中的 "Heartbeat of TaskManager with id" 和 "The heartbeat of ResourceManager with id" 这两段信息,可以在源码中找到如下代码:
private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> { private final JobMasterGateway jobMasterGateway; private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); } @Override public void notifyHeartbeatTimeout(ResourceID resourceID) { jobMasterGateway.disconnectTaskManager( resourceID, new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out.")); } @Override public void reportPayload(ResourceID resourceID, AccumulatorReport payload) { for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) { schedulerNG.updateAccumulators(snapshot); } } @Override public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } } private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> { @Override public void notifyHeartbeatTimeout(final ResourceID resourceId) { runAsync(() -> { log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) { reconnectToResourceManager( new JobMasterException( String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId))); } }); } @Override public void reportPayload(ResourceID resourceID, Void payload) { // nothing to do since the payload is of type Void } @Override public CompletableFuture<Void> retrievePayload(ResourceID resourceID) { return CompletableFuture.completedFuture(null); } }
然后在这实例化
// Build the TaskManager heartbeat manager through the sender factory, with a
// TaskManagerHeartbeatListener bound to this endpoint's own gateway.
this.taskManagerHeartbeatManager =
        heartbeatServices.createHeartbeatManagerSender(
                resourceId,
                new TaskManagerHeartbeatListener(selfGateway),
                rpcService.getScheduledExecutor(),
                log);
顺着去heartbeatServices瞅瞅了
/**
 * Entry point for everything needed for heartbeating: it holds the heartbeat interval and
 * timeout and creates both passive heartbeat managers and actively sending ones.
 */
public class HeartbeatServices {

    /** Interval handed to heartbeat managers that actively send heartbeats. */
    protected final long heartbeatInterval;

    /** Timeout handed to every heartbeat manager created by this instance. */
    protected final long heartbeatTimeout;

    /**
     * Creates the services after validating both settings.
     *
     * @param heartbeatInterval heartbeat interval; must be larger than 0
     * @param heartbeatTimeout heartbeat timeout; must be at least {@code heartbeatInterval}
     */
    public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
        Preconditions.checkArgument(
                heartbeatInterval > 0L,
                "The heartbeat interval must be larger than 0.");
        Preconditions.checkArgument(
                heartbeatTimeout >= heartbeatInterval,
                "The heartbeat timeout should be larger or equal than the heartbeat interval.");

        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /**
     * Creates a heartbeat manager that does not send heartbeats on its own.
     *
     * @param resourceId resource id identifying the owner of the heartbeat manager
     * @param heartbeatListener listener notified about heartbeat timeouts of registered targets
     * @param scheduledExecutor scheduled executor used for scheduling heartbeat timeouts; it is
     *     passed for both executor arguments of the implementation
     * @param log logger used by the heartbeat manager
     * @param <I> type of the incoming payload
     * @param <O> type of the outgoing payload
     * @return a new {@link HeartbeatManager} instance
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        return new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                resourceId,
                heartbeatListener,
                scheduledExecutor,
                scheduledExecutor,
                log);
    }

    /**
     * Creates a heartbeat manager that actively sends heartbeats to its monitoring targets.
     *
     * @param resourceId resource id identifying the owner of the heartbeat manager
     * @param heartbeatListener listener notified about heartbeat timeouts of registered targets
     * @param scheduledExecutor scheduled executor used for scheduling heartbeat timeouts; it is
     *     passed for both executor arguments of the implementation
     * @param log logger used by the heartbeat manager
     * @param <I> type of the incoming payload
     * @param <O> type of the outgoing payload
     * @return a new {@link HeartbeatManager} instance which actively sends heartbeats
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        return new HeartbeatManagerSenderImpl<>(
                heartbeatInterval,
                heartbeatTimeout,
                resourceId,
                heartbeatListener,
                scheduledExecutor,
                scheduledExecutor,
                log);
    }

    /**
     * Builds a {@link HeartbeatServices} instance from the heartbeat options of a
     * {@link Configuration}.
     *
     * @param configuration configuration providing {@code HEARTBEAT_INTERVAL} and
     *     {@code HEARTBEAT_TIMEOUT}
     * @return a {@link HeartbeatServices} instance using the configured interval and timeout
     */
    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        // Arguments are evaluated left to right, so the interval is read before the timeout.
        return new HeartbeatServices(
                configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL),
                configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT));
    }
}
没错,超时时间就定义在 HeartbeatManagerOptions.HEARTBEAT_TIMEOUT 中:
/**
 * Config option {@code heartbeat.timeout}: the timeout for requesting and receiving heartbeats
 * on both the sender and the receiver side. Defaults to 50000.
 */
public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
        key("heartbeat.timeout")
                .defaultValue(50_000L)
                .withDescription(
                        "Timeout for requesting and receiving heartbeat for both sender and receiver sides.");
心跳超时有可能是 YARN 压力比较大引起的。可以先在 conf/flink-conf.yaml 中将这个值调大一点,再继续观察。
# Timeout for requesting and receiving heartbeat for both sender and receiver sides.
heartbeat.timeout: 180000
原文:https://www.cnblogs.com/felixzh/p/14891620.html