patroni.dcs package
Submodules
- patroni.dcs.consul module
ConsulConsul.__init__()Consul._abc_implConsul._cluster_from_nodes()Consul._consistencyConsul._delete_leader()Consul._do_attempt_to_acquire_leader()Consul._do_refresh_session()Consul._load_cluster()Consul._mpp_cluster_loader()Consul._postgresql_cluster_loader()Consul._run_and_handle_exceptions()Consul._set_service_name()Consul._update_leader()Consul._update_service()Consul._write_failsafe()Consul._write_leader_optime()Consul._write_status()Consul.adjust_ttl()Consul.attempt_to_acquire_leader()Consul.cancel_initialization()Consul.create_session()Consul.delete_cluster()Consul.delete_sync_state()Consul.deregister_service()Consul.initialize()Consul.member()Consul.refresh_session()Consul.register_service()Consul.reload_config()Consul.retry()Consul.set_config_value()Consul.set_failover_value()Consul.set_history_value()Consul.set_retry_timeout()Consul.set_sync_state_value()Consul.set_ttl()Consul.take_leader()Consul.touch_member()Consul.ttlConsul.update_service()Consul.watch()
ConsulClientConsulErrorConsulInternalErrorHTTPClientInvalidSessionInvalidSessionTTLResponsecatch_consul_errors()force_if_last_failed()service_name_from_scope_name()
- patroni.dcs.etcd module
AbstractEtcdAbstractEtcd.__init__()AbstractEtcd._abc_implAbstractEtcd._clientAbstractEtcd._handle_exception()AbstractEtcd._run_and_handle_exceptions()AbstractEtcd.get_etcd_client()AbstractEtcd.handle_etcd_exceptions()AbstractEtcd.reload_config()AbstractEtcd.retry()AbstractEtcd.set_retry_timeout()AbstractEtcd.set_socket_options()AbstractEtcd.set_ttl()AbstractEtcd.ttl
AbstractEtcdClientWithFailoverAbstractEtcdClientWithFailover.ERROR_CLSAbstractEtcdClientWithFailover.__init__()AbstractEtcdClientWithFailover._abc_implAbstractEtcdClientWithFailover._calculate_timeouts()AbstractEtcdClientWithFailover._do_http_request()AbstractEtcdClientWithFailover._get_headers()AbstractEtcdClientWithFailover._get_machines_cache_from_config()AbstractEtcdClientWithFailover._get_machines_cache_from_dns()AbstractEtcdClientWithFailover._get_machines_cache_from_srv()AbstractEtcdClientWithFailover._get_machines_list()AbstractEtcdClientWithFailover._get_members()AbstractEtcdClientWithFailover._load_machines_cache()AbstractEtcdClientWithFailover._prepare_common_parameters()AbstractEtcdClientWithFailover._prepare_get_members()AbstractEtcdClientWithFailover._prepare_request()AbstractEtcdClientWithFailover._refresh_machines_cache()AbstractEtcdClientWithFailover._update_dns_cache()AbstractEtcdClientWithFailover.api_execute()AbstractEtcdClientWithFailover.get_srv_record()AbstractEtcdClientWithFailover.machinesAbstractEtcdClientWithFailover.machines_cacheAbstractEtcdClientWithFailover.reload_config()AbstractEtcdClientWithFailover.set_base_uri()AbstractEtcdClientWithFailover.set_machines_cache_ttl()AbstractEtcdClientWithFailover.set_read_timeout()
DnsCachingResolverEtcdEtcd.__init__()Etcd._abc_implEtcd._clientEtcd._cluster_from_nodes()Etcd._delete_leader()Etcd._do_attempt_to_acquire_leader()Etcd._do_update_leader()Etcd._load_cluster()Etcd._mpp_cluster_loader()Etcd._postgresql_cluster_loader()Etcd._update_leader()Etcd._write_failsafe()Etcd._write_leader_optime()Etcd._write_status()Etcd.attempt_to_acquire_leader()Etcd.cancel_initialization()Etcd.delete_cluster()Etcd.delete_sync_state()Etcd.initialize()Etcd.member()Etcd.set_config_value()Etcd.set_failover_value()Etcd.set_history_value()Etcd.set_sync_state_value()Etcd.set_ttl()Etcd.take_leader()Etcd.touch_member()Etcd.watch()
EtcdClientEtcdErrorEtcdRaftInternalStaleEtcdNodeStaleEtcdNodeGuardcatch_etcd_errors()
- patroni.dcs.etcd3 module
AuthFailedAuthNotEnabledAuthOldRevisionDeadlineExceededEtcd3Etcd3.__init__()Etcd3._abc_implEtcd3._clientEtcd3._cluster_from_nodes()Etcd3._delete_leader()Etcd3._do_attempt_to_acquire_leader()Etcd3._do_refresh_lease()Etcd3._load_cluster()Etcd3._mpp_cluster_loader()Etcd3._postgresql_cluster_loader()Etcd3._update_leader()Etcd3._write_failsafe()Etcd3._write_leader_optime()Etcd3._write_status()Etcd3.attempt_to_acquire_leader()Etcd3.cancel_initialization()Etcd3.cluster_prefixEtcd3.create_lease()Etcd3.delete_cluster()Etcd3.delete_sync_state()Etcd3.initialize()Etcd3.member()Etcd3.refresh_lease()Etcd3.set_config_value()Etcd3.set_failover_value()Etcd3.set_history_value()Etcd3.set_socket_options()Etcd3.set_sync_state_value()Etcd3.set_ttl()Etcd3.take_leader()Etcd3.touch_member()Etcd3.watch()
Etcd3ClientEtcd3Client.ERROR_CLSEtcd3Client.__init__()Etcd3Client._abc_implEtcd3Client._do_auth_request()Etcd3Client._do_member_list_request()Etcd3Client._ensure_version_prefix()Etcd3Client._get_headers()Etcd3Client._get_members()Etcd3Client._handle_server_response()Etcd3Client._prepare_get_members()Etcd3Client._prepare_request()Etcd3Client.authenticate()Etcd3Client.authenticate_on_start()Etcd3Client.call_rpc()Etcd3Client.deleteprefix()Etcd3Client.deleterange()Etcd3Client.handle_auth_errors()Etcd3Client.lease_grant()Etcd3Client.lease_keepalive()Etcd3Client.prefix()Etcd3Client.put()Etcd3Client.range()Etcd3Client.txn()Etcd3Client.watchprefix()Etcd3Client.watchrange()
Etcd3ClientErrorEtcd3ErrorEtcd3ExceptionEtcd3WatchCanceledFailedPreconditionGRPCCodeGRPCCode.AbortedGRPCCode.AlreadyExistsGRPCCode.CanceledGRPCCode.DataLossGRPCCode.DeadlineExceededGRPCCode.FailedPreconditionGRPCCode.InternalGRPCCode.InvalidArgumentGRPCCode.NotFoundGRPCCode.OKGRPCCode.OutOfRangeGRPCCode.PermissionDeniedGRPCCode.ResourceExhaustedGRPCCode.UnauthenticatedGRPCCode.UnavailableGRPCCode.UnimplementedGRPCCode.Unknown
InvalidArgumentInvalidAuthTokenKVCacheLeaseNotFoundNotFoundPatroniEtcd3ClientPatroniEtcd3Client.__init__()PatroniEtcd3Client._abc_implPatroniEtcd3Client._restart_watcher()PatroniEtcd3Client._wait_cache()PatroniEtcd3Client.call_rpc()PatroniEtcd3Client.configure()PatroniEtcd3Client.get_cluster()PatroniEtcd3Client.set_base_uri()PatroniEtcd3Client.start_watcher()PatroniEtcd3Client.txn()
PermissionDeniedUnavailableUnknownUnsupportedEtcdVersionUserEmpty_handle_auth_errors()_raise_for_data()base64_decode()base64_encode()build_range_request()prefix_range_end()to_bytes()
- patroni.dcs.exhibitor module
- patroni.dcs.kubernetes module
CoreV1ApiProxyK8sClientK8sClient.ApiClientK8sClient.ApiClient._API_URL_PREFIXK8sClient.ApiClient.__init__()K8sClient.ApiClient._calculate_timeouts()K8sClient.ApiClient._do_http_request()K8sClient.ApiClient._get_api_servers()K8sClient.ApiClient._handle_server_response()K8sClient.ApiClient._load_api_servers_cache()K8sClient.ApiClient._make_headers()K8sClient.ApiClient._refresh_api_servers_cache()K8sClient.ApiClient.api_servers_cacheK8sClient.ApiClient.call_api()K8sClient.ApiClient.refresh_api_servers_cache()K8sClient.ApiClient.request()K8sClient.ApiClient.set_api_servers_cache_ttl()K8sClient.ApiClient.set_base_uri()K8sClient.ApiClient.set_read_timeout()
K8sClient.CoreV1ApiK8sClient._K8sObjectTemplateK8sClient.__init__()K8sClient.rest
K8sConfigK8sConnectionFailedK8sExceptionK8sObjectKubernetesKubernetes.__init__()Kubernetes.__load_cluster()Kubernetes.__target_ref()Kubernetes._abc_implKubernetes._cluster_from_nodes()Kubernetes._config_resource_versionKubernetes._create_config_service()Kubernetes._delete_leader()Kubernetes._isotime()Kubernetes._load_cluster()Kubernetes._map_subsets()Kubernetes._mpp_cluster_loader()Kubernetes._patch_or_create()Kubernetes._postgresql_cluster_loader()Kubernetes._update_leader()Kubernetes._update_leader_with_retry()Kubernetes._wait_caches()Kubernetes._write_failsafe()Kubernetes._write_leader_optime()Kubernetes._write_status()Kubernetes.attempt_to_acquire_leader()Kubernetes.cancel_initialization()Kubernetes.client_path()Kubernetes.compare_ports()Kubernetes.delete_cluster()Kubernetes.delete_leader()Kubernetes.delete_sync_state()Kubernetes.get_mpp_coordinator()Kubernetes.initialize()Kubernetes.leader_pathKubernetes.manual_failover()Kubernetes.member()Kubernetes.patch_or_create()Kubernetes.patch_or_create_config()Kubernetes.reload_config()Kubernetes.retry()Kubernetes.set_config_value()Kubernetes.set_failover_value()Kubernetes.set_history_value()Kubernetes.set_retry_timeout()Kubernetes.set_sync_state_value()Kubernetes.set_ttl()Kubernetes.subsets_changed()Kubernetes.take_leader()Kubernetes.touch_member()Kubernetes.ttlKubernetes.update_leader()Kubernetes.watch()Kubernetes.write_leader_optime()Kubernetes.write_sync_state()
KubernetesErrorKubernetesRetriableExceptionObjectCacheObjectCache.__init__()ObjectCache._build_cache()ObjectCache._do_watch()ObjectCache._finish_response()ObjectCache._list()ObjectCache._process_event()ObjectCache._watch()ObjectCache.copy()ObjectCache.delete()ObjectCache.get()ObjectCache.is_ready()ObjectCache.kill_stream()ObjectCache.run()ObjectCache.set()
_cleanup_temp_files()_create_temp_file()_run_and_handle_exceptions()catch_kubernetes_errors()to_camel_case()
- patroni.dcs.raft module
DynMemberSyncObjKVStoreTTLKVStoreTTL.__check_requirements()KVStoreTTL.__expire_keys()KVStoreTTL.__init__()KVStoreTTL.__pop()KVStoreTTL.__values_match()KVStoreTTL._autoTickThread()KVStoreTTL._delete()KVStoreTTL._delete_v0()KVStoreTTL._expire()KVStoreTTL._expire_v0()KVStoreTTL._onTick()KVStoreTTL._set()KVStoreTTL._set_v0()KVStoreTTL.delete()KVStoreTTL.destroy()KVStoreTTL.get()KVStoreTTL.retry()KVStoreTTL.set()KVStoreTTL.set_retry_timeout()KVStoreTTL.startAutoTick()
RaftRaft.__init__()Raft._abc_implRaft._cluster_from_nodes()Raft._delete_leader()Raft._load_cluster()Raft._mpp_cluster_loader()Raft._on_delete()Raft._on_set()Raft._postgresql_cluster_loader()Raft._update_leader()Raft._write_failsafe()Raft._write_leader_optime()Raft._write_status()Raft.attempt_to_acquire_leader()Raft.cancel_initialization()Raft.delete_cluster()Raft.delete_sync_state()Raft.initialize()Raft.member()Raft.reload_config()Raft.set_config_value()Raft.set_failover_value()Raft.set_history_value()Raft.set_retry_timeout()Raft.set_sync_state_value()Raft.set_ttl()Raft.take_leader()Raft.touch_member()Raft.ttlRaft.watch()
RaftErrorSyncObjUtility_TCPTransportresolve_host()
- patroni.dcs.zookeeper module
PatroniKazooClientPatroniSequentialThreadingHandlerZooKeeperZooKeeper.__init__()ZooKeeper._abc_implZooKeeper._cancel_initialization()ZooKeeper._create()ZooKeeper._delete_leader()ZooKeeper._kazoo_connect()ZooKeeper._load_cluster()ZooKeeper._mpp_cluster_loader()ZooKeeper._postgresql_cluster_loader()ZooKeeper._set_or_create()ZooKeeper._update_leader()ZooKeeper._watcher()ZooKeeper._write_failsafe()ZooKeeper._write_leader_optime()ZooKeeper._write_status()ZooKeeper.attempt_to_acquire_leader()ZooKeeper.cancel_initialization()ZooKeeper.delete_cluster()ZooKeeper.delete_sync_state()ZooKeeper.get_children()ZooKeeper.get_node()ZooKeeper.get_status()ZooKeeper.initialize()ZooKeeper.load_members()ZooKeeper.member()ZooKeeper.reload_config()ZooKeeper.set_config_value()ZooKeeper.set_failover_value()ZooKeeper.set_history_value()ZooKeeper.set_retry_timeout()ZooKeeper.set_sync_state_value()ZooKeeper.set_ttl()ZooKeeper.take_leader()ZooKeeper.touch_member()ZooKeeper.ttlZooKeeper.watch()
ZooKeeperError
Module contents
Abstract classes for Distributed Configuration Store.
- class patroni.dcs.AbstractDCS(config: Dict[str, Any], mpp: AbstractMPP)
Bases: ABC
Abstract representation of DCS modules.
Implementations of a concrete DCS class, using appropriate backend client interfaces, must include the following methods and properties.
Functional methods that are critical in their timing, required to complete within the retry_timeout period in order to prevent the DCS from being considered inaccessible, each perform construction of complex data objects:
_postgresql_cluster_loader(): method which processes the structure of data stored in the DCS used to build the Cluster object with all relevant associated data.
_mpp_cluster_loader():Similar to above but specifically representing MPP group and workers information.
_load_cluster():main method for calling specific
loadermethod to build theClusterobject representing the state and topology of the cluster.
Functional methods that are critical in their timing and must be written with ACID transaction properties in mind:
attempt_to_acquire_leader():method used in the leader race to attempt to acquire the leader lock by creating the leader key in the DCS, if it does not exist.
_update_leader(): method to update the leader key in DCS. Relies on Compare-And-Set to ensure the Primary lock key is updated. If this fails to update within the retry_timeout window the Primary will be demoted.
Functional method that relies on Compare-And-Create to ensure only one member creates the relevant key:
initialize():method used in the race for cluster initialization which creates the
initializekey in the DCS.
DCS backend getter and setter methods and properties:
take_leader(): method to create a new leader key in the DCS.
set_ttl(): method for setting TTL value in DCS.
ttl(): property which returns the current TTL.
set_retry_timeout(): method for setting retry_timeout in DCS backend.
_write_leader_optime(): compatibility method to write WAL LSN to DCS.
_write_status(): method to write WAL LSN for slots to the DCS.
_write_failsafe(): method to write cluster topology to the DCS, used by failsafe mechanism.
touch_member(): method to update individual member key in the DCS.
set_history_value(): method to set the history key in the DCS.
DCS setter methods using Compare-And-Set which although important are less critical if they fail, attempts can be retried or may result in warning log messages:
set_failover_value(): method to create and/or update the failover key in the DCS.
set_config_value(): method to create and/or update the config key in the DCS.
set_sync_state_value(): method to set the synchronous state sync key in the DCS.
DCS data and key removal methods:
delete_sync_state():likewise, a method to remove synchronous state
synckey from the DCS.
delete_cluster():method which will remove cluster information from the DCS. Used only from patronictl.
_delete_leader():method relies on CAS, used by a member that is the current leader, to remove the
leaderkey in the DCS.
cancel_initialization():method to remove the
initializekey for the cluster from the DCS.
If either of the sync_state set or delete methods fail, although not critical, this may result in "Synchronous replication key updated by someone else" messages being logged.
Care should be taken to consult each abstract method for any additional information and requirements, such as expected exceptions that should be raised in certain conditions and the object types for arguments and return values of methods and properties.
- _CONFIG = 'config'
- _FAILOVER = 'failover'
- _FAILSAFE = 'failsafe'
- _HISTORY = 'history'
- _INITIALIZE = 'initialize'
- _LEADER = 'leader'
- _LEADER_OPTIME = 'optime/leader'
- _MEMBERS = 'members/'
- _OPTIME = 'optime'
- _STATUS = 'status'
- _SYNC = 'sync'
- __get_postgresql_cluster(path: str | None = None) Cluster
Low level method to load a
Clusterobject from DCS.- Parameters:
path – optional client path in DCS backend to load from.
- Returns:
a loaded
Clusterinstance.
- __init__(config: Dict[str, Any], mpp: AbstractMPP) None
Prepare DCS paths, MPP object, initial values for state information and processing dependencies.
- Parameters:
config –
dict, reference to config section of selected DCS. i.e.:zookeeperfor zookeeper,etcdfor etcd, etc…mpp – an object implementing
AbstractMPPinterface.
- _abc_impl = <_abc._abc_data object>
- _build_retain_slots(cluster: Cluster, slots: Dict[str, int] | None) List[str] | None
Handle retention policy of physical replication slots for cluster members.
When the member key is missing we want to keep its replication slot for a while, so that the WAL segments it needs will still be available when it comes back online. This is solved by storing the list of replication slots representing members in the retain_slots field of the /status key.
This method handles retention policy by keeping the list of such replication slots in memory and removing names that were last observed more than member_slots_ttl ago.
- Parameters:
cluster –
Clusterobject with information about the current cluster state.slots – slot names with LSN values that exist on the leader node and consists of slots for cluster members and permanent replication slots.
- Returns:
the list of replication slots to be written to
/statuskey orNone.
- abstractmethod _delete_leader(leader: Leader) bool
Remove leader key from DCS.
This method should remove leader key if current instance is the leader.
- Parameters:
leader –
Leaderobject with information about the leader.- Returns:
Trueif successfully committed to DCS.
- _get_mpp_cluster() Cluster
Load MPP cluster from DCS.
- Returns:
A MPP
Clusterinstance for the coordinator with workers clusters in the Cluster.workers dict.
- abstractmethod _load_cluster(path: str, loader: Callable[[Any], Cluster | Dict[int, Cluster]]) Cluster | Dict[int, Cluster]
Main abstract method that implements the loading of
Clusterinstance.Note
Internally this method should call the loader method that will build the Cluster object which represents the current state and topology of the cluster in DCS. This method is supposed to be called only by the get_cluster() method.
- Parameters:
path – the path in DCS where to load Cluster(s) from.
loader – one of
_postgresql_cluster_loader()or_mpp_cluster_loader().
- Raise:
DCSErrorin case of communication problems with DCS. If the current node was running as a primary and exception raised, instance would be demoted.
- abstractmethod _mpp_cluster_loader(path: Any) Dict[int, Cluster]
Load and build all PostgreSQL clusters from a single MPP cluster.
- abstractmethod _postgresql_cluster_loader(path: Any) Cluster
Load and build the
Clusterobject from DCS, which represents a single PostgreSQL cluster.
- abstractmethod _update_leader(leader: Leader) bool
Update
leaderkey (or session) ttl.Note
You have to use CAS (Compare And Swap) operation in order to update leader key, for example for etcd the prevValue parameter must be used.
If the update fails due to DCS not being accessible or because it is not able to process requests (hopefully temporary), the DCSError exception should be raised.
- Parameters:
leader – a reference to a current
leaderobject.- Returns:
Trueifleaderkey (or session) has been updated successfully.
- abstractmethod _write_failsafe(value: str) bool
Write current cluster topology to DCS that will be used by failsafe mechanism (if enabled).
- Parameters:
value – failsafe topology serialized in JSON format.
- Returns:
Trueif successfully committed to DCS.
- abstractmethod _write_leader_optime(last_lsn: str) bool
Write current WAL LSN into
/optime/leaderkey in DCS.- Parameters:
last_lsn – absolute WAL LSN in bytes.
- Returns:
Trueif successfully committed to DCS.
- abstractmethod _write_status(value: str) bool
Write current WAL LSN and
confirmed_flush_lsnof permanent slots into the/statuskey in DCS.- Parameters:
value – status serialized in JSON format.
- Returns:
Trueif successfully committed to DCS.
- acquire_leader_lock() bool
Attempt to acquire leader lock.
Note
This method wraps attempt_to_acquire_leader() and is used to reset the retention time of physical replication slots that represent members of the cluster when the current node is about to be promoted to the leader.
- Returns:
Trueif the leader key has been created successfully.
- abstractmethod attempt_to_acquire_leader() bool
Attempt to acquire leader lock.
Note
This method should create the /leader key with the value _name.
The key must be created atomically. In case the key already exists it should not be overwritten and False must be returned.
If key creation fails due to DCS not being accessible or because it is not able to process requests (hopefully temporary), the DCSError exception should be raised.
- Returns:
Trueif key has been created successfully.
- abstractmethod cancel_initialization() bool
Removes the
initializekey for a cluster.- Returns:
Trueif successfully committed to DCS.
- client_path(path: str) str
Construct the absolute key name from appropriate parts for the DCS type.
- Parameters:
path – The key name within the current Patroni cluster.
- Returns:
absolute key name for the current Patroni cluster.
- abstractmethod delete_cluster() bool
Delete cluster from DCS.
- Returns:
Trueif successfully committed to DCS.
- delete_leader(leader: Leader | None, last_lsn: int | None = None) bool
Update
optime/leaderand voluntarily remove leader key from DCS.This method should remove leader key if current instance is the leader.
- Parameters:
leader –
Leaderobject with information about the leader.last_lsn – latest checkpoint location in bytes.
- Returns:
boolean result of called abstract
_delete_leader().
- abstractmethod delete_sync_state(version: Any | None = None) bool
Delete the synchronous state from DCS.
- Parameters:
version – for conditional deletion of the key/object.
- Returns:
Trueif delete successful.
- get_cluster() Cluster
Retrieve a fresh view of DCS.
Note
Stores copy of time, status and failsafe values for comparison in DCS update decisions. Caching is required to avoid overhead placed upon the REST API.
Returns either a PostgreSQL or MPP implementation of
Clusterdepending on availability.- Returns:
- get_mpp_coordinator() Cluster | None
Load the PostgreSQL cluster for the MPP Coordinator.
Note
This method is only executed on the worker nodes to find the coordinator.
- Returns:
Select
Clusterinstance associated with the MPP Coordinator group ID.
- abstractmethod initialize(create_new: bool = True, sysid: str = '') bool
Race for cluster initialization.
This method should atomically create
initializekey and returnTrue, otherwise it should returnFalse.- Parameters:
create_new –
Falseif the key should already exist (in the case we are setting the system_id).sysid – PostgreSQL cluster system identifier, if specified, is written to the key.
- Returns:
Trueif key has been created successfully.
- is_mpp_coordinator() bool
Check whether the Cluster instance has a Coordinator group ID.
- Returns:
Trueif the given node is running as the MPP Coordinator.
- property leader_optime_path: str
Get the client path for
optime/leader(legacy key, superseded bystatus).
- manual_failover(leader: str | None, candidate: str | None, scheduled_at: datetime | None = None, version: Any | None = None) bool
Prepare dictionary with given values and set
/failoverkey in DCS.- Parameters:
leader – value to set for
leader.candidate – value to set for
member.scheduled_at – value converted to ISO date format for
scheduled_at.version – for conditional update of the key/object.
- Returns:
Trueif successfully committed to DCS.
- property mpp: AbstractMPP
Get the effective underlying MPP, if any has been configured.
- reload_config(config: Config | Dict[str, Any]) None
Load and set relevant values from configuration.
Sets
loop_wait,ttlandretry_timeoutproperties.- Parameters:
config – Loaded configuration information object or dictionary of key value pairs.
- abstractmethod set_config_value(value: str, version: Any | None = None) bool
Create or update
/configkey in DCS.- Parameters:
value – new value to set in the
configkey.version – for conditional update of the key/object.
- Returns:
Trueif successfully committed to DCS.
- abstractmethod set_failover_value(value: str, version: Any | None = None) bool
Create or update
/failoverkey.- Parameters:
value – value to set.
version – for conditional update of the key/object.
- Returns:
Trueif successfully committed to DCS.
- abstractmethod set_history_value(value: str) bool
Set value for
historyin DCS.- Parameters:
value – new value of
historykey/object.- Returns:
Trueif successfully committed to DCS.
- abstractmethod set_sync_state_value(value: str, version: Any | None = None) Any | bool
Set synchronous state in DCS.
- Parameters:
value – the new value of
/synckey.version – for conditional update of the key/object.
- Returns:
version of the new object or
Falsein case of error.
- static sync_state(leader: str | None, sync_standby: Collection[str] | None, quorum: int | None) Dict[str, Any]
Build
sync_statedictionary.- Parameters:
leader – name of the leader node that manages
/synckey.sync_standby – collection of currently known synchronous standby node names.
quorum – if the node from the sync_standby list is doing a leader race it should see at least quorum other nodes from the sync_standby + leader list.
- Returns:
dictionary that later could be serialized to JSON or saved directly to DCS.
- abstractmethod take_leader() bool
Establish a new leader in DCS.
Note
This method should create leader key with value of
_nameandttlofttl.Since it could be called only on initial cluster bootstrap it could create this key regardless, overwriting the key if necessary.
- Returns:
Trueif successfully committed to DCS.
- abstractmethod touch_member(data: Dict[str, Any]) bool
Update member key in DCS.
Note
This method should create or update the key named /members/ + _name with the value of data in a given DCS.
- Parameters:
data – information about an instance (including connection strings).
- Returns:
Trueif successfully committed to DCS.
- update_leader(cluster: Cluster, last_lsn: int | None, slots: Dict[str, int] | None = None, failsafe: Dict[str, str] | None = None) bool
Update
leaderkey (or session) ttl,/status, and/failsafekeys.- Parameters:
cluster –
Clusterobject with information about the current cluster state.last_lsn – absolute WAL LSN in bytes.
slots – dictionary with permanent slots
confirmed_flush_lsn.failsafe – if defined dictionary passed to
write_failsafe().
- Returns:
Trueifleaderkey (or session) has been updated successfully.
- watch(leader_version: Any | None, timeout: float) bool
Sleep if the current node is a leader, otherwise, watch for changes of leader key with a given timeout.
- Parameters:
leader_version – version of a leader key.
timeout – timeout in seconds.
- Returns:
if
Truethis will reschedule the next run of the HA cycle.
- write_failsafe(value: Dict[str, str]) None
Write the
/failsafekey in DCS.- Parameters:
value – dictionary value to set, consisting of the
nameandapi_urlof members.
- write_leader_optime(last_lsn: int) None
Write value for WAL LSN to
optime/leaderkey in DCS.Note
This method abstracts away the required data structure of write_status(), so it is not needed in the caller. However, optime/leader is only written in write_status() when the cluster has members with a Patroni version that is old enough to require it (i.e. the old Patroni version doesn't understand the new format).
- Parameters:
last_lsn – absolute WAL LSN in bytes.
- write_status(value: Dict[str, Any]) None
Write cluster status to DCS if changed.
Note
The DCS key /status was introduced in Patroni version 2.1.0. Prior to this, the position of the last known leader LSN was stored in optime/leader. For backwards compatibility, this method detects members with a version older than this.
- Parameters:
value – JSON serializable dictionary with current WAL LSN and
confirmed_flush_lsnof permanent slots.
- write_sync_state(leader: str | None, sync_standby: Collection[str] | None, quorum: int | None, version: Any | None = None) SyncState | None
Write the new synchronous state to DCS.
Calls
sync_state()to build a dictionary and then calls DCS specificset_sync_state_value().- Parameters:
leader – name of the leader node that manages
/synckey.sync_standby – collection of currently known synchronous standby node names.
version – for conditional update of the key/object.
quorum – if the node from the sync_standby list is doing a leader race it should see at least quorum other nodes from the sync_standby + leader list.
- Returns:
the new
SyncStateobject orNone.
- class patroni.dcs.Cluster(*args: Any, **kwargs: Any)
Bases: Cluster
Immutable object (namedtuple) which represents PostgreSQL or MPP cluster.
Note
We are using an old-style attribute declaration here because otherwise it is not possible to override the __new__ method. Without it, workers would by default always get the same dict object, which could be mutated.
Consists of the following fields:
- Variables:
initialize – shows whether this cluster has an initialization key stored in DCS or not.
config – global dynamic configuration, reference to ClusterConfig object.
leader – Leader object which represents the current leader of the cluster.
status – Status object which represents the /status key.
members – list of Member objects, all PostgreSQL cluster members including the leader.
failover – reference to
Failoverobject.sync – reference to
SyncStateobject, last observed synchronous replication state.history – reference to TimelineHistory object.
failsafe – failsafe topology. Node is allowed to become the leader only if its name is found in this list.
workers – dictionary of workers of the MPP cluster, optional. Each key representing the group and the corresponding value is a
Clusterinstance.
- property __permanent_logical_slots: Dict[str, Any]
Dictionary of permanent
logicalreplication slots.
- property __permanent_slots: Dict[str, Dict[str, Any] | Any]
Dictionary of permanent replication slots with their known LSN.
- _get_members_slots(name: str, role: PostgresqlRole, nofailover: bool, can_advance_slots: bool) Dict[str, Dict[str, Any]]
Get physical replication slots configuration for a given member.
There are following situations possible:
- If the nostream tag is set on the member, we should not have the replication slot for it on the current primary or any other member even if replicatefrom is set, because nostream disables WAL streaming.
- PostgreSQL is 11 or newer and configuration allows retention of member replication slots. In this case we want to have replication slots for every member, except the case when we have the nofailover tag set.
- PostgreSQL is older than 11 or configuration doesn't allow member slots retention. In this case we want:
  - On primary, to have replication slots for all members that don't have the replicatefrom tag pointing to an existing member.
  - On a replica node, to have replication slots only for members whose replicatefrom tag points to us.
Will log an error if:
Conflicting slot names between members are found
- Parameters:
name – name of this node.
role – role of this node,
primary,standby_leader, orreplica.nofailover –
Trueif this node is tagged to not be a failover candidate,Falseotherwise.can_advance_slots –
Trueifpg_replication_slot_advance()function is available,Falseotherwise.
- Returns:
dictionary of physical replication slots that should exist on a given node.
- _get_permanent_slots(postgresql: Postgresql, tags: Tags, role: PostgresqlRole) Dict[str, Any]
Get configured permanent replication slots.
Note
Permanent replication slots are only considered if the use_slots configuration is enabled. A node that is not supposed to become a leader (nofailover) will not have permanent replication slots. Also, a node with disabled streaming (nostream) and its cascading followers must not have permanent logical slots due to lack of feedback from node to primary, which makes them unsafe to use.
In a standby cluster we only support physical replication slots.
The returned dictionary for a non-standby cluster always contains permanent logical replication slots in order to show a warning if they are not supported by PostgreSQL before v11.
- Parameters:
postgresql – reference to
Postgresqlobject.tags – reference to an object implementing
Tagsinterface.role – role of the node. One of
PostgresqlRolevalues.
- Returns:
dictionary of permanent slot names mapped to attributes.
- _has_permanent_logical_slots(postgresql: Postgresql, member: Tags) bool
Check if the given member node has permanent
logicalreplication slots configured.- Parameters:
postgresql – reference to a
Postgresqlobject.member – reference to an object implementing
Tagsinterface for the node that we are checking permanent logical replication slots for.
- Returns:
True if any detected replication slots are logical, otherwise False.
- _merge_permanent_slots(slots: Dict[str, Dict[str, Any]], permanent_slots: Dict[str, Any], name: str, role: PostgresqlRole, can_advance_slots: bool) List[str]
Merge replication slots for members with permanent_slots.
Perform validation of configured permanent slot name, skipping invalid names.
Will update slots in-line based on
type of slot, physical or logical, and name of node. Type is assumed to be physical if there are no attributes stored as the slot value.
- Parameters:
slots – Slot names with existing attributes if known.
name – name of this node.
role – role of the node. One of
PostgresqlRolevalues.permanent_slots – dictionary containing slot name key and slot information values.
can_advance_slots –
Trueifpg_replication_slot_advance()function is available,Falseotherwise.
- Returns:
List of disabled permanent, logical slot names, if postgresql version < 11.
- get_clone_member(exclude_name: str) Member | Leader | None
Get member or leader object to use as clone source.
- Parameters:
exclude_name – name of a member to exclude.
- Returns:
a randomly selected candidate member from available running members that are configured as viable sources for cloning (have the tag
clonefrom in configuration). If no member is appropriate the current leader is used.
- get_replication_slots(postgresql: Postgresql, member: Tags, *, role: PostgresqlRole | None = None, show_error: bool = False) Dict[str, Dict[str, Any]]
Lookup configured slot names in the DCS, report issues found and merge with permanent slots.
Will log an error if:
Any logical slots are disabled, due to version compatibility, and show_error is
True.
- Parameters:
postgresql – reference to
Postgresqlobject.member – reference to an object implementing
Tags interface. role – role of the node; if not set, it will be taken from postgresql. One of
PostgresqlRolevalues.show_error – if
True, report an error if any disabled logical slots or conflicting slot names are found.
- Returns:
final dictionary of slot names, after merging with permanent slots and performing sanity checks.
- get_slot_name_on_primary(name: str, tags: Tags) str | None
Get the name of physical replication slot for this node on the primary.
Note
P <– I <– L
In case of cascading replication we have to check not our physical slot, but slot of the replica that connects us to the primary.
- Parameters:
name – name of the member node to check.
tags – reference to an object implementing
Tagsinterface.
- Returns:
the slot name on the primary that is in use for physical replication on this node.
- has_member(member_name: str) bool
Check if the given member name is present in the cluster.
- Parameters:
member_name – name to look up in the
members.
- Returns:
True if the member name is found.
- has_permanent_slots(postgresql: Postgresql, member: Tags) bool
Check if our node has permanent replication slots configured.
- Parameters:
postgresql – reference to
Postgresqlobject.member – reference to an object implementing
Tagsinterface for the node that we are checking permanent logical replication slots for.
- Returns:
True if there are permanent replication slots configured, otherwise False.
- is_empty()
Validate definition of all attributes of this
Cluster instance.
- Returns:
True if all attributes of the current Cluster are unpopulated.
- static is_logical_slot(value: Any) bool
Check whether provided configuration is for permanent logical replication slot.
- Parameters:
value – configuration of the permanent replication slot.
- Returns:
True if value is a logical replication slot, otherwise False.
- static is_physical_slot(value: Any) bool
Check whether provided configuration is for permanent physical replication slot.
- Parameters:
value – configuration of the permanent replication slot.
- Returns:
True if value is a physical replication slot, otherwise False.
- is_unlocked() bool
Check if the cluster does not have the leader.
- Returns:
True if a leader name is not defined.
- maybe_filter_permanent_slots(postgresql: Postgresql, slots: Dict[str, int]) Dict[str, int]
Filter out all non-permanent slots from provided slots dict.
Note
If retention of replication slots for members is enabled, we will not do any filtering, because we need to publish LSN values for members' replication slots, so that other nodes can use them to advance LSN, as they do for permanent slots.
- Parameters:
postgresql – reference to
Postgresqlobject.slots – slot names with LSN values.
- Returns:
a
dictobject that contains only slots that are known to be permanent.
- property min_version: Tuple[int, ...] | None
Lowest Patroni software version found in known members of the cluster.
- property permanent_physical_slots: Dict[str, Any]
Dictionary of permanent
physicalreplication slots.
- should_enforce_hot_standby_feedback(postgresql: Postgresql, member: Tags) bool
Determine whether
hot_standby_feedback should be enabled for the given member.
The
hot_standby_feedback must be enabled if the current replica has logical slots, or it is working as a cascading replica for the other node that has logical slots.
- Parameters:
postgresql – reference to a
Postgresqlobject.member – reference to an object implementing
Tagsinterface for the node that we are checking permanent logical replication slots for.
- Returns:
True if this node or any member replicating from this node has permanent logical slots, otherwise False.
- property slots: Dict[str, int]
{"slot_name": int}.Note
We are trying to be foolproof here: for values that can’t be parsed to
int we will return 0.
- Type:
State of permanent replication slots on the primary in the format
- property timeline: int
Get the cluster history index from the
history.- Returns:
If the recorded history is empty assume timeline is
1; if it is not defined or the stored history is not formatted as expected, 0 is returned and an error will be logged. Otherwise, the last number stored incremented by 1 is returned.
- Example:
No history provided:
>>> Cluster(0, 0, 0, Status.empty(), 0, 0, 0, 0, None, {}).timeline 0
Empty history, assume timeline is 1:
>>> Cluster(0, 0, 0, Status.empty(), 0, 0, 0, TimelineHistory.from_node(1, '[]'), None, {}).timeline 1
Invalid history format, a string of a, returns 0:
>>> Cluster(0, 0, 0, Status.empty(), 0, 0, 0, TimelineHistory.from_node(1, '[["a"]]'), None, {}).timeline 0
History as a list of strings:
>>> history = TimelineHistory.from_node(1, '[["3", "2", "1"]]')
>>> Cluster(0, 0, 0, Status.empty(), 0, 0, 0, history, None, {}).timeline 4
- class patroni.dcs.ClusterConfig(version: int | str, data: Dict[str, Any], modify_version: int | str)
Bases:
NamedTuple
Immutable object (namedtuple) which represents cluster configuration.
- Variables:
version – version number for the object.
data – dictionary of configuration information.
modify_version – modified version number.
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('version', 'data', 'modify_version')
- classmethod _make(iterable)
Make a new ClusterConfig object from a sequence or iterable
- _replace(**kwds)
Return a new ClusterConfig object replacing specified fields with new values
- static from_node(version: int | str, value: str, modify_version: int | str | None = None) ClusterConfig
Factory method to parse value as configuration information.
- Parameters:
version – version number for object.
value – raw JSON serialized data, if not parsable replaced with an empty dictionary.
modify_version – optional modify version number, use version if not provided.
- Returns:
constructed
ClusterConfiginstance.- Example:
>>> ClusterConfig.from_node(1, '{') is None False
- class patroni.dcs.Failover(version: int | str, leader: str | None, candidate: str | None, scheduled_at: datetime | None)
Bases:
NamedTuple
Immutable object (namedtuple) representing configuration information required for failover/switchover capability.
- Variables:
version – version of the object.
leader – name of the leader. If value isn’t empty we treat it as a switchover from the specified node.
candidate – the name of the member node to be considered as a failover candidate.
scheduled_at – in the case of a switchover the
datetimeobject to perform the scheduled switchover.
- Example:
>>> 'Failover' in str(Failover.from_node(1, '{"leader": "cluster_leader"}')) True
>>> 'Failover' in str(Failover.from_node(1, {"leader": "cluster_leader"})) True
>>> 'Failover' in str(Failover.from_node(1, '{"leader": "cluster_leader", "member": "cluster_candidate"}')) True
>>> Failover.from_node(1, 'null') is None False
>>> n = '''{"leader": "cluster_leader", "member": "cluster_candidate", ... "scheduled_at": "2016-01-14T10:09:57.1394Z"}'''
>>> 'tzinfo=' in str(Failover.from_node(1, n)) True
>>> Failover.from_node(1, None) is None False
>>> Failover.from_node(1, '{}') is None False
>>> 'abc' in Failover.from_node(1, 'abc:def') True
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('version', 'leader', 'candidate', 'scheduled_at')
- classmethod _make(iterable)
Make a new Failover object from a sequence or iterable
- _replace(**kwds)
Return a new Failover object replacing specified fields with new values
- static from_node(version: int | str, value: str | Dict[str, str]) Failover
Factory method to parse value as failover configuration.
- Parameters:
version – version number for the object.
value – JSON serialized data or a dictionary of configuration. Can also be a colon
: delimited list of leader, followed by candidate name (legacy format). If the scheduled_at key is defined, the value will be parsed by dateutil.parser.parse().
- Returns:
constructed
Failoverinformation object
- class patroni.dcs.Leader(version: int | str, session: int | float | str | None, member: Member)
Bases:
NamedTuple
Immutable object (namedtuple) which represents leader key.
Consists of the following fields:
- Variables:
version – modification version of a leader key in a Configuration Store
session – either session id or just ttl in seconds
member – reference to a
Memberobject which represents current leader (seeCluster.members)
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('version', 'session', 'member')
- classmethod _make(iterable)
Make a new Leader object from a sequence or iterable
- _replace(**kwds)
Return a new Leader object replacing specified fields with new values
- property checkpoint_after_promote: bool | None
Determine whether a checkpoint has occurred for this leader after promotion.
- Returns:
True if the role is master or primary and checkpoint_after_promote is not set, False if not a master or primary or if the checkpoint hasn’t occurred. If the version of Patroni is older than 1.5.6, return None.
- Example:
>>> Leader(1, '', Member.from_node(1, '', '', '{"version":"z"}')).checkpoint_after_promote
- class patroni.dcs.Member(version: int | str, name: str, session: int | float | str | None, data: Dict[str, Any])
-
Immutable object (namedtuple) which represents single member of PostgreSQL cluster.
Note
We are using an old-style attribute declaration here because otherwise it is not possible to override
__new__ method in the RemoteMember class.
Note
These two keys in data are always written to the DCS, but care is taken to maintain consistency and resilience from data that is read:
conn_url: connection string containing host, user and password which could be used to access this member.
api_url: REST API URL of the Patroni instance.
Consists of the following fields:
- Variables:
version – modification version of a given member key in a Configuration Store.
name – name of PostgreSQL cluster member.
session – either session id or just ttl in seconds.
data – dictionary containing arbitrary data i.e.
conn_url,api_url,xlog_location,state,role,tags, etc…
- _abc_impl = <_abc._abc_data object>
- conn_kwargs(auth: Any | None = None) Dict[str, Any]
Give keyword arguments used for PostgreSQL connection settings.
- Parameters:
auth – Authentication properties - can be defined as anything supported by the
psycopg2 or psycopg modules. Converts a key of username to user if supplied.
- Returns:
A dictionary containing a merge of default parameter keys
host, port and dbname, with the contents of data conn_kwargs key. If those are not defined will parse and reform connection parameters from conn_url. One of these two attributes needs to have data defined to construct the output dictionary. Finally, auth parameters are merged with the dictionary before it is returned.
- property conn_url: str | None
The
conn_urlvalue fromdataif defined or constructed fromconn_kwargs.
- static from_node(version: int | str, name: str, session: int | float | str | None, value: str) Member
Factory method for instantiating
Memberfrom a JSON serialised string or object.- Parameters:
version – modification version of a given member key in a Configuration Store.
name – name of PostgreSQL cluster member.
session – either session id or just ttl in seconds.
value – JSON encoded string containing arbitrary data i.e.
conn_url,api_url,xlog_location,state,role,tags, etc. OR a connection URL starting withpostgres://.
- Returns:
a
Member instance built with the given arguments.
- Example:
>>> Member.from_node(-1, '', '', '{"conn_url": "postgres://foo@bar/postgres"}') is not None True
>>> Member.from_node(-1, '', '', '{') Member(version=-1, name='', session='', data={})
- get_endpoint_url(endpoint: str | None = None) str
Get URL from member
api_urland endpoint.- Parameters:
endpoint – URL path of REST API.
- Returns:
full URL for this REST API.
- class patroni.dcs.RemoteMember(name: str, data: Dict[str, Any])
Bases:
Member
Represents a remote member (typically a primary) for a standby cluster.
- Variables:
ALLOWED_KEYS – Controls access to relevant key names that could be in stored
data.
- ALLOWED_KEYS: Tuple[str, ...] = ('primary_slot_name', 'create_replica_methods', 'restore_command', 'archive_cleanup_command', 'recovery_min_apply_delay', 'no_replication_slot')
- _abc_impl = <_abc._abc_data object>
- exception patroni.dcs.ReturnFalseException
Bases:
Exception
Exception to be caught by the
catch_return_false_exception() decorator.
- class patroni.dcs.Status(last_lsn: int, slots: Dict[str, int] | None, retain_slots: List[str])
Bases:
NamedTuple
Immutable object (namedtuple) which represents /status key.
Consists of the following fields:
- Variables:
last_lsn –
int object containing position of last known leader LSN.
slots – state of permanent replication slots on the primary in the format:
{"slot_name": int}.
retain_slots – list of physical replication slots for members that exist in the cluster.
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('last_lsn', 'slots', 'retain_slots')
- classmethod _make(iterable)
Make a new Status object from a sequence or iterable
- _replace(**kwds)
Return a new Status object replacing specified fields with new values
- static from_node(value: str | Dict[str, Any] | None) Status
Factory method to parse value as
Statusobject.
- class patroni.dcs.SyncState(version: int | str | None, leader: str | None, sync_standby: str | None, quorum: int)
Bases:
NamedTuple
Immutable object (namedtuple) which represents last observed synchronous replication state.
- Variables:
version – modification version of a synchronization key in a Configuration Store.
leader – reference to member that was leader.
sync_standby – synchronous standby list (comma delimited) which are last synchronized to leader.
quorum – if the node from
sync_standby list is doing a leader race it should see at least quorum other nodes from the sync_standby + leader list.
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('version', 'leader', 'sync_standby', 'quorum')
- classmethod _make(iterable)
Make a new SyncState object from a sequence or iterable
- _replace(**kwds)
Return a new SyncState object replacing specified fields with new values
- static _str_to_list(value: str) List[str]
Splits a string by comma and returns list of strings.
- Parameters:
value – a comma separated string.
- Returns:
list of non-empty strings after splitting an input value by comma.
- static empty(version: int | str | None = None) SyncState
Construct an empty
SyncStateinstance.- Parameters:
version – optional version number.
- Returns:
empty synchronisation state object.
- static from_node(version: int | str | None, value: str | Dict[str, Any] | None) SyncState
Factory method to parse value as synchronisation state information.
- Parameters:
version – optional version number for the object.
value – (optionally JSON serialised) synchronisation state information
- Returns:
constructed
SyncStateobject.- Example:
>>> SyncState.from_node(1, None).leader is None True
>>> SyncState.from_node(1, '{}').leader is None True
>>> SyncState.from_node(1, '{').leader is None True
>>> SyncState.from_node(1, '[]').leader is None True
>>> SyncState.from_node(1, '{"leader": "leader"}').leader == "leader" True
>>> SyncState.from_node(1, {"leader": "leader"}).leader == "leader" True
- leader_matches(name: str | None) bool
Compare the given name to stored leader value.
- Returns:
Trueif name is matching theleadervalue.
- matches(name: str | None, check_leader: bool = False) bool
Checks if node is present in the /sync state.
Since PostgreSQL does case-insensitive checks for synchronous_standby_names we do it also.
- Parameters:
name – name of the node.
check_leader – by default the name is searched for only in members, a value of
True will include the leader in the list.
- Returns:
True if the /sync key is not empty (see is_empty()) and the given name is among those present in the sync state.
- Example:
>>> s = SyncState(1, 'foo', 'bar,zoo', 0)
>>> s.matches('foo') False
>>> s.matches('fOo', True) True
>>> s.matches('Bar') True
>>> s.matches('zoO') True
>>> s.matches('baz') False
>>> s.matches(None) False
>>> SyncState.empty(1).matches('foo') False
- property members: List[str]
sync_standby and leader as a list, or an empty list if the object is considered empty.
- property voters: List[str]
sync_standby as a list, or an empty list if undefined or the object is considered empty.
- class patroni.dcs.TimelineHistory(version: int | str, value: Any, lines: List[Tuple[int, int, str] | Tuple[int, int, str, str] | Tuple[int, int, str, str, str]])
Bases:
NamedTuple
Object representing timeline history file.
Note
The content held in lines deserialized from value are lines parsed from PostgreSQL timeline history files, consisting of the timeline number, the LSN where the timeline split and any other string held in the file. The files are parsed by
parse_history().- Variables:
version – version number of the file.
value – raw JSON serialised data consisting of parsed lines from history files.
lines –
List of Tuple parsed lines from history files.
- _asdict()
Return a new dict which maps field names to their values.
- _field_defaults = {}
- _fields = ('version', 'value', 'lines')
- classmethod _make(iterable)
Make a new TimelineHistory object from a sequence or iterable
- _replace(**kwds)
Return a new TimelineHistory object replacing specified fields with new values
- static from_node(version: int | str, value: str) TimelineHistory
Parse the given JSON serialized string as a list of timeline history lines.
- Parameters:
version – version number
value – JSON serialized string, consisting of parsed lines of PostgreSQL timeline history files, see
TimelineHistory.
- Returns:
composed timeline history object using parsed lines.
- Example:
If the passed value argument cannot be parsed, an empty list of lines is returned:
>>> h = TimelineHistory.from_node(1, 2)
>>> h.lines []
- patroni.dcs.catch_return_false_exception(func: Callable[[...], Any]) Any
Decorator function for catching functions raising
ReturnFalseException.- Parameters:
func – function to be wrapped.
- Returns:
wrapped function.
- patroni.dcs.dcs_modules() List[str]
Get names of DCS modules, depending on execution environment.
- Returns:
list of known module names with absolute python module path namespace, e.g.
patroni.dcs.etcd.
- patroni.dcs.get_dcs(config: Config | Dict[str, Any]) AbstractDCS
Attempt to load a Distributed Configuration Store from known available implementations.
Note
Using the list of available DCS classes returned by
iter_classes(), attempt to dynamically instantiate the class that implements a DCS using the abstract class AbstractDCS.
Basic top-level configuration parameters retrieved from config are propagated to the DCS specific config before being passed to the module DCS class.
If no module is found to satisfy configuration then report and log an error. This will cause Patroni to exit.
- Raises:
PatroniFatalException – if loading of all available DCS modules has been tried and none succeeded.
- Parameters:
config – object or dictionary with Patroni configuration. This is normally a representation of the main Patroni configuration.
- Returns:
The first successfully loaded DCS module which is an implementation of
AbstractDCS.
- patroni.dcs.iter_dcs_classes(config: Config | Dict[str, Any] | None = None) Iterator[Tuple[str, Type[AbstractDCS]]]
Attempt to import DCS modules that are present in the given configuration.
Note
If a module successfully imports we can assume that all its requirements are installed.
- Parameters:
config – configuration information with possible DCS names as keys. If given, only attempt to import DCS modules defined in the configuration. Else, if
None, attempt to import any supported DCS module.
- Returns:
an iterator of tuples, each containing the module
name and the imported DCS class object.
- patroni.dcs.parse_connection_string(value: str) Tuple[str, str | None]
Split and rejoin a URL string into a connection URL and an API URL.
Note
Original Governor stores connection strings for each cluster member in the following format:
postgres://{username}:{password}@{connect_address}/postgres
Since each of our Patroni instances provides its own REST API endpoint, it’s good to store this information in DCS along with the PostgreSQL connection string. In order to not introduce new keys and to be compatible with original Governor we decided to extend the original connection string in the following way:
postgres://{username}:{password}@{connect_address}/postgres?application_name={api_url}
This way original Governor could use such a connection string as it is, because of a feature of the
libpq library.
- Parameters:
value – The URL string to split.
- Returns:
the connection string stored in DCS split into two parts,
conn_urlandapi_url.
- patroni.dcs.slot_name_from_member_name(member_name: str) str
Translate member name to valid PostgreSQL slot name.
Note
PostgreSQL’s replication slot names must be valid PostgreSQL names. This function maps the wider space of member names to valid PostgreSQL names. Names have their case lowered, dashes and periods common in hostnames are replaced with underscores, other characters are encoded as their unicode codepoint. Name is truncated to 64 characters. Multiple different member names may map to a single slot name.
- Parameters:
member_name – The string to convert to a slot name.
- Returns:
The string converted using the rules described above.