Building Custom Kubernetes Operators Part 5: Building Operators in Python

  • July 6, 2019

Kubernetes operators were introduced as an implementation of the Infrastructure as software concept. Using them you can abstract the deployment of applications and services in a Kubernetes cluster. This is the fifth of a series of articles explaining how operators work and how they can be implemented in different programming languages.  

Introduction

In past articles we learned about Kubernetes operators: what they are, how they work, and how they can be implemented and tested in Go. Now we are going to implement the “immortal containers” operator, already introduced in this series, using Python.

During the Go implementation we used Operator SDK extensively to generate code, create artifacts, build images, run tests, and deploy the operator. Such tools are not available for Python. This is Python’s one downside: there are almost no mature tools to help us during the development process. But don’t be afraid: because Python is a dynamically typed language, the amount of code needed is greatly reduced.

This article assumes you have Python 3.6 or later installed on your computer. You will also need access to a Kubernetes cluster to try your operator; you can use minikube to create a development cluster.

The complete source code for the operator described in this article can be found at https://github.com/flugel-it/k8s-python-operator

Revisiting the immortal containers operator

(If you have read previous articles, you may choose to skip this section entirely.)

The purpose of the immortal containers operator is to enable users to define containers that should run forever: whenever such a container terminates for any reason, it is restarted.

Keep in mind that the operator demonstrated in this article is just a toy operator which serves only to illustrate the steps involved in the implementation of an operator. The functionality it provides can be achieved with already existing Kubernetes features, such as deployments.

This operator defines a new object kind named ImmortalContainer. Users create objects of this kind to specify containers that must run forever. In each object the user specifies the image he wants to run.

For each ImmortalContainer object the operator’s controller creates a pod to run the container and then recreates the pod whenever it terminates or is deleted. In the same object the operator also exposes the name of the created pod and the number of times it has been created.

Each ImmortalContainer object has the following structure:

 

ImmortalContainer
    - Spec
        - Image
    - Status
        - CurrentPod
        - StartTimes

 

Let’s say the operator has been installed and the user wants to create an immortal container to run the image nginx:latest. To do so, he can use kubectl to create an ImmortalContainer object.


apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
  name: example-immortalcontainer
spec:
  image: nginx:latest

 

 

$ kubectl apply -f example.yaml

 

 

The controller will detect the new immortal container and respond by creating a pod to run the image nginx:latest. The user can then view the running pod using the following command:

 

$ kubectl get pods
NAME                                    READY   STATUS    RESTARTS   AGE
example-immortalcontainer-immortalpod   1/1     Running   0          25m

 

 

If someone deletes the pod, it will be recreated.

 

$ kubectl delete pods example-immortalcontainer-immortalpod
pod "example-immortalcontainer-immortalpod" deleted
 
$ kubectl get pods
NAME                                    READY   STATUS              RESTARTS   AGE
example-immortalcontainer-immortalpod   0/1     ContainerCreating   0          3s

 

 

Finally, the user can edit the ImmortalContainer object he has created in order to see the CurrentPod and StartTimes fields.

$ kubectl edit immortalcontainer example-immortalcontainer


apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
…
spec:
 image: nginx:latest
status:
 currentPod: example-immortalcontainer-immortalpod
 startTimes: 2

 

Implementation process

These are the steps we followed to implement the immortal containers operator:  

  1. Project initialization: define the structure of our code and install required dependencies using pip.
  2. Custom resources definitions: create CRD yaml files that create new object kinds and resources. For our operator we created a new object kind, ImmortalContainer, and defined the resource to give users access to those objects.
  3. Custom controller implementation: the controller reacts to relevant events and transforms the actual state into the desired one. For our operator, the controller watches for ImmortalContainer and Pod events.
  4. Building: installing an operator in a cluster requires some artifacts, such as a controller image and a deployment definition to run it.

Project initialization

This project uses pip to manage its dependencies. It also uses a Python virtual environment to avoid breaking any system-wide packages.

Inside the project root directory there are two main subdirectories:

  • config: stores all the yaml files needed to deploy the operator to a cluster.
  • src: contains the operator’s controller source code.

We used the following commands to create and activate the project virtual environment:

 

$ python3 -m venv venv
$ . ./venv/bin/activate

 

Then we installed the project’s sole dependency, the official Kubernetes API client, using pip:

$ pip install kubernetes

 

It’s worth noting that the Kubernetes API client is useful not only for operators but for any program that needs to interact with Kubernetes clusters.

Finally, we created the requirements.txt file to record our dependencies:

 

$ pip freeze > requirements.txt

 

Defining the custom resource

Custom resources are used to expose the desired and actual states. They define endpoints that provide access to collections of objects.

The operator we implemented exposes a collection of objects belonging to the ImmortalContainer object kind. Users create objects of this kind to specify containers that need to run forever.

As we said previously, each ImmortalContainer object has the following structure:

ImmortalContainer
    - Spec
        - Image
    - Status
        - CurrentPod
        - StartTimes

We used a Custom Resource Definition to create the operator’s custom resource. When we implemented the operator using Go, we used Operator SDK to generate the CRD from source code. Unfortunately, there is no such tool for Python, so we had to write the CRD yaml file manually. This file defines the new object kind, ImmortalContainer, with its fields and validations.

config/crds/exampleoperator_v1alpha1_immortalcontainer.yaml:


apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
 name: immortalcontainers.immortalcontainer.flugel.it
spec:
 group: immortalcontainer.flugel.it
 names:
   kind: ImmortalContainer
   listKind: ImmortalContainerList
   plural: immortalcontainers
   singular: immortalcontainer
 scope: Namespaced
 subresources:
   status: {}
 validation:
   openAPIV3Schema:
     properties:
       apiVersion:
         type: string
       kind:
         type: string
       metadata:
         type: object
       spec:
         properties:
           image:
             minLength: 1
             type: string
         required:
         - image
         type: object
       status:
         properties:
           currentPod:
             type: string
           startTimes:
             format: int64
             type: integer
         type: object
 version: v1alpha1

Note that we’ve indicated that our API group is immortalcontainer.flugel.it, the API version is v1alpha1, and the name of the new object kind is ImmortalContainer.
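The validation rules in this CRD are enforced by the Kubernetes API server, but it can help to see what they mean in plain Python. The following is a minimal sketch and not part of the operator: the function name validate_immortalcontainer is hypothetical, and it only mirrors the rules visible in the schema above (spec.image is a required, non-empty string; status.currentPod is a string; status.startTimes is an integer).

```python
def validate_immortalcontainer(obj):
    """Return a list of validation errors for an ImmortalContainer dict.

    Hypothetical helper for illustration only: the real checks are done
    by the API server using the openAPIV3Schema in the CRD.
    """
    errors = []

    # The schema does not mark "spec" itself as required, only "image"
    # inside it, so a missing spec is not reported here.
    spec = obj.get("spec")
    if spec is not None:
        if not isinstance(spec, dict):
            errors.append("spec: must be an object")
        else:
            image = spec.get("image")
            if image is None:
                errors.append("spec.image: required")
            elif not isinstance(image, str):
                errors.append("spec.image: must be a string")
            elif len(image) < 1:
                errors.append("spec.image: minLength is 1")

    status = obj.get("status")
    if status is not None:
        if not isinstance(status, dict):
            errors.append("status: must be an object")
        else:
            current_pod = status.get("currentPod")
            if current_pod is not None and not isinstance(current_pod, str):
                errors.append("status.currentPod: must be a string")
            start_times = status.get("startTimes")
            # bool is a subclass of int in Python, so exclude it explicitly
            if start_times is not None and (
                    isinstance(start_times, bool) or not isinstance(start_times, int)):
                errors.append("status.startTimes: must be an integer")

    return errors


print(validate_immortalcontainer({"spec": {"image": "nginx:latest"}}))  # prints []
```

An object that would be rejected, such as one with an empty image string, yields a non-empty list of messages instead.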

Custom controller implementation

This section describes how we implemented the operator’s controller. As we said in previous articles, the mission of the controller is to keep the desired and actual states synchronized. To do this, the controller watches for changes in ImmortalContainer objects (desired state: containers to run) and Pods (actual state: containers running), and executes the actions necessary to reconcile both states.

The following diagram illustrates the controller’s main components.

 

In order to watch events and reconcile states concurrently, the controller uses threads. Each event watcher runs in its own thread. For each event it receives, the watcher adds to the work queue the objects whose desired or actual state might have changed.

In another thread, the controller takes elements from the work queue and executes the reconcile loop for each of them.
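The thread layout described here can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the operator’s code: the function run_demo and the None sentinel used to stop the worker are hypothetical, and the “events” are plain strings standing in for object keys.

```python
import queue
import threading


def run_demo(immortalcontainer_events, pod_events):
    """Run two watcher threads and one reconcile thread over a shared queue."""
    work_queue = queue.Queue()
    processed = []

    def watch(events):
        # Stand-in for an event watcher: enqueue the key of each affected object.
        for key in events:
            work_queue.put(key)

    def reconcile_loop():
        # Stand-in for the reconcile thread: handle keys until the sentinel arrives.
        while True:
            key = work_queue.get()
            if key is None:
                return
            processed.append(key)  # the real controller would reconcile here

    watchers = [
        threading.Thread(target=watch, args=(events,))
        for events in (immortalcontainer_events, pod_events)
    ]
    worker = threading.Thread(target=reconcile_loop)

    worker.start()
    for thread in watchers:
        thread.start()
    for thread in watchers:
        thread.join()
    work_queue.put(None)  # tell the reconcile thread to stop
    worker.join()
    return processed


# The order in which keys arrive depends on thread scheduling, so sort for display.
print(sorted(run_demo(["default/a"], ["default/a", "default/b"])))
```

Note that the same key can be enqueued more than once; this is harmless as long as reconciling an object that is already in the desired state does nothing.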

Watching for events

We developed a small Python module, threadedwatch, to make it easier to watch for events concurrently. This module exposes a class named ThreadedWatcher.

The class constructor receives the Kubernetes API function to be watched. For each event produced by the watched function, every handler registered using the add_handler method is called. For example, the following code uses a ThreadedWatcher object to watch events about pods and prints them:


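import kubernetes
from threadedwatch import ThreadedWatcher  # the module described above

# Load credentials from the default kubeconfig before creating API clients
kubernetes.config.load_kube_config()
v1 = kubernetes.client.CoreV1Api()
watcher = ThreadedWatcher(v1.list_pod_for_all_namespaces)

def on_event(event):
   print(event)

watcher.add_handler(on_event)
watcher.start()
watcher.join()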

The immortal containers operator’s controller uses two ThreadedWatcher objects, one for our custom resource and one for pods.
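To give an idea of what such a class involves, here is a simplified stand-in written for this explanation. It is not the threadedwatch module from the repository: the class name SimpleThreadedWatcher is hypothetical, and instead of a Kubernetes API function it takes any callable that returns an iterable of events, so it can be tried without a cluster.

```python
import threading


class SimpleThreadedWatcher(threading.Thread):
    """Hypothetical, simplified stand-in for a threaded event watcher."""

    def __init__(self, event_source):
        super().__init__(daemon=True)
        # Assumption: event_source is a callable returning an iterable of events.
        # With the official client it could be something like
        #   lambda: kubernetes.watch.Watch().stream(v1.list_pod_for_all_namespaces)
        self._event_source = event_source
        self._handlers = []

    def add_handler(self, handler):
        """Register a function to be called with every received event."""
        self._handlers.append(handler)

    def run(self):
        # Runs in the watcher's own thread once start() is called.
        for event in self._event_source():
            for handler in self._handlers:
                handler(event)


def fake_pod_events():
    # Fake events shaped loosely like the dicts produced by the client's watch
    yield {"type": "ADDED", "object": "pod-a"}
    yield {"type": "DELETED", "object": "pod-a"}


seen = []
watcher = SimpleThreadedWatcher(fake_pod_events)
watcher.add_handler(seen.append)
watcher.start()
watcher.join()
print([event["type"] for event in seen])  # prints ['ADDED', 'DELETED']
```

A production watcher would also need to handle connection errors and restart the stream when it ends; those concerns are left out of this sketch.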

 

When an event about an ImmortalContainer object is received, the name of the object is added to the work queue. For pod events, the name of the pod’s owner is added to the work queue, but only if the owner is an ImmortalContainer object.


So, as in the Go implementation, for every received event, the name of the ImmortalContainer that might be affected is enqueued.
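The owner lookup for pod events could look roughly like the following. This is a sketch under assumptions: the function name owner_key_for_pod is hypothetical, the pod is represented as a plain dict in the JSON shape used by the Kubernetes API (the client library normally hands over model objects instead), and the key uses the "namespace/name" form that the reconcile code splits on.

```python
def owner_key_for_pod(pod, owner_kind="ImmortalContainer"):
    """Return "namespace/name" of the pod's owning ImmortalContainer, or None.

    Hypothetical helper: `pod` is assumed to be a dict shaped like the
    JSON representation of a Kubernetes pod.
    """
    metadata = pod.get("metadata", {})
    for ref in metadata.get("ownerReferences") or []:
        if ref.get("kind") == owner_kind:
            # Owner references always point to an object in the pod's namespace
            return "{}/{}".format(metadata.get("namespace"), ref.get("name"))
    return None


pod = {
    "metadata": {
        "name": "example-immortalcontainer-immortalpod",
        "namespace": "default",
        "ownerReferences": [
            {"kind": "ImmortalContainer", "name": "example-immortalcontainer"}
        ],
    }
}
print(owner_key_for_pod(pod))  # prints default/example-immortalcontainer
```

Pods without an ImmortalContainer owner return None and are simply ignored by the watcher.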

States Reconciling

The reconcile loop runs in its own thread. It works by dequeuing names of ImmortalContainer objects from the work queue and processing them. This work queue contains names of objects whose states might need reconciliation.

For each dequeued name, the ImmortalContainer object is fetched using the Kubernetes API. Then, the controller compares the desired and actual states of the object and executes the actions to make them match. Finally, it updates the status of the object if necessary.

 

This is how the implemented reconcile loop works:

  1. Fetch the ImmortalContainer object.
  2. Create the corresponding pod definition (just the definition, not the pod).
  3. Check if a pod matching the definition already exists.
  4. If the pod does not exist
    1. Create the pod
    2. Update the status fields of the ImmortalContainer object.

 

The next block is an excerpt from the controller’s source code. It has extra comments to make it easier to understand. The full source code can be found in the project repository.


class Controller(threading.Thread):
…
 
   def _reconcile_state(self, object_key):
       ns, name = object_key.split("/")
 
       # ---------------------------------------------------
       # 1- Fetch the ImmortalContainer object
       try:
           immortalcontainer = self.customsapi.get_namespaced_custom_object(
               self.custom_group, self.custom_version, ns, self.custom_plural, name)
       except ApiException as e:
           if e.status == 404:
               logger.info(
                   "Element {:s} in workqueue no longer exist".format(object_key))
               return
           raise e
       # ---------------------------------------------------
 
       # ---------------------------------------------------
       # 2- Create pod definition
       pod_definition = self._new_pod(immortalcontainer)
       # ---------------------------------------------------
 
       # ---------------------------------------------------
       # 3- Check if a pod matching the definition exists
       pod = None
       try:
           pod = self.corev1api.read_namespaced_pod(
               pod_definition.metadata.name, ns)
       except ApiException as e:
           if e.status != 404:
               logger.info("Error retrieving pod {:s} for immortalcontainer {:s}".format(
                   pod_definition.metadata.name, object_key))
               raise e
       # ---------------------------------------------------
 
       # ---------------------------------------------------
       # 4- If the pod is not found
       if pod is None:
           # 4.1- Create the pod
           pod = self.corev1api.create_namespaced_pod(ns, pod_definition)
           # 4.2- Update ImmortalContainer object status
           self._update_status(immortalcontainer, pod)
 
   # 4.2- Update ImmortalContainer object status
   def _update_status(self, immortalcontainer, pod):
       """Updates an ImmortalContainer status"""
       new_status = self._calculate_status(immortalcontainer, pod)
       try:
           self.customsapi.patch_namespaced_custom_object_status(
               self.custom_group, self.custom_version,
               immortalcontainer['metadata']['namespace'],
               self.custom_plural, immortalcontainer['metadata']['name'],
               new_status
           )
       except Exception as e:
           # Include the exception in the log so the failure can be diagnosed
           logger.error("Error updating status for ImmortalContainer {:s}/{:s}: {}".format(
               immortalcontainer['metadata']['namespace'], immortalcontainer['metadata']['name'], e))
 
   def _calculate_status(self, immortalcontainer, pod):
       """Calculates what the status of an ImmortalContainer should be """
       new_status = copy.deepcopy(immortalcontainer)
       if 'status' in immortalcontainer and 'startTimes' in immortalcontainer['status']:
           startTimes = immortalcontainer['status']['startTimes']+1
       else:
           startTimes = 1
       new_status['status'] = dict(
           currentPod=pod.metadata.name,
           startTimes=startTimes
       )
       return new_status
 
   # 2- Create pod definition
   def _new_pod(self, immortalcontainer):
       """Returns the pod definition to create the pod for an ImmortalContainer"""
       labels = dict(controller=immortalcontainer['metadata']['name'])
       return models.V1Pod(
           metadata=models.V1ObjectMeta(
               name=immortalcontainer['metadata']['name']+"-immortalpod",
               labels=labels,
               namespace=immortalcontainer['metadata']['namespace'],
               owner_references=[models.V1OwnerReference(
                   api_version=self.custom_group+"/"+self.custom_version,
                   controller=True,
                   kind=self.custom_kind,
                   name=immortalcontainer['metadata']['name'],
                   uid=immortalcontainer['metadata']['uid']
               )]),
           spec=models.V1PodSpec(
               containers=[
                   models.V1Container(
                       name="acontainer",
                       image=immortalcontainer['spec']['image']
                   )
               ]
           )
       )
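
The behaviour of these reconcile steps can be tried without a cluster. The following is a minimal simulation written for this explanation, not code from the repository: FakeCluster and reconcile are hypothetical names, and two in-memory dicts stand in for the API server.

```python
class FakeCluster:
    """In-memory stand-in for the API server (hypothetical, for illustration)."""

    def __init__(self):
        self.immortalcontainers = {}  # "namespace/name" -> object dict
        self.pods = {}                # "namespace/podname" -> pod dict


def reconcile(cluster, object_key):
    """Apply the four reconcile steps to one ImmortalContainer key."""
    ns, name = object_key.split("/")

    # 1- Fetch the ImmortalContainer object
    immortalcontainer = cluster.immortalcontainers.get(object_key)
    if immortalcontainer is None:
        return  # the object no longer exists, nothing to do

    # 2- Create the pod definition (same naming rule as the excerpt)
    pod_name = name + "-immortalpod"
    pod_key = ns + "/" + pod_name

    # 3- Check if a pod matching the definition already exists
    if pod_key in cluster.pods:
        return

    # 4- Create the pod and update the status fields
    cluster.pods[pod_key] = {"image": immortalcontainer["spec"]["image"]}
    status = immortalcontainer.setdefault("status", {})
    status["currentPod"] = pod_name
    status["startTimes"] = status.get("startTimes", 0) + 1


cluster = FakeCluster()
cluster.immortalcontainers["default/example"] = {"spec": {"image": "nginx:latest"}}

reconcile(cluster, "default/example")             # creates the pod
reconcile(cluster, "default/example")             # pod exists, so nothing changes
del cluster.pods["default/example-immortalpod"]   # simulate someone deleting the pod
reconcile(cluster, "default/example")             # recreates the pod

print(cluster.immortalcontainers["default/example"]["status"])
# prints {'currentPod': 'example-immortalpod', 'startTimes': 2}
```

The second call shows why duplicate entries in the work queue are harmless: reconciling an object whose pod already exists changes nothing.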

Running the operator

When an operator is installed on a cluster, the controller of the operator runs in a pod inside the cluster. For testing and debugging purposes, however, it may be useful to execute the controller outside the cluster. In both cases, the controller communicates with the cluster using Kubernetes API.

 

Before continuing, be sure you have a cluster available for use and your credentials configured. You can run `kubectl get nodes` to verify that you can reach the cluster. If you don’t have any cluster, you can use minikube or microk8s to create a local development cluster.

Running outside the cluster

Running outside the cluster means that, while all the resources, such as ImmortalContainer objects and pods, live inside the cluster, the controller is executed externally, for example on the developer’s computer. The following diagram illustrates such a situation:

 

Assuming that the cluster is running and that your credentials are stored in ~/.kube/config, we are ready to try the operator.

Check that the cluster is available by running:

$ kubectl get nodes

 

The following two commands install the custom resource in the cluster and run the controller locally, on your computer:

$ kubectl apply -f config/crds
$ python3 src/main.py --kubeconfig ~/.kube/config

After running these commands, you should see the controller’s log output:

 

INFO:controller:Controller starting
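
The --kubeconfig flag is what lets the same program run both outside and inside the cluster. A possible way to handle it is sketched below. The real src/main.py may be organized differently: the function names parse_args and load_cluster_config are hypothetical, and only the flag name comes from the command above. The two loader functions, load_kube_config and load_incluster_config, belong to the official Kubernetes client.

```python
import argparse


def parse_args(argv=None):
    """Parse the controller's command line (only --kubeconfig in this sketch)."""
    parser = argparse.ArgumentParser(description="Immortal containers controller")
    parser.add_argument(
        "--kubeconfig", default=None,
        help="path to a kubeconfig file; omit it when running inside a pod")
    return parser.parse_args(argv)


def load_cluster_config(kubeconfig):
    """Configure the Kubernetes client for local or in-cluster execution."""
    # Imported here so that parse_args can be used without the client installed
    from kubernetes import config

    if kubeconfig:
        # Outside the cluster: use the developer's credentials
        config.load_kube_config(config_file=kubeconfig)
    else:
        # Inside a pod: use the service account mounted by Kubernetes
        config.load_incluster_config()


args = parse_args(["--kubeconfig", "/home/user/.kube/config"])
print(args.kubeconfig)  # prints /home/user/.kube/config
```

When the controller is started without the flag, as a container command such as python main.py would do, the in-cluster branch is taken.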

 

 

Creating an ImmortalContainer

To try the operator we are going to create an ImmortalContainer object to run the nginx:latest image. To do this we need to edit the file config/example-use.yaml to make it look like this:


apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
 name: example-immortal-container
spec:
 image: nginx:latest

We then use kubectl to create the ImmortalContainer object in the cluster.

$ kubectl apply -f config/example-use.yaml

 

The controller will detect the new immortal container and subsequently create a pod to run its image. Let’s try it.

 

$ kubectl get pods
NAME                                     READY   STATUS    RESTARTS   AGE
example-immortal-container-immortalpod   1/1     Running   0          2m

 

Finally, let’s verify that the pod is recreated if we delete it.

 

$ kubectl delete pods example-immortal-container-immortalpod
pod "example-immortal-container-immortalpod" deleted
 
$ kubectl get pods
NAME                                     READY   STATUS              RESTARTS   AGE
example-immortal-container-immortalpod   0/1     ContainerCreating   0          3s

 

 

You can edit the ImmortalContainer object to see its status fields, CurrentPod and StartTimes.

 

$ kubectl edit immortalcontainer example-immortal-container

apiVersion: immortalcontainer.flugel.it/v1alpha1
kind: ImmortalContainer
metadata:
…
spec:
 image: nginx:latest
status:
 currentPod: example-immortal-container-immortalpod
 startTimes: 2

 

As you can see, the operator works as expected.

 

Deploying the operator to a cluster

In the previous section we’ve seen that the operator functions as expected. Now we are going to build all the artifacts needed to deploy it to run inside a cluster. In this setup, the operator’s controller runs in a pod.

 

The controller still uses the Kubernetes API to watch events and to manage objects. But, since it is no longer running on the developer’s computer (and therefore has no access to the user’s credentials), it requires custom authorization rules to access the Kubernetes API.

 

These authorization rules can be implemented using a custom role assigned to a service account. The operator’s controller runs using this service account. The yaml files for this setup can be found in config/rbac. You can read more about authorization in the Kubernetes documentation.


To run the controller in a pod, we need to build 1) an image containing the code of the controller and 2) a pod or deployment definition to instantiate the pod. We created a Dockerfile to build the image. Using a multi-stage build we were able to reduce the image size.


# Dockerfile
FROM python:3-alpine3.9 as base
 
FROM base as builder
RUN mkdir /install
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN apk add --no-cache --virtual .build-deps gcc musl-dev libffi-dev openssl-dev
# --prefix installs into /install; newer pip releases no longer accept --install-option
RUN pip install --prefix=/install -r /requirements.txt
 
FROM base
COPY --from=builder /install /usr/local
COPY src /exampleoperatorpy
WORKDIR /exampleoperatorpy
CMD ["python", "main.py"]

 

For this article, we’ve decided to name the image flugelit/immortalcontainer-operator-py:dev. We pushed it to a public registry (Docker Hub) so that our Kubernetes cluster can fetch it.

Note: You can use any other public or private registry.

$ docker build -t flugelit/immortalcontainer-operator-py:dev .
$ docker push flugelit/immortalcontainer-operator-py:dev

 

We’ve pushed the image here: https://hub.docker.com/r/flugelit/immortalcontainer-operator-py

 

To deploy the operator to the cluster we use the following steps:

  1. Create a new namespace, immortalcontainers-operator, for the operator. The operator’s controller runs inside this namespace.
  2. Configure the authorization rules. To do this we:
    1. Create a role, immortalcontainers-operator, and grant it permissions on the pods, events and immortal containers resources.
    2. Create a service account named immortalcontainers-operator in the namespace with the same name.
    3. Assign the role to the service account.
  3. Install the Custom Resource Definition
  4. Create a deployment object to run the operator image in the namespace, immortalcontainers-operator, using the just created service account.
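
To make the authorization step more concrete, the objects it involves can be written down as plain Python dicts. This is only a sketch under assumptions; the authoritative definitions are the yaml files in config/rbac. The list of verbs is a guess, the immortalcontainers/status entry is inferred from the status subresource in the CRD, and a ClusterRole with a ClusterRoleBinding is assumed here on the premise that the controller watches pods in every namespace.

```python
import json

NAME = "immortalcontainers-operator"  # namespace, role and service account name

# Assumption: the article does not list the verbs the operator needs.
VERBS = ["get", "list", "watch", "create", "update", "patch", "delete"]


def rbac_manifests():
    """Build the namespace, role, service account and binding as plain dicts."""
    namespace = {"apiVersion": "v1", "kind": "Namespace", "metadata": {"name": NAME}}
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRole",  # assumption: cluster-wide rather than namespaced
        "metadata": {"name": NAME},
        "rules": [
            # Pods and events live in the core API group, written as ""
            {"apiGroups": [""], "resources": ["pods", "events"], "verbs": VERBS},
            {
                "apiGroups": ["immortalcontainer.flugel.it"],
                "resources": ["immortalcontainers", "immortalcontainers/status"],
                "verbs": VERBS,
            },
        ],
    }
    service_account = {
        "apiVersion": "v1",
        "kind": "ServiceAccount",
        "metadata": {"name": NAME, "namespace": NAME},
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": NAME},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": NAME,
        },
        "subjects": [{"kind": "ServiceAccount", "name": NAME, "namespace": NAME}],
    }
    return [namespace, role, service_account, binding]


if __name__ == "__main__":
    print(json.dumps(rbac_manifests(), indent=2))
```

The printed JSON can be compared against the files in config/rbac to see which permissions the operator is actually granted.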


We automated these steps in a Makefile, so if you clone the code from the repository, you can deploy the operator by running:

 

$ make deploy

 

Clean up

Using the following command you can remove the operator from the cluster.

 

$ make undeploy

 

Be careful: pods that were created for ImmortalContainers may still be running. If so, you can delete them manually. They will not be restarted since the operator is not running.

 

In Conclusion

In this article we’ve seen all the steps necessary to implement and deploy a Kubernetes operator using Python.

 

Note that, this time, we did not use any tool to help us with the project scaffolding and code generation. Still, we followed the same steps described in our earlier article, in which we implemented the operator in Go using Operator SDK. 

 

  1. Project initialization
  2. Custom resources definitions
  3. Custom controller implementation
  4. Running outside the cluster
  5. Building
  6. Deployment

 

We would like to highlight one weak point regarding the implementation of operators in Python. There are no good tools or libraries to help in the development of automated tests. So, if you want to write tests for an operator using Python, you are more or less on your own.
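
That said, the parts of a controller that are pure functions can still be covered with the standard unittest module. The sketch below is not taken from the repository: calculate_status is a standalone rewrite of the status logic shown in the excerpt, and run_tests is a hypothetical helper. It does not replace tests against a real cluster.

```python
import copy
import unittest


def calculate_status(immortalcontainer, pod_name):
    """Standalone version of the status calculation shown in the excerpt."""
    new_object = copy.deepcopy(immortalcontainer)
    previous = immortalcontainer.get("status", {}).get("startTimes", 0)
    new_object["status"] = {"currentPod": pod_name, "startTimes": previous + 1}
    return new_object


class CalculateStatusTest(unittest.TestCase):
    def test_first_start_sets_counter_to_one(self):
        result = calculate_status({"spec": {"image": "nginx:latest"}}, "a-immortalpod")
        self.assertEqual(
            result["status"], {"currentPod": "a-immortalpod", "startTimes": 1})

    def test_counter_is_incremented(self):
        obj = {"status": {"currentPod": "a-immortalpod", "startTimes": 4}}
        self.assertEqual(
            calculate_status(obj, "a-immortalpod")["status"]["startTimes"], 5)

    def test_input_is_not_modified(self):
        obj = {"status": {"currentPod": "a-immortalpod", "startTimes": 1}}
        calculate_status(obj, "a-immortalpod")
        self.assertEqual(obj["status"]["startTimes"], 1)


def run_tests():
    """Run the test case above and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(CalculateStatusTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    run_tests()
```

Code that talks to the API server is harder to cover this way; one option is to replace the client objects with unittest.mock.MagicMock instances, at the cost of testing against assumptions rather than real cluster behaviour.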


In the next article we are going to review Metacontroller, a Kubernetes add-on that greatly simplifies the development of operators. Metacontroller takes care of all the work related to handling events and API calls. The developer has only to provide a function that maps the desired and actual states to a new desired state.

 

References

https://github.com/flugel-it/k8s-python-operator