public class ZooKeeperHaServices extends AbstractHaServices
An implementation of the AbstractHaServices using Apache ZooKeeper. The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
    /flink
        +/cluster_id_1/resource_manager_lock
        |            |
        |            +/job-id-1/job_manager_lock
        |            |         /checkpoints/latest
        |            |                     /latest-1
        |            |                     /latest-2
        |            |
        |            +/job-id-2/job_manager_lock
        |
        +/cluster_id_2/resource_manager_lock
                     |
                     +/job-id-1/job_manager_lock
                              |/checkpoints/latest
                              |            /latest-1
                              |/persisted_job_graph
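A small model makes the tree concrete. The sketch below is a minimal illustration that rests on several assumptions. ZNodeTreeSketch and its methods are hypothetical. It uses only the JDK and keeps node paths in a sorted map instead of talking to ZooKeeper. The node payloads are made up. The model shows the practical effect of the layout: all nodes of one job (or one cluster) share a path prefix, so removing that job's or cluster's data comes down to deleting one subtree.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, JDK-only stand-in for a ZooKeeper namespace; not a Flink or ZooKeeper API.
class ZNodeTreeSketch {
    // Paths are kept sorted, so all descendants of a node form one contiguous key range.
    private final NavigableMap<String, String> nodes = new TreeMap<>();

    void create(String path, String data) {
        nodes.put(path, data);
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    int size() {
        return nodes.size();
    }

    // Removes a node and everything below it, similar to a recursive delete.
    // Descendants of "p" are exactly the keys in ["p/", "p0"), because '0' directly follows '/'.
    int deleteSubtree(String path) {
        List<String> doomed =
                new ArrayList<>(nodes.subMap(path + "/", true, path + "0", false).keySet());
        if (nodes.containsKey(path)) {
            doomed.add(path);
        }
        doomed.forEach(nodes::remove);
        return doomed.size();
    }

    // Creates the leaf nodes of cluster_id_1 from the tree above; the payloads are made up.
    static ZNodeTreeSketch exampleCluster() {
        ZNodeTreeSketch zk = new ZNodeTreeSketch();
        String cluster = "/flink/cluster_id_1";
        zk.create(cluster + "/resource_manager_lock", "rm-leader");
        zk.create(cluster + "/job-id-1/job_manager_lock", "jm-leader");
        zk.create(cluster + "/job-id-1/checkpoints/latest", "chk");
        zk.create(cluster + "/job-id-1/checkpoints/latest-1", "chk");
        zk.create(cluster + "/job-id-1/checkpoints/latest-2", "chk");
        zk.create(cluster + "/job-id-2/job_manager_lock", "jm-leader");
        return zk;
    }

    public static void main(String[] args) {
        ZNodeTreeSketch zk = exampleCluster();
        // Dropping job-id-1 removes its lock and checkpoints; job-id-2 and the RM lock stay.
        int removed = zk.deleteSubtree("/flink/cluster_id_1/job-id-1");
        System.out.println(removed + " nodes removed, " + zk.size() + " left");
    }
}
```

In the example, deleting "/flink/cluster_id_1/job-id-1" removes four nodes. The ResourceManager lock and the nodes of job-id-2 are left untouched. A real ZooKeeper client would also have to delete the intermediate nodes (such as "/job-id-1" and "/checkpoints"), which this map does not store.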
The root path "/flink" is configurable via the option HighAvailabilityOptions.HA_ZOOKEEPER_ROOT. This allows Flink to store its data under a specific subtree in ZooKeeper, for example to accommodate specific permissions.
The "cluster_id" part identifies the data stored for a specific Flink "cluster". This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job on a framework like YARN or Mesos (in a "per-job-cluster" mode).
In the case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured automatically by the client or dispatcher that submits the job to YARN or Mesos.
In the case of a standalone cluster, that cluster-id needs to be configured via HighAvailabilityOptions.HA_CLUSTER_ID. All nodes with the same cluster id will join the same cluster and participate in the execution of the same set of jobs.
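The following sketch shows how the two options could combine into the namespace that holds one cluster's data. It assumes the configuration is a plain Map keyed by the flink-conf.yaml keys that HA_ZOOKEEPER_ROOT and HA_CLUSTER_ID are assumed to use. HaNamespaceSketch is a hypothetical helper, not Flink's own path handling.

```java
import java.util.Map;

// Hypothetical helper (not Flink's API): derives the ZooKeeper namespace of one cluster.
class HaNamespaceSketch {
    // Assumed flink-conf.yaml keys behind HA_ZOOKEEPER_ROOT and HA_CLUSTER_ID.
    static final String ROOT_KEY = "high-availability.zookeeper.path.root";
    static final String CLUSTER_ID_KEY = "high-availability.cluster-id";
    static final String DEFAULT_ROOT = "/flink";

    // Turns "x", "/x" or "/x/" into "/x"; an empty segment becomes "".
    static String normalize(String segment) {
        String s = segment.trim();
        while (s.startsWith("/")) {
            s = s.substring(1);
        }
        while (s.endsWith("/")) {
            s = s.substring(0, s.length() - 1);
        }
        return s.isEmpty() ? "" : "/" + s;
    }

    static String namespace(Map<String, String> conf) {
        String root = normalize(conf.getOrDefault(ROOT_KEY, DEFAULT_ROOT));
        String clusterId = conf.get(CLUSTER_ID_KEY);
        // Simplification: a missing id is an error here, mirroring the standalone
        // requirement described above; the real option may fall back to a default.
        if (clusterId == null || clusterId.trim().isEmpty()) {
            throw new IllegalArgumentException("missing " + CLUSTER_ID_KEY);
        }
        return root + normalize(clusterId);
    }

    public static void main(String[] args) {
        Map<String, String> nodeA = Map.of(CLUSTER_ID_KEY, "cluster_id_1");
        Map<String, String> nodeB = Map.of(ROOT_KEY, "/flink/", CLUSTER_ID_KEY, "/cluster_id_1");
        // Both nodes resolve to /flink/cluster_id_1, so they would join the same cluster.
        System.out.println(namespace(nodeA).equals(namespace(nodeB)));
    }
}
```

Two nodes configured with the same cluster id resolve to the same namespace, regardless of leading or trailing slashes. That shared namespace is what lets them join the same cluster. A different cluster id yields a disjoint subtree.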
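Fields inherited from class AbstractHaServices: configuration, ioExecutor, logger
Fields inherited from interface HighAvailabilityServices: DEFAULT_JOB_ID, DEFAULT_LEADER_ID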
Constructor and Description |
---|
ZooKeeperHaServices(org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework client, Executor executor, Configuration configuration, BlobStoreService blobStoreService) |
Modifier and Type | Method and Description |
---|---|
CheckpointRecoveryFactory | createCheckpointRecoveryFactory(): Create the checkpoint recovery factory for the job manager. |
JobGraphStore | createJobGraphStore(): Create the submitted job graph store for the job manager. |
protected LeaderElectionService | createLeaderElectionService(String leaderName): Create a leader election service with the specified leaderName. |
protected LeaderRetrievalService | createLeaderRetrievalService(String leaderName): Create a leader retrieval service with the specified leaderName. |
RunningJobsRegistry | createRunningJobsRegistry(): Create the registry that holds information about whether jobs are currently running. |
protected String | getLeaderNameForDispatcher(): Get the leader name for the Dispatcher. |
String | getLeaderNameForJobManager(JobID jobID): Get the leader name for a specific JobManager. |
protected String | getLeaderNameForResourceManager(): Get the leader name for the ResourceManager. |
protected String | getLeaderNameForRestServer(): Get the leader name for the RestServer. |
void | internalCleanup(): Clean up the metadata in the distributed system (e.g. ZooKeeper). |
void | internalCleanupJobData(JobID jobID): Clean up the metadata in the distributed system (e.g. ZooKeeper) for the specified job. |
void | internalClose(): Close the components used for external operations (e.g. the ZooKeeper client). |
Methods inherited from class AbstractHaServices: cleanupJobData, close, closeAndCleanupAllData, createBlobStore, getCheckpointRecoveryFactory, getClusterRestEndpointLeaderElectionService, getClusterRestEndpointLeaderRetriever, getDispatcherLeaderElectionService, getDispatcherLeaderRetriever, getJobGraphStore, getJobManagerLeaderElectionService, getJobManagerLeaderRetriever, getJobManagerLeaderRetriever, getResourceManagerLeaderElectionService, getResourceManagerLeaderRetriever, getRunningJobsRegistry
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface HighAvailabilityServices: getWebMonitorLeaderElectionService, getWebMonitorLeaderRetriever
public ZooKeeperHaServices(org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework client, Executor executor, Configuration configuration, BlobStoreService blobStoreService)
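public CheckpointRecoveryFactory createCheckpointRecoveryFactory()
Description copied from class: AbstractHaServices
Create the checkpoint recovery factory for the job manager.
Specified by: createCheckpointRecoveryFactory in class AbstractHaServices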
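public JobGraphStore createJobGraphStore() throws Exception
Description copied from class: AbstractHaServices
Create the submitted job graph store for the job manager.
Specified by: createJobGraphStore in class AbstractHaServices
Throws: Exception - if the submitted job graph store could not be created
public RunningJobsRegistry createRunningJobsRegistry()
Description copied from class: AbstractHaServices
Create the registry that holds information about whether jobs are currently running.
Specified by: createRunningJobsRegistry in class AbstractHaServices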
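To illustrate what "whether jobs are currently running" can mean in practice, here is an in-memory analogue of such a registry. RunningJobsRegistrySketch, its methods, and its three states are assumptions made for this sketch, and they are not meant to reproduce Flink's RunningJobsRegistry interface. One plausible use is letting a process that takes over leadership distinguish a job that has already finished from one that still has to run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory analogue of a running-jobs registry; not Flink's RunningJobsRegistry.
// Presumably the registry created by createRunningJobsRegistry() keeps this state in
// ZooKeeper so it survives process failures; here it is only a map.
class RunningJobsRegistrySketch {
    // Assumed lifecycle states for the sketch.
    enum Status { PENDING, RUNNING, DONE }

    private final Map<String, Status> statusByJob = new ConcurrentHashMap<>();

    void setJobRunning(String jobId) {
        statusByJob.put(jobId, Status.RUNNING);
    }

    void setJobFinished(String jobId) {
        statusByJob.put(jobId, Status.DONE);
    }

    // Jobs the registry has never seen are reported as PENDING.
    Status getStatus(String jobId) {
        return statusByJob.getOrDefault(jobId, Status.PENDING);
    }

    void clearJob(String jobId) {
        statusByJob.remove(jobId);
    }

    public static void main(String[] args) {
        RunningJobsRegistrySketch registry = new RunningJobsRegistrySketch();
        registry.setJobRunning("job-id-1");
        registry.setJobFinished("job-id-1");
        // A process that takes over could skip job-id-1 because it is already DONE.
        System.out.println(registry.getStatus("job-id-1")); // DONE
        System.out.println(registry.getStatus("job-id-2")); // PENDING
    }
}
```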
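protected LeaderElectionService createLeaderElectionService(String leaderName)
Description copied from class: AbstractHaServices
Create a leader election service with the specified leaderName.
Specified by: createLeaderElectionService in class AbstractHaServices
Parameters: leaderName - ConfigMap name in Kubernetes or child node path in ZooKeeper.
protected LeaderRetrievalService createLeaderRetrievalService(String leaderName)
Description copied from class: AbstractHaServices
Create a leader retrieval service with the specified leaderName.
Specified by: createLeaderRetrievalService in class AbstractHaServices
Parameters: leaderName - ConfigMap name in Kubernetes or child node path in ZooKeeper.
public void internalClose()
Description copied from class: AbstractHaServices
Close the components used for external operations (e.g. the ZooKeeper client).
Specified by: internalClose in class AbstractHaServices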
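createLeaderElectionService and createLeaderRetrievalService both take a leaderName, which for ZooKeeper is a child node path. The sketch below simulates, in a single process, the well-known ZooKeeper election recipe, in which contenders create ephemeral sequential nodes and the lowest sequence number leads. It pairs the election with a retrieval listener that learns about each new leader. LeaderElectionSketch is hypothetical. It does not use ZooKeeper or the Curator client this class is constructed with, and it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical single-process simulation of the classic ZooKeeper leader-election recipe:
// every contender creates an ephemeral sequential node and the lowest sequence number leads.
// It does not talk to ZooKeeper and is not Flink's LeaderElectionService.
class LeaderElectionSketch {
    private final String leaderName; // child node path, e.g. "/resource_manager_lock"
    private final TreeMap<Long, String> contenders = new TreeMap<>(); // sequence -> address
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private long nextSequence = 0;
    private String leader; // address that retrievers currently see (null = no leader)

    LeaderElectionSketch(String leaderName) {
        this.leaderName = leaderName;
    }

    String leaderName() {
        return leaderName;
    }

    // Election side: like creating an ephemeral sequential node; returns its sequence number.
    long join(String address) {
        long sequence = nextSequence++;
        contenders.put(sequence, address);
        publishLeader();
        return sequence;
    }

    // Like the contender's session ending: its ephemeral node disappears.
    void leave(long sequence) {
        contenders.remove(sequence);
        publishLeader();
    }

    // Retrieval side: a listener gets the current leader now and on every change.
    void addListener(Consumer<String> listener) {
        listeners.add(listener);
        listener.accept(leader);
    }

    String currentLeader() {
        return leader;
    }

    private void publishLeader() {
        String newLeader = contenders.isEmpty() ? null : contenders.firstEntry().getValue();
        if (!Objects.equals(newLeader, leader)) {
            leader = newLeader;
            listeners.forEach(listener -> listener.accept(newLeader));
        }
    }

    public static void main(String[] args) {
        LeaderElectionSketch election = new LeaderElectionSketch("/resource_manager_lock");
        List<String> seen = new ArrayList<>();
        election.addListener(seen::add);
        long first = election.join("rm-1");
        election.join("rm-2");
        election.leave(first); // rm-1 goes away, so rm-2 takes over
        System.out.println(seen); // [null, rm-1, rm-2]
    }
}
```

In the example, the listener first sees no leader, then rm-1, and then rm-2 once rm-1's node disappears. The real recipe also has each contender watch only its predecessor's node, so a leader change does not wake up every contender. The sketch omits this.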
public void internalCleanup() throws Exception
Description copied from class: AbstractHaServices
Clean up the metadata in the distributed system (e.g. ZooKeeper).
If an exception occurs during internal cleanup, we will continue the cleanup in AbstractHaServices.closeAndCleanupAllData() and report exceptions only after all cleanup steps have been attempted.
Specified by: internalCleanup in class AbstractHaServices
Throws: Exception - if the cleanup operation on external storage fails.
public void internalCleanupJobData(JobID jobID) throws Exception
Description copied from class: AbstractHaServices
Clean up the metadata in the distributed system (e.g. ZooKeeper) for the specified job.
Specified by: internalCleanupJobData in class AbstractHaServices
Parameters: jobID - The identifier of the job to clean up.
Throws: Exception - if the cleanup operation on external storage fails.
protected String getLeaderNameForResourceManager()
Description copied from class: AbstractHaServices
Get the leader name for the ResourceManager.
Specified by: getLeaderNameForResourceManager in class AbstractHaServices
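The internalCleanup contract described above is a common pattern: keep cleaning up after a failure, and report exceptions only once every step has been attempted. Below is a minimal sketch that assumes hypothetical CleanupSketch and Step types, neither of which is part of Flink. The first exception is rethrown at the end, and later ones are attached to it with Throwable.addSuppressed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "attempt every cleanup step, report failures afterwards".
// The steps are illustrative; this is not the actual code of closeAndCleanupAllData().
class CleanupSketch {
    // One cleanup action that may fail, e.g. deleting this cluster's ZooKeeper subtree.
    interface Step {
        void run() throws Exception;
    }

    static void runAll(List<Step> steps) throws Exception {
        Exception failure = null;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                // Do not abort: remember the first failure and attach later ones to it.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        List<String> completed = new ArrayList<>();
        List<Step> steps = List.of(
                () -> { throw new IllegalStateException("ZooKeeper unreachable"); },
                () -> completed.add("blob store"));
        try {
            runAll(steps);
        } catch (Exception e) {
            // The second step still ran even though the first one failed.
            System.out.println(completed + ", first failure: " + e.getMessage());
        }
    }
}
```

Keeping the first exception as the primary one and suppressing the rest preserves every failure for logging without hiding the original cause.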
protected String getLeaderNameForDispatcher()
Description copied from class: AbstractHaServices
Get the leader name for the Dispatcher.
Specified by: getLeaderNameForDispatcher in class AbstractHaServices
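public String getLeaderNameForJobManager(JobID jobID)
Description copied from class: AbstractHaServices
Get the leader name for a specific JobManager.
Specified by: getLeaderNameForJobManager in class AbstractHaServices
Parameters: jobID - the job ID
protected String getLeaderNameForRestServer()
Description copied from class: AbstractHaServices
Get the leader name for the RestServer.
Specified by: getLeaderNameForRestServer in class AbstractHaServices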
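The leaderName parameter is described as a child node path in ZooKeeper. Judging from that description and the method names, the getLeaderNameFor... methods presumably supply the leaderName passed to createLeaderElectionService and createLeaderRetrievalService. The sketch below illustrates that idea only, and LeaderNamesSketch is hypothetical. The names resource_manager_lock and job_manager_lock follow the tree at the top of this page. The names dispatcher_lock and rest_server_lock are assumed placeholders, and the real services may add further configured path segments.

```java
// Hypothetical sketch (not Flink's code): leader names as ZooKeeper child paths.
// "resource_manager_lock" and "job_manager_lock" follow the illustrative tree at the top
// of this page; the dispatcher and REST server names are assumed placeholders, and the
// real services may insert further configured path segments.
class LeaderNamesSketch {
    static String forResourceManager() {
        return "/resource_manager_lock";
    }

    static String forDispatcher() {
        return "/dispatcher_lock";
    }

    static String forRestServer() {
        return "/rest_server_lock";
    }

    // One leader per job, so the job id becomes part of the child path.
    static String forJobManager(String jobId) {
        return "/" + jobId + "/job_manager_lock";
    }

    // A leader name is relative to the cluster namespace, e.g. "/flink/cluster_id_1".
    static String fullPath(String clusterNamespace, String leaderName) {
        return clusterNamespace + leaderName;
    }

    public static void main(String[] args) {
        String ns = "/flink/cluster_id_1";
        System.out.println(fullPath(ns, forResourceManager()));
        System.out.println(fullPath(ns, forJobManager("job-id-1")));
    }
}
```

Because the job id is part of the JobManager's leader name, each job in a cluster gets its own election. The ResourceManager, Dispatcher, and RestServer each have a single leader per cluster.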
Copyright © 2014–2021 The Apache Software Foundation. All rights reserved.