Kubernetes operators were introduced as an implementation of the Infrastructure as software concept. Using them you can abstract the deployment of applications and services in a Kubernetes cluster. This is the fifth of a series of articles explaining how operators work and how they can be implemented in different programming languages.
In past articles we learned about Kubernetes operators: what they are, how they work, and how they can be implemented and tested in Go. Now we are going to implement the “immortal containers” operator, already introduced in this series, using Python.
During the Go implementation we used Operator SDK extensively to generate code, create artifacts, build images, run tests, and deploy the operator. Such tools are not available for Python. This is Python’s one downside: there are almost no mature tools to help us during the development process. But don’t be afraid: because Python is a dynamically typed language, the amount of code needed is greatly reduced.
This article assumes you have Python (version 3.6 or later) installed on your computer. You will also need access to a Kubernetes cluster to try your operator; you can use minikube to create a development cluster.
The complete source code for the operator described throughout this article can be found at https://github.com/flugel-it/k8s-python-operator
(If you have read previous articles, you may choose to skip this section entirely.) The purpose of the immortal containers operator is to enable users to define containers that should run forever; that is, whenever such containers terminate for any reason, they will be restarted. Keep in mind that the operator demonstrated in this article is just a toy operator which serves only to illustrate the steps involved in the implementation of an operator. The functionality it provides can be achieved with already existing Kubernetes features, such as deployments.
This operator defines a new object kind named ImmortalContainer. Users create objects of this kind to specify containers that must run forever. In each object the user specifies the image he wants to run. For each ImmortalContainer object the operator’s controller creates a pod to run the container and then recreates the pod whenever it terminates or is deleted. In the same object the operator also exposes the name of the created pod and the number of times it has been created. Each ImmortalContainer object has the following structure:
ImmortalContainer
  - Spec
    - Image
  - Status
    - CurrentPod
    - StartTimes
Let’s say the operator has been installed and the user wants to create an immortal container to run the image nginx:latest. To do so, he can use kubectl to create an ImmortalContainer object.
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
  name: example-immortalcontainer
spec:
  image: nginx:latest
$ kubectl apply -f example.yaml
The controller will detect the new immortal container and respond by creating a pod to run the image nginx:latest. The user can then view the running pod using the following command:
$ kubectl get pods
NAME                                    READY   STATUS    RESTARTS   AGE
example-immortalcontainer-immortalpod   1/1     Running   0          25m
If someone deletes the pod, it will be recreated.
$ kubectl delete pods example-immortalcontainer-immortalpod
pod "example-immortalcontainer-immortalpod" deleted
$ kubectl get pods
NAME                                    READY   STATUS              RESTARTS   AGE
example-immortalcontainer-immortalpod   0/1     ContainerCreating   0          3s
Finally, the user can edit the ImmortalContainer object he has created in order to see the CurrentPod and StartTimes fields.
$ kubectl edit immortalcontainer example-immortalcontainer
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
  …
spec:
  image: nginx:latest
status:
  currentPod: example-immortalcontainer-immortalpod
  startTimes: 2
These are the steps we followed to implement the immortal containers operator:
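This project uses pip to manage its dependencies. It also uses a Python virtual environment to avoid breaking any system-wide packages. Inside the project root directory there are two main subdirectories: src, which contains the operator’s Python source code (for example src/main.py), and config, which contains the YAML files used in this article (config/crds, config/rbac, and config/example-use.yaml).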
We used the following commands to create and activate the project virtual environment:
$ python3 -m venv venv
$ . ./venv/bin/activate
Then we installed the project’s sole dependency, the official Kubernetes API client, using pip:
$ pip install kubernetes
It’s worth noting that the Kubernetes API client is useful not only for operators but for any program that needs to interact with Kubernetes clusters. Finally, we created the requirements.txt file to record our dependencies:
$ pip freeze > requirements.txt
The custom resources are used to expose the desired and actual states. They define endpoints that provide access to collections of objects. The operator we implemented exposes a collection of objects belonging to the ImmortalContainer object kind. Users create objects of this kind to specify containers that need to run forever. As we said previously, each ImmortalContainer object has the following structure:
ImmortalContainer
  - Spec
    - Image
  - Status
    - CurrentPod
    - StartTimes
We used a Custom Resource Definition (CRD) to create the operator’s custom resource. When we implemented the operator using Go, we used Operator SDK to generate the CRD from source code. Unfortunately, there is no such tool for Python, so we had to write the CRD YAML file manually. This file defines the new object kind, ImmortalContainer, with its fields and validations.
config/crds/exampleoperator_v1alpha1_immortalcontainer.yaml:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: immortalcontainers.immortalcontainer.flugel.it
spec:
  group: immortalcontainer.flugel.it
  names:
    kind: ImmortalContainer
    listKind: ImmortalContainerList
    plural: immortalcontainers
    singular: immortalcontainer
  scope: Namespaced
  subresources:
    status: {}
  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          type: string
        kind:
          type: string
        metadata:
          type: object
        spec:
          properties:
            image:
              minLength: 1
              type: string
          required:
          - image
          type: object
        status:
          properties:
            currentPod:
              type: string
            startTimes:
              format: int64
              type: integer
          type: object
  version: v1alpha1
Note that we’ve indicated that our API group is immortalcontainer.flugel.it, the API version is v1alpha1, and the name of the new object kind is ImmortalContainer.
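To make the validation rules more concrete, here is a minimal Python sketch of the checks that the schema above asks the API server to perform. The function name validate_immortalcontainer is hypothetical and is not part of the operator; the real validation happens in the Kubernetes API server, not in our code.

```python
def validate_immortalcontainer(obj):
    """Return a list of validation errors for an ImmortalContainer dict.

    Hypothetical helper that mirrors the rules in the CRD above:
    spec.image is a required, non-empty string; status.currentPod is a
    string and status.startTimes is an integer.
    """
    errors = []

    spec = obj.get("spec")
    if spec is not None:
        if not isinstance(spec, dict):
            errors.append("spec: must be an object")
        elif "image" not in spec:
            errors.append("spec.image: required")
        elif not isinstance(spec["image"], str):
            errors.append("spec.image: must be a string")
        elif len(spec["image"]) < 1:
            errors.append("spec.image: must have at least 1 character")

    status = obj.get("status")
    if status is not None:
        if not isinstance(status, dict):
            errors.append("status: must be an object")
        else:
            current_pod = status.get("currentPod")
            if current_pod is not None and not isinstance(current_pod, str):
                errors.append("status.currentPod: must be a string")
            start_times = status.get("startTimes")
            # bool is a subclass of int in Python, so it is rejected explicitly.
            if start_times is not None and (
                    isinstance(start_times, bool) or not isinstance(start_times, int)):
                errors.append("status.startTimes: must be an integer")

    return errors


valid = {"spec": {"image": "nginx:latest"}}
missing_image = {"spec": {}}
print(validate_immortalcontainer(valid))
print(validate_immortalcontainer(missing_image))
```

An object such as missing_image would be rejected by the API server when a user tries to create it, before the controller ever sees it.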
This section describes how we implemented the operator’s controller. As we said in previous articles, the mission of the controller is to keep the desired and actual states synchronized. To do this, the controller watches for changes in ImmortalContainer objects (desired state — containers to run) and Pods (actual state — containers running), and executes the actions necessary to reconcile both states. The following diagram illustrates the controller’s main components.
In order to watch events and reconcile states concurrently, the controller uses threads. Each event watcher runs in its own thread. For each received event the event watcher adds to the work queue objects whose desired or actual state might have changed. In another thread, the controller takes elements from the work queue and executes the reconcile loop for each of them.
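The threading arrangement described above can be sketched with nothing more than the standard library. This is a simplified illustration, not the operator's actual code: the names event_watcher, reconcile_worker and run_demo are hypothetical, the events are plain dicts instead of real Kubernetes events, and a sentinel value is used to stop the worker so that the example terminates.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the reconcile thread to exit (sketch only)


def event_watcher(events, work_queue):
    """Stand-in for an event watcher thread: enqueue the key of each affected object."""
    for event in events:
        work_queue.put(event["key"])


def reconcile_worker(work_queue, reconcile):
    """Take keys from the work queue and reconcile them one at a time."""
    while True:
        key = work_queue.get()
        if key is _STOP:
            break
        reconcile(key)


def run_demo():
    work_queue = queue.Queue()
    reconciled = []  # records which keys went through the reconcile step

    worker = threading.Thread(target=reconcile_worker,
                              args=(work_queue, reconciled.append))
    watchers = [
        # One watcher for custom resource events, one for pod events.
        threading.Thread(target=event_watcher,
                         args=([{"key": "default/a"}, {"key": "default/b"}], work_queue)),
        threading.Thread(target=event_watcher,
                         args=([{"key": "default/a"}], work_queue)),
    ]

    worker.start()
    for watcher in watchers:
        watcher.start()
    for watcher in watchers:
        watcher.join()
    work_queue.put(_STOP)  # every event has been enqueued; stop the worker
    worker.join()
    return reconciled


print(sorted(run_demo()))
```

Note that the same key can reach the queue more than once, so the reconcile step has to be safe to run repeatedly for the same object.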
We developed a small Python module, threadedwatch, to make it easier to watch for events concurrently. This module exposes a class named ThreadedWatcher. The class constructor receives the Kubernetes API function to be watched. For each event produced by the watched function, every handler registered using the add_handler method is called. For example, the following code uses a ThreadedWatcher object to watch events about pods and print them:
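import kubernetes
from threadedwatch import ThreadedWatcher

# Assumes the Kubernetes client configuration has already been loaded.
v1 = kubernetes.client.CoreV1Api()
watcher = ThreadedWatcher(v1.list_pod_for_all_namespaces)

def on_event(event):
    # Called once for every pod event received by the watcher.
    print(event)

watcher.add_handler(on_event)
watcher.start()
watcher.join()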
The immortal containers operator’s controller uses two ThreadedWatcher objects, one for our custom resource and one for pods.
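To give an idea of what such a class involves, here is a minimal sketch of a watcher thread with the same add_handler interface. It is not the threadedwatch module from the repository: the class name SketchWatcher is hypothetical, and instead of a Kubernetes API function it takes any callable that returns an iterable of events, so it can be run without a cluster.

```python
import threading


class SketchWatcher(threading.Thread):
    """Hypothetical, simplified stand-in for ThreadedWatcher."""

    def __init__(self, event_source):
        super().__init__(daemon=True)
        self.event_source = event_source  # callable returning an iterable of events
        self.handlers = []

    def add_handler(self, handler):
        """Register a function to be called with every received event."""
        self.handlers.append(handler)

    def run(self):
        # Runs in the watcher's own thread once start() is called.
        for event in self.event_source():
            for handler in self.handlers:
                handler(event)


def fake_pod_events():
    """Fake event stream used in place of a real Kubernetes watch."""
    yield {"type": "ADDED", "name": "pod-a"}
    yield {"type": "DELETED", "name": "pod-a"}


received = []
watcher = SketchWatcher(fake_pod_events)
watcher.add_handler(received.append)
watcher.start()
watcher.join()
print(received)
```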
When an event about an ImmortalContainer object is received, the name of the object is added to the work queue. In the case of pod events, the name of the pod’s owner is added to the work queue only if that owner is an ImmortalContainer object.
So, as in the Go implementation, for every received event, the name of the ImmortalContainer that might be affected is enqueued.
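The mapping from a pod event to the key that gets enqueued could look like the sketch below. It is an illustration under a few assumptions: the function name immortalcontainer_key_for_pod is hypothetical, the pod is represented as a plain dict shaped like its JSON form rather than as a client model object, and the key uses the "namespace/name" format that the reconcile code in this article splits on.

```python
def immortalcontainer_key_for_pod(pod, owner_kind="ImmortalContainer"):
    """Return "namespace/name" of the ImmortalContainer owning the pod, or None.

    Hypothetical helper: `pod` is a dict shaped like the pod's JSON form.
    """
    metadata = pod.get("metadata", {})
    namespace = metadata.get("namespace")
    for ref in metadata.get("ownerReferences") or []:
        if ref.get("kind") == owner_kind:
            return "{}/{}".format(namespace, ref["name"])
    return None  # pods with other owners, or with no owner, are ignored


owned_pod = {
    "metadata": {
        "namespace": "default",
        "name": "example-immortalcontainer-immortalpod",
        "ownerReferences": [
            {"kind": "ImmortalContainer", "name": "example-immortalcontainer"},
        ],
    }
}
unrelated_pod = {
    "metadata": {
        "namespace": "default",
        "name": "web-abc",
        "ownerReferences": [{"kind": "ReplicaSet", "name": "web"}],
    }
}

print(immortalcontainer_key_for_pod(owned_pod))
print(immortalcontainer_key_for_pod(unrelated_pod))
```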
The reconcile loop runs in its own thread. It works by dequeuing names of ImmortalContainer objects from the work queue and processing them. The work queue contains the names of objects whose states might need reconciliation.
For each dequeued name, the ImmortalContainer object is fetched using the Kubernetes API. Then the controller compares the desired and actual states of the object and executes the actions needed to make them match. Finally, it updates the status of the object if necessary.
This is how the implemented reconcile loop works:
The next block is an excerpt from the controller’s source code. It has extra comments to make it easier to understand. The full source code can be found here.
class Controller(threading.Thread):
    …
    def _reconcile_state(self, object_key):
        ns, name = object_key.split("/")
        # ---------------------------------------------------
        # 1- Fetch the ImmortalContainer object
        try:
            immortalcontainer = self.customsapi.get_namespaced_custom_object(
                self.custom_group, self.custom_version, ns, self.custom_plural, name)
        except ApiException as e:
            if e.status == 404:
                logger.info(
                    "Element {:s} in workqueue no longer exist".format(object_key))
                return
            raise e
        # ---------------------------------------------------
        # ---------------------------------------------------
        # 2- Create pod definition
        pod_definition = self._new_pod(immortalcontainer)
        # ---------------------------------------------------
        # ---------------------------------------------------
        # 3- Check if a pod matching the definition exists
        pod = None
        try:
            pod = self.corev1api.read_namespaced_pod(
                pod_definition.metadata.name, ns)
        except ApiException as e:
            if e.status != 404:
                logger.info("Error retrieving pod {:s} for immortalcontainer {:s}".format(
                    pod_definition.metadata.name, object_key))
                raise e
        # ---------------------------------------------------
        # ---------------------------------------------------
        # 4- If the pod is not found
        if pod is None:
            # 4.1- Create the pod
            pod = self.corev1api.create_namespaced_pod(ns, pod_definition)
            # 4.2- Update ImmortalContainer object status
            self._update_status(immortalcontainer, pod)

    # 4.2- Update ImmortalContainer object status
    def _update_status(self, immortalcontainer, pod):
        """Updates an ImmortalContainer status"""
        new_status = self._calculate_status(immortalcontainer, pod)
        try:
            self.customsapi.patch_namespaced_custom_object_status(
                self.custom_group, self.custom_version,
                immortalcontainer['metadata']['namespace'],
                self.custom_plural, immortalcontainer['metadata']['name'],
                new_status
            )
        except Exception as e:
            logger.error("Error updating status for ImmortalContainer {:s}/{:s}".format(
                immortalcontainer['metadata']['namespace'], immortalcontainer['metadata']['name']))

    def _calculate_status(self, immortalcontainer, pod):
        """Calculates what the status of an ImmortalContainer should be """
        new_status = copy.deepcopy(immortalcontainer)
        if 'status' in immortalcontainer and 'startTimes' in immortalcontainer['status']:
            startTimes = immortalcontainer['status']['startTimes']+1
        else:
            startTimes = 1
        new_status['status'] = dict(
            currentPod=pod.metadata.name,
            startTimes=startTimes
        )
        return new_status

    # 2- Create pod definition
    def _new_pod(self, immortalcontainer):
        """Returns the pod definition to create the pod for an ImmortalContainer"""
        labels = dict(controller=immortalcontainer['metadata']['name'])
        return models.V1Pod(
            metadata=models.V1ObjectMeta(
                name=immortalcontainer['metadata']['name']+"-immortalpod",
                labels=labels,
                namespace=immortalcontainer['metadata']['namespace'],
                owner_references=[models.V1OwnerReference(
                    api_version=self.custom_group+"/"+self.custom_version,
                    controller=True,
                    kind=self.custom_kind,
                    name=immortalcontainer['metadata']['name'],
                    uid=immortalcontainer['metadata']['uid']
                )]),
            spec=models.V1PodSpec(
                containers=[
                    models.V1Container(
                        name="acontainer",
                        image=immortalcontainer['spec']['image']
                    )
                ]
            )
        )
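Because the excerpt above reaches the cluster only through its API client objects, its decision logic can be exercised without a cluster by swapping those clients for test doubles. The sketch below shows the idea with the standard library's unittest.mock. It is an illustration, not code from the repository: ensure_pod is a hypothetical, trimmed-down version of steps 3 and 4, and FakeApiException stands in for the client's ApiException on the assumption that only its status attribute matters here.

```python
from unittest import mock


class FakeApiException(Exception):
    """Stand-in for the client's ApiException (assumption: only .status is used)."""

    def __init__(self, status):
        super().__init__("status {}".format(status))
        self.status = status


def ensure_pod(corev1api, ns, pod_name, pod_definition):
    """Hypothetical, simplified steps 3 and 4: create the pod if it is missing.

    Returns True when a pod had to be created, False when it already existed.
    """
    try:
        corev1api.read_namespaced_pod(pod_name, ns)
    except FakeApiException as e:
        if e.status != 404:
            raise  # unexpected error: let the caller see it
    else:
        return False
    corev1api.create_namespaced_pod(ns, pod_definition)
    return True


# Case 1: the API reports 404, so the pod must be created.
api_missing = mock.Mock()
api_missing.read_namespaced_pod.side_effect = FakeApiException(404)
created_when_missing = ensure_pod(
    api_missing, "default", "example-immortalpod", {"kind": "Pod"})
api_missing.create_namespaced_pod.assert_called_once_with("default", {"kind": "Pod"})

# Case 2: the pod is found, so nothing is created.
api_present = mock.Mock()
created_when_present = ensure_pod(
    api_present, "default", "example-immortalpod", {"kind": "Pod"})
api_present.create_namespaced_pod.assert_not_called()

print(created_when_missing, created_when_present)
```

The same approach could be applied to the real Controller class by assigning mock objects to its corev1api and customsapi attributes, although we have not done so in this project.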
When an operator is installed on a cluster, the controller of the operator runs in a pod inside the cluster. For testing and debugging purposes, however, it may be useful to execute the controller outside the cluster. In both cases, the controller communicates with the cluster using the Kubernetes API.
Before continuing, be sure you have a cluster available for use and your credentials configured. You can run `kubectl get nodes` to verify that you can reach the cluster. If you don’t have any cluster, you can use minikube or microk8s to create a local development cluster.
Running outside the cluster means that, while all the resources (such as ImmortalContainer objects and pods) live inside the cluster, the controller is executed externally, for example on the developer’s computer. The following diagram illustrates such a situation:
Assuming that the cluster is running and that your credentials are stored in ~/.kube/config, we are ready to try the operator.
Check that the cluster is available by running:
$ kubectl get nodes
The following two commands install the custom resource in the cluster and run the controller locally, on your computer:
$ kubectl apply -f config/crds
$ python3 src/main.py --kubeconfig ~/.kube/config
After running these commands, you should see the controller’s log output:
INFO:controller:Controller starting
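The --kubeconfig flag hints at how a controller can support both modes of execution. The sketch below shows one way to do it with the official client's load_kube_config and load_incluster_config functions. It is an assumption about how such an entry point might be written, not a copy of src/main.py, and the function names parse_args, load_cluster_config and main are hypothetical.

```python
import argparse


def parse_args(argv=None):
    """Parse the command line; --kubeconfig is optional."""
    parser = argparse.ArgumentParser(
        description="Immortal containers controller (sketch)")
    parser.add_argument(
        "--kubeconfig", default=None,
        help="path to a kubeconfig file; omit it when running inside a pod")
    return parser.parse_args(argv)


def load_cluster_config(kubeconfig=None):
    """Configure the Kubernetes client and report which mode was used."""
    # Imported here so that argument parsing works even where the
    # kubernetes package is not installed.
    from kubernetes import config

    if kubeconfig:
        # Outside the cluster: use the developer's credentials.
        config.load_kube_config(config_file=kubeconfig)
        return "kubeconfig"
    # Inside a pod: use the service account mounted by Kubernetes.
    config.load_incluster_config()
    return "in-cluster"


def main(argv=None):
    args = parse_args(argv)
    mode = load_cluster_config(args.kubeconfig)
    print("Using {} configuration".format(mode))
```

With this layout, running with --kubeconfig ~/.kube/config uses your local credentials, while running with no arguments inside a pod falls back to the in-cluster configuration.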
To try the operator we are going to create an ImmortalContainer object to run the nginx:latest image. To do this we need to edit the file config/example-use.yaml to make it look like this:
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
  name: example-immortal-container
spec:
  image: nginx:latest
We then use kubectl to create the ImmortalContainer object in the cluster.
$ kubectl apply -f config/example-use.yaml
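The same object can also be created from Python with the Kubernetes API client, which is handy in scripts. The following is a minimal sketch, assuming your credentials are in the default kubeconfig location and that the object goes into the default namespace; the helper names immortalcontainer_manifest and create_immortalcontainer are hypothetical. Unlike kubectl apply, the create call fails if an object with the same name already exists.

```python
GROUP = "immortalcontainer.flugel.it"
VERSION = "v1alpha1"
PLURAL = "immortalcontainers"


def immortalcontainer_manifest(name, image):
    """Build the same object as the YAML file, as a plain dict."""
    return {
        "apiVersion": "{}/{}".format(GROUP, VERSION),
        "kind": "ImmortalContainer",
        "metadata": {"name": name},
        "spec": {"image": image},
    }


def create_immortalcontainer(name, image, namespace="default"):
    """Create the object in the cluster.

    Needs the kubernetes package and a reachable cluster, so the import
    is done here rather than at module level.
    """
    from kubernetes import client, config

    config.load_kube_config()
    api = client.CustomObjectsApi()
    return api.create_namespaced_custom_object(
        GROUP, VERSION, namespace, PLURAL,
        immortalcontainer_manifest(name, image))


manifest = immortalcontainer_manifest("example-immortal-container", "nginx:latest")
print(manifest)
```

Calling create_immortalcontainer("example-immortal-container", "nginx:latest") against a cluster where the CRD is installed should have the same effect as the kubectl command above.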
The controller will detect the new immortal container and subsequently create a pod to run its image. Let’s try it.
$ kubectl get pods
NAME                                     READY   STATUS    RESTARTS   AGE
example-immortal-container-immortalpod   1/1     Running   0          2m
Finally, let’s verify that the pod is recreated if we delete it.
$ kubectl delete pods example-immortal-container-immortalpod
pod "example-immortal-container-immortalpod" deleted
$ kubectl get pods
NAME                                     READY   STATUS              RESTARTS   AGE
example-immortal-container-immortalpod   0/1     ContainerCreating   0          3s
You can edit the ImmortalContainer object to see its status fields, CurrentPod and StartTimes.
$ kubectl edit immortalcontainer example-immortal-container
apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
  …
spec:
  image: nginx:latest
status:
  currentPod: example-immortal-container-immortalpod
  startTimes: 2
As you can see, the operator works as expected.
In the previous section we’ve seen that the operator functions as expected. Now we are going to build all the artifacts needed to deploy it to run inside a cluster. In this setup, the operator’s controller runs in a pod.
The controller still uses the Kubernetes API to watch events and to manage objects. But, since it is no longer running on the user’s or developer’s computer (and therefore has no access to the user’s credentials), it requires custom authorization rules to access the Kubernetes API.
These authorization rules can be implemented using a custom role assigned to a service account. The operator’s controller runs using this service account. The YAML files for this setup can be found in config/rbac. You can read more about authorization in Kubernetes here.
To run the controller in a pod, we need to build 1) an image containing the code of the controller and 2) a pod or deployment definition to instantiate the pod. We created a Dockerfile to build the image. Using a multi-stage build we were able to reduce the image size.
# Dockerfile
FROM python:3-alpine3.9 as base
FROM base as builder
RUN mkdir /install
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN apk add --no-cache --virtual .build-deps gcc musl-dev libffi-dev openssl-dev
RUN pip install --install-option="--prefix=/install" -r /requirements.txt
FROM base
COPY --from=builder /install /usr/local
COPY src /exampleoperatorpy
WORKDIR /exampleoperatorpy
CMD ["python", "main.py"]
For this article, we’ve decided to name the image flugelit/immortalcontainer-operator-py:dev. We pushed it to a public registry (Docker Hub) so that our Kubernetes cluster can fetch it.
Note: You can use any other public or private registry.
$ docker build -t flugelit/immortalcontainer-operator-py:dev .
$ docker push flugelit/immortalcontainer-operator-py:dev
We’ve pushed the image here: https://hub.docker.com/r/flugelit/immortalcontainer-operator-py
To deploy the operator to the cluster we use the following steps:
We automated these steps in a Makefile, so if you clone the code from the repository, you can deploy the operator by running:
$ make deploy
Using the following command you can remove the operator from the cluster.
$ make undeploy
Be careful: pods that were created for ImmortalContainers may still be running. If so, you can delete them manually. They will not be restarted since the operator is not running.
In this article we’ve seen all the steps necessary to implement and deploy a Kubernetes operator using Python.
Note that, this time, we did not use any tool to help us with the project scaffolding and code generation. Still, we followed the same steps described in our earlier article, in which we implemented the operator in Go using Operator SDK.
We would like to highlight one weak point regarding the implementation of operators in Python. There are no good tools or libraries to help in the development of automated tests. So, if you want to write tests for an operator using Python, you are more or less on your own.
In the next article we are going to review Metacontroller, a Kubernetes add-on that greatly simplifies the development of operators. Metacontroller takes care of all the work related to handling events and API calls. The developer has only to provide a function that maps the desired and actual states to a new desired state.