PunchPlatform Tutorial


Introduction

In Kubernetes it is possible to send commands using another identity within the cluster. For example, to execute a command as if you were the service account "k8saas-example-sa", you would run: kubectl [...] --as=system:serviceaccount:dev:k8saas-example-sa.
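
For instance, here is a complete command built from that pattern (assuming the service account exists in the dev namespace):

# List the pods of the dev namespace while impersonating the service account
kubectl get pods -n dev --as=system:serviceaccount:dev:k8saas-example-sa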

However, user impersonation can be a real security threat, as cyber threat actors can use these identities to conduct malicious actions.

In this use case, we'll use Punchplatform to get information when a command using the user "k8saas-generic-sa-cicd" is run.

The kube-audit logs are Kubernetes logs that gather information about actions performed in the cluster.

In this scenario, we'll use the punch parsing capabilities to:

  • listen to the kube-audit logs from Azure in real-time;
  • parse and filter the logs;
  • write the results to Opensearch.

The ultimate goal is to see a nicely formatted log indexed in Opensearch every time a command is run with user impersonation. We'll then be able to use the Opensearch search engine to easily retrieve information about these logs, such as the timestamp, userAgent or source IP.

info

This test case demonstrates the capabilities described in the punchplatform parsing documentation.

Architecture

(architecture diagram)

The kube-audit logs are only available on the Kubernetes master nodes, and therefore only Azure can collect them. However, we can use Azure Diagnostic Settings to route these logs to an Azure Event Hubs instance.
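
For reference, here is a sketch of that routing with the Azure CLI. On k8saas this is handled for you through the ticket described in the Requirements section below; <AKS_RESOURCE_ID> and <EVENTHUB_AUTH_RULE_ID> are hypothetical placeholders:

# Route the "kube-audit" log category of the AKS cluster to an Event Hubs instance
az monitor diagnostic-settings create \
  --name kube-audit-to-eventhub \
  --resource <AKS_RESOURCE_ID> \
  --event-hub <INSTANCE_NAME> \
  --event-hub-rule <EVENTHUB_AUTH_RULE_ID> \
  --logs '[{"category": "kube-audit", "enabled": true}]'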

Then, we'll configure the punchline:

  • (1) to listen to Azure Event Hubs
  • (2) to parse and filter the logs
  • (3) to send the results to Opensearch

Requirements

You need to submit two tickets via postit:

For Azure Event Hubs: "Something is missing"

  • Ask for an Azure Event Hubs Namespace and Instance
  • Ask to get your primary key for the Namespace
  • Activate Diagnostic settings to route "Kube Audit" logs to the Azure Event Hubs

For Opensearch: "Subscribe to DBaaS Discover"

  • Ask for a DBAAS Opensearch Instance
  • Specify the need to whitelist your cluster's external IP at Opensearch level
  • Ask for an Opensearch user to write data to Opensearch

Punchline structure

The punchline follows the format of a Kubernetes YAML file.

The field values are:

apiVersion: punchline.punchplatform.io/v2
kind: StreamPunchline
metadata:
  name: k8saas-punchline-sa-java
  labels:
    app: k8saas-punchline-sa-java
  annotations:
    # Enable Prometheus scraping
    prometheus.io/scrape: 'true'
    # Expose your metric on this port (can be any unused port)
    prometheus.io/port: '7770'
spec:
  containers:
    applicationContainer:
      image: ghcr.io/punchplatform/punchline-java:8.0.0
      imagePullPolicy: IfNotPresent
      env:
        - name: JDK_JAVA_OPTIONS
          value: "-Xms100m -Xmx450m"
  dag:
    ...

Then, the pipeline process is described within the dag field. "dag" stands for "directed acyclic graph": a list of nodes that will be executed in the specified order.

There are 3 kinds of nodes:

  • "source": input to gather data from;
  • "function": to parse, enrich or filter the data;
  • "sink": output to send the data to.

For each of the following steps, we will add one or several nodes to the dag field.

Receive data from Event hubs

The first step of the process is to receive the logs from Azure Event Hubs.

To this end, we'll need to add an input node to the punchline:

- id: input
  kind: source
  type: kafka_source
  settings:
    bootstrap.servers: <INSTANCE_NAME>.servicebus.windows.net:9093 # Event Hubs Namespace Name
    group.id: connect-cluster-group
    topics: <INSTANCE_NAME> # Event Hubs instance Name
    start_offset_strategy: latest
    value.format: string
    offset.metadata.max.bytes: 1024
    max.message.bytes: 20971520
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<INSTANCE_NAME>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<SHARED_ACCESS_KEY>";
  out:
    - id: punchlet_split_record
      table: logs
      columns:
        - name: value
          type: string

This source will configure a Kafka consumer to listen to the logs from Azure Event Hubs.

Be sure to replace the following placeholders with your own values:

  • <INSTANCE_NAME>: your instance name. For example: k8saas-toto-sandbox
  • <SHARED_ACCESS_KEY>: your Azure Event Hub access key. Your Access Key should have been sent to you after your Azure Event Hubs instance creation.

The important fields are:

  • id: unique id for the node
  • kind: kind of punchline node
  • type: type of source; here it is a kafka consumer
  • settings:
    • bootstrap.servers: Event Hubs Namespace URL
    • topics: Event Hubs instance name
    • sasl.jaas.config: credentials to connect to Azure Event Hubs
  • out: describes which node the data is sent to next
    • id: id of the next node
    • table/columns: punch table and columns in which the data is stored. For our kafka source, it is [logs][value]

Kafka offset strategy

The setting "start_offset_strategy" defines the consumer strategy to retrieve the logs from Azure Event Hubs: "latest" will only get the latest messages, whereas "earliest" will get all the messages from the earliest one still in memory.
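
To observe the difference from your workstation, you can consume the same Event Hubs instance with kcat (the librdkafka command-line client); this is only an illustrative sketch reusing the placeholders defined above:

# -o end behaves like "latest", -o beginning like "earliest"
kcat -C -t <INSTANCE_NAME> -o end \
  -b <INSTANCE_NAME>.servicebus.windows.net:9093 \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username='$ConnectionString' \
  -X sasl.password='Endpoint=sb://<INSTANCE_NAME>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<SHARED_ACCESS_KEY>'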

Parse data

To parse the data, Punch has its own parsing language. It is an intuitive and very powerful tool to parse, filter and enrich data in real-time.

These code blocks are stored in "punchlets".

For this scenario, we'll define two punchlets:

  • split_records.punch: split the bulk messages received from Azure Event Hubs into single messages
  • azure_parsing_header.punch: parse and enrich every log

Punchlet to split records

Azure Event Hubs sends messages in bulk, so the messages need to be split into single log instances to be parsed later.
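
For intuition, this is the same split done outside the punchline with jq, assuming one bulk message has been saved to a (hypothetical) bulk.json file:

# Emit each element of the records array as its own single-line JSON document
jq -c '.records[]' bulk.json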

The punchlet split_records.punch is:

{
    // the records field contains an array
    // by putting each record in its own tuple, the second punchlet node will get the records one by one
    for (Tuple t : [logs][value][records].asCollection()) {
        Tuple tmp = tuple();
        tmp:[logs][value] = t;
        root.add(tmp);
    }
}

The bulk messages from Azure are in the variable [logs][value][records]. By iterating over them in a for loop, we can create a Tuple object for each log and add it to the stream (root), so it may be consumed independently by the next punchline node.

tip

Reminder: In the punch language, the Tuple is the base object for every log. One log is always stored as a Tuple.

Punchlet to parse and enrich the logs

Once the log is finally in our hands, we can manipulate it to collect all the important information.

The log received from Azure Event Hubs is a kube-audit log. The Azure Event Hubs message looks like this:

{
  "ccpNamespace": "62e2b831fb25b000014a2ad8",
  "resourceId": "/SUBSCRIPTIONS/XXX-7ED8-4C24-A9A4-D7B4C65600E2/RESOURCEGROUPS/K8SAAS-XXX-SANDBOX/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/K8SAAS-XXX-SANDBOX",
  "Cloud": "AzureCloud",
  "Environment": "prod",
  "operationName": "Microsoft.ContainerService/managedClusters/diagnosticLogs/Read",
  "time": "2022-08-24T21:56:08.0000000Z",
  "UnderlayClass": "hcp-underlay",
  "category": "kube-audit",
  "UnderlayName": "hcp-underlay-eastus-cx-809",
  "properties": {
    "pod": "kube-apiserver-d76d49667-qq58b",
    "log": "{\"kind\":\"Event\",\"apiVersion\":\"audit.k8s.io/v1\",\"level\":\"Metadata\",\"auditID\":\"7293d441-XXX-4521-b5ea-a347ea0c7292\",\"stage\":\"ResponseComplete\",\"requestURI\":\"/apis/status.gatekeeper.sh/v1beta1?timeout=32s\",\"verb\":\"get\",\"user\":{\"username\":\"system:serviceaccount:kube-system:resourcequota-controller\",\"uid\":\"0743f75a-5a8f-4f82-bf95-17511c5d0054\",\"groups\":[\"system:serviceaccounts\",\"system:serviceaccounts:kube-system\",\"system:authenticated\"]},\"sourceIPs\":[\"172.31.22.105\"],\"userAgent\":\"kube-controller-manager/v1.22.11 (linux/amd64) kubernetes/3555818/system:serviceaccount:kube-system:resourcequota-controller\",\"responseStatus\":{\"metadata\":{},\"code\":200},\"requestReceivedTimestamp\":\"2022-08-24T21:56:08.274404Z\",\"stageTimestamp\":\"2022-08-24T21:56:08.275390Z\",\"annotations\":{\"authorization.k8s.io/decision\":\"allow\",\"authorization.k8s.io/reason\":\"RBAC: allowed by ClusterRoleBinding \\\"system:discovery\\\" of ClusterRole \\\"system:discovery\\\" to Group \\\"system:authenticated\\\"\"}}\n",
    "stream": "stdout"
  }
}

with the actual kube-audit log within the field properties.log, which looks like:

{
  "auditID": "XXX-9940-4b96-95e4-90fd0164bd43",
  "requestReceivedTimestamp": "2022-10-25T16:24:10.427986Z",
  "objectRef": {
    "apiGroup": "apps",
    "apiVersion": "v1",
    "resource": "deployments",
    "namespace": "A"
  },
  "level": "Request",
  "kind": "Event",
  "verb": "list",
  "annotations": {
    "authorization.k8s.io/decision": "forbid",
    "authorization.k8s.io/reason": ""
  },
  "userAgent": "kubectl/v1.24.3 (darwin/amd64) kubernetes/aef86a9",
  "requestURI": "/apis/apps/v1/namespaces/A/deployments?limit=500",
  "responseStatus": {
    "reason": "Forbidden",
    "metadata": {},
    "code": 403,
    "status": "Failure"
  },
  "impersonatedUser": {
    "groups": [
      "system:serviceaccounts",
      "system:serviceaccounts:dev",
      "system:authenticated"
    ],
    "username": "system:serviceaccount:dev:k8saas-generic-sa-cicd"
  },
  "stageTimestamp": "2022-10-25T16:24:10.428650Z",
  "sourceIPs": [
    "XXX.170.131.52"
  ],
  "apiVersion": "audit.k8s.io/v1",
  "stage": "ResponseComplete",
  "user": {
    "groups": [
      "system:masters",
      "system:authenticated"
    ],
    "username": "masterclient"
  }
}
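
If you want to inspect that nested log by hand, a small jq sketch can decode it from a captured record (msg.json is a hypothetical file holding one Event Hubs record):

# properties.log is a JSON document escaped inside a string: fromjson decodes it
jq '.properties.log | fromjson | {user, impersonatedUser, verb, requestURI, sourceIPs}' msg.json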

The field impersonatedUser is dynamic and only present if an alias has been used. We want to filter the logs depending on its presence.

The other important fields necessary for an investigation are:

  • requestReceivedTimestamp: time of the command
  • userAgent: request's userAgent
  • requestURI: the requested URI
  • sourceIPs: request's source IPs

The punchlet azure_parsing_header.punch is:

{
    // init 2 tuples: one for the output and one for parsing
    Tuple document = [logs][log];
    Tuple tmp = [logs][value];

    // keep the initial message for forensics purposes
    document:[message] = tmp.toString();
    // keep the date at which the logs are received by kast-data
    document:[rep][ts] = date("yyyy-MM-dd'T'HH:mm:ssZ").get();

    // parse the azure resource id
    String[] split = tmp:[resourceId].split("/");
    document:[azure][subscription][id] = split[2];
    document:[azure][resource_group][name] = split[4].toLowerCase();
    document:[azure][service] = "aks";
    document:[k8saas][instance][name] = split[8].toLowerCase();

    document:[obs][ts] = tmp:[ts];

    document:[app][name] = tmp:[category];

    document:[audit][pod][name] = tmp:[properties][pod];

    String logMsg = tmp:[properties][log];
    logMsg = logMsg.replace('\"', '"').replace("\n", "").replace("\r", "");
    Tuple tmplogs = TupleFactory.getTupleFromJson(logMsg);
    document:[audit][log] = tmplogs.toString();
    document:[audit][user][username] = tmplogs:[user][username];
    document:[audit][user][groups] = tmplogs:[user][groups];
    document:[audit][sourceIPs] = tmplogs:[sourceIPs];
    document:[audit][userAgent] = tmplogs:[userAgent];
    document:[audit][requestReceivedTimestamp] = tmplogs:[requestReceivedTimestamp];

    if (tmplogs.hasKey("impersonatedUser")) {
        document:[audit][impersonatedUser][username] = tmplogs:[impersonatedUser][username];
        document:[audit][impersonatedUser][groups] = tmplogs:[impersonatedUser][groups];
    } else {
        document:[audit][impersonatedUser][username] = "";
        document:[audit][impersonatedUser][groups] = "";
    }

    // only forward the logs whose impersonated user is the monitored service account
    if (document:[audit][impersonatedUser][username].contains("k8saas-generic-sa-cicd")) {
        [logs][document] = document;
    }
}

Here we use two tuples along the process: the Tuple tmp contains the raw log from Azure Event Hubs, while document contains the final output log.

Then, all the lines except for the last three are dedicated to parsing every field.

At the end, the last if condition filters the logs and only keeps the ones whose impersonatedUser field contains "k8saas-generic-sa-cicd".

The output logs contain all the relevant information parsed from the original logs, plus the original kube-audit log as raw text in the "message" field.

Here is an example of output log once parsed:

{
  "app": {
    "name": "kube-audit"
  },
  "audit": {
    "requestReceivedTimestamp": "2022-10-28T20:23:38.028442Z",
    "sourceIPs": [
      "70.81.XXX.111"
    ],
    "pod": {
      "name": "kube-apiserver-7c8db89564-vjhmg"
    },
    "log": "{...}",
    "userAgent": "k9s/v0.0.0 (darwin/amd64) kubernetes/$Format",
    "user": {
      "groups": [
        "system:masters",
        "system:authenticated"
      ],
      "username": "masterclient"
    },
    "impersonatedUser": {
      "groups": "",
      "username": ""
    }
  },
  "message": "{...}",
  "rep": {
    "ts": "2022-10-28T20:24:20+0000"
  },
  "k8saas": {
    "instance": {
      "name": "k8saas-XXX-sandbox"
    }
  },
  "azure": {
    "resource_group": {
      "name": "k8saas-XXX-sandbox"
    },
    "service": "aks",
    "subscription": {
      "id": "XXX-7ED8-4C24-A9A4-D7B4C65600E2"
    }
  }
}

Implementation of the punchlets in punchline nodes

Now that our punchlets are ready, they will be mounted in a dedicated configmap when it's time to deploy the solution on kube.

For the moment, let's see the new punchline node that calls these punchlets:

- id: punchlet_split_record
  kind: function
  type: punchlet_function
  settings:
    punchlets:
      - /resources/split_records.punch
  out:
    - id: punchlet_parse_log
      table: logs
      columns:
        - name: value
          type: string
- id: punchlet_parse_log
  kind: function
  type: punchlet_function
  settings:
    punchlets:
      - /resources/azure_parsing_header.punch
  out:
    - id: elastic_sink
      table: logs
      columns:
        - name: document
          type: string

Each node configuration is straightforward:

  • kind is "function"
  • type is "punchlet_function"
  • settings.punchlets is the path to one of the mounted punchlets

The final output points to the elasticsearch sink node.

Write to Opensearch

We want to analyze the parsed logs within Opensearch. To do so, we'll set up an output node that writes into an elastic index.

Here is the punchline node:

- id: elastic_sink
  kind: sink
  type: elasticsearch
  engine_settings:
    tick_row_frequency_ms: 1000
  settings:
    http_hosts:
      - host: elasticsearch.esaas-<ESAAS_INSTANCE_NAME>.kaas.thalesdigital.io
        port: 443
        scheme: https
    security:
      credentials:
        username: "<OPENSEARCH_USER>"
        password: "<OPENSEARCH_PASSWORD>"
    index:
      type: constant
      value: k8saas-kast-audit-<INSTANCE_NAME>
    document:
      json_column: document

Please replace:

  • <INSTANCE_NAME>: name of your cluster
  • <ESAAS_INSTANCE_NAME>: name of your elastic cluster instance
  • <OPENSEARCH_USER>: Opensearch user name, given at cluster creation
  • <OPENSEARCH_PASSWORD>: Opensearch user password, given at cluster creation
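
Before deploying, you can sanity-check the Opensearch endpoint and credentials with a plain HTTPS call (same placeholders as above; this assumes the IP whitelisting from the Requirements section is in place):

# Should return the cluster health as JSON if credentials and whitelisting are correct
curl -u '<OPENSEARCH_USER>:<OPENSEARCH_PASSWORD>' \
  'https://elasticsearch.esaas-<ESAAS_INSTANCE_NAME>.kaas.thalesdigital.io:443/_cluster/health?pretty'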

Run the punchline in Kubernetes

To run the punchline in kube, there are 3 different files:

  • the deployment of the punchlets configmap
  • the deployment of the punchline configmap
  • the deployment of the Punch application

First, both of the punchlets are added to a configmap: k8saas-punch-audit-punchlets-cm.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: k8saas-punch-audit-punchlets-cm
data:
  split_records.punch: |
    {
        // the records field contains an array
        // by putting each record in its own tuple, the second punchlet node will get the records one by one
        for (Tuple t : [logs][value][records].asCollection()) {
            Tuple tmp = tuple();
            tmp:[logs][value] = t;
            root.add(tmp);
        }
    }
  azure_parsing_header.punch: |
    {
        // init 2 tuples: one for the output and one for parsing
        Tuple document = [logs][log];
        Tuple tmp = [logs][value];

        // keep the initial message for forensics purposes
        document:[message] = tmp.toString();
        // keep the date at which the logs are received by kast-data
        document:[rep][ts] = date("yyyy-MM-dd'T'HH:mm:ssZ").get();

        // parse the azure resource id
        String[] split = tmp:[resourceId].split("/");
        document:[azure][subscription][id] = split[2];
        document:[azure][resource_group][name] = split[4].toLowerCase();
        document:[azure][service] = "aks";
        document:[k8saas][instance][name] = split[8].toLowerCase();

        document:[obs][ts] = tmp:[ts];

        document:[app][name] = tmp:[category];

        document:[audit][pod][name] = tmp:[properties][pod];

        String logMsg = tmp:[properties][log];
        logMsg = logMsg.replace('\"', '"').replace("\n", "").replace("\r", "");
        Tuple tmplogs = TupleFactory.getTupleFromJson(logMsg);
        document:[audit][log] = tmplogs.toString();
        document:[audit][user][username] = tmplogs:[user][username];
        document:[audit][user][groups] = tmplogs:[user][groups];
        document:[audit][sourceIPs] = tmplogs:[sourceIPs];
        document:[audit][userAgent] = tmplogs:[userAgent];
        document:[audit][requestReceivedTimestamp] = tmplogs:[requestReceivedTimestamp];

        if (tmplogs.hasKey("impersonatedUser")) {
            document:[audit][impersonatedUser][username] = tmplogs:[impersonatedUser][username];
            document:[audit][impersonatedUser][groups] = tmplogs:[impersonatedUser][groups];
        } else {
            document:[audit][impersonatedUser][username] = "";
            document:[audit][impersonatedUser][groups] = "";
        }

        // only forward the logs whose impersonated user is the monitored service account
        if (document:[audit][impersonatedUser][username].contains("k8saas-generic-sa-cicd")) {
            [logs][document] = document;
        }

        // print(tmp);
        // print(document);
    }
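
As an alternative to writing this file by hand, you can generate an equivalent manifest from the .punch files (assuming they are saved locally under these names):

# Generate the configmap manifest from the two punchlet files
kubectl create configmap k8saas-punch-audit-punchlets-cm \
  --from-file=split_records.punch \
  --from-file=azure_parsing_header.punch \
  -n dev --dry-run=client -o yaml > k8saas-punch-audit-punchlets-cm.yaml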

Then, the punchline is added to a configmap (replace the placeholders here as well): k8saas-punch-audit-punchline-cm.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: k8saas-punch-audit-punchline-cm
data:
  punchline.yml: |
    apiVersion: punchline.punchplatform.io/v2
    kind: StreamPunchline
    metadata:
      name: k8saas-punchline-sa-java
      labels:
        app: k8saas-punchline-sa-java
      annotations:
        # Enable Prometheus scraping
        prometheus.io/scrape: 'true'
        # Expose your metric on this port (can be any unused port)
        prometheus.io/port: '7770'
    spec:
      containers:
        applicationContainer:
          image: ghcr.io/punchplatform/punchline-java:8.0.0
          imagePullPolicy: IfNotPresent
          env:
            - name: JDK_JAVA_OPTIONS
              value: "-Xms100m -Xmx450m"
      dag:
        - id: input
          kind: source
          type: kafka_source
          settings:
            bootstrap.servers: <INSTANCE_NAME>.servicebus.windows.net:9093 # Event Hubs Namespace Name
            group.id: connect-cluster-group
            topics: <INSTANCE_NAME> # Event Hubs instance Name
            start_offset_strategy: latest
            value.format: string
            offset.metadata.max.bytes: 1024
            max.message.bytes: 20971520
            security.protocol: SASL_SSL
            sasl.mechanism: PLAIN
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<INSTANCE_NAME>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<SHARED_ACCESS_KEY>";
          out:
            - id: punchlet_split_record
              table: logs
              columns:
                - name: value
                  type: string
        - id: punchlet_split_record
          kind: function
          type: punchlet_function
          settings:
            punchlets:
              - /resources/split_records.punch
          out:
            - id: punchlet_parse_log
              table: logs
              columns:
                - name: value
                  type: string
        - id: punchlet_parse_log
          kind: function
          type: punchlet_function
          settings:
            punchlets:
              - /resources/azure_parsing_header.punch
          out:
            - id: elastic_sink
              table: logs
              columns:
                - name: document
                  type: string
        - id: elastic_sink
          kind: sink
          type: elasticsearch
          engine_settings:
            tick_row_frequency_ms: 1000
          settings:
            http_hosts:
              - host: elasticsearch.esaas-<ESAAS_INSTANCE_NAME>.kaas.thalesdigital.io
                port: 443
                scheme: https
            security:
              credentials:
                username: "<OPENSEARCH_USER>"
                password: "<OPENSEARCH_PASSWORD>"
            index:
              type: constant
              value: k8saas-kast-audit-<INSTANCE_NAME>
            document:
              json_column: document

Finally both configmap are mounted with a deployment that will run the application: k8saas-punch-audit-dpl.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8saas-punch-audit-dpl
  labels:
    app: k8saas-punch-audit-pod
  annotations:
    fluentbit.io/exclude: "true"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: k8saas-punch-audit-pod
  template:
    metadata:
      labels:
        app: k8saas-punch-audit-pod
      annotations:
        fluentbit.io/exclude: "true"
    spec:
      containers:
        - name: punchline
          image: ghcr.io/punchplatform/punchline-java:8.0.0
          imagePullPolicy: Always
          args: ["/punchline.yml"]
          ports:
            - name: metrics
              containerPort: 7770
          volumeMounts:
            - name: punchline
              mountPath: /punchline.yml
              subPath: punchline.yml
            - name: punchlets
              mountPath: /resources
      volumes:
        - name: punchline
          configMap:
            name: k8saas-punch-audit-punchline-cm
        - name: punchlets
          configMap:
            name: k8saas-punch-audit-punchlets-cm

Then, just apply the three files in the dev namespace:

kubectl apply -f k8saas-punch-audit-punchlets-cm.yaml -n dev
kubectl apply -f k8saas-punch-audit-punchline-cm.yaml -n dev
kubectl apply -f k8saas-punch-audit-dpl.yaml -n dev

And check that the deployment is successful:

kubectl get configmaps -n dev
# NAME DATA AGE
# k8saas-punch-audit-punchlets-cm 2 10s
# k8saas-punch-audit-punchline-cm 1 10s
kubectl get pods -n dev
# NAME READY STATUS RESTARTS AGE
# k8saas-punch-audit-dpl-7fdb87fdbf-p5gs8 2/2 Running 0 10s
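
If the pod is not Running, or if no documents show up in Opensearch later on, the punchline logs are the first place to look:

# Follow the logs of the punchline container
kubectl logs deploy/k8saas-punch-audit-dpl -c punchline -n dev -f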

How to test

Run the following commands in your cluster:

for i in {1..100}; do kubectl get deploy --as=system:serviceaccount:dev:k8saas-generic-sa-cicd -n dev; done
for i in {1..100}; do kubectl get deploy --as=system:serviceaccount:dev:k8saas-generic-sa-cicd -n A; done

Do not worry if errors appear on the screen; the idea is just to generate some logs with the use of the service account k8saas-generic-sa-cicd.

Then, a few minutes later, they should appear in Opensearch:

punch_test_case_opensearch-screenshot.png
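
You can also check from the command line that documents are being indexed; here is a sketch using the same placeholders as the sink configuration:

# Count the documents indexed by the punchline
curl -u '<OPENSEARCH_USER>:<OPENSEARCH_PASSWORD>' \
  'https://elasticsearch.esaas-<ESAAS_INSTANCE_NAME>.kaas.thalesdigital.io:443/k8saas-kast-audit-<INSTANCE_NAME>/_count?pretty'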

Monitoring

There are two Grafana dashboards that Punchplatform provides to monitor your punchlines:

  • Punch Advanced Insights
  • Punch Ack Fail Rates

Punch Advanced Insights Dashboard

This dashboard displays information about:

  • the CPU consumption of every punchline node individually
  • pending rows
  • the total traversal time over time

You can import it in Grafana using the following json file: punch_ack_fail_rates_dashboard.json

Here is a view of the dashboard during the execution of the scenario: punch_test_case_grafana_insight-screenshot.png

tip

Since we can see information per node, it is easy to spot any malfunction within the dag. For example, at some point we sent the wrong log field to the Elasticsearch node, because that field had been filtered out somewhere else in a punchlet. With the dashboard, we could easily see that the node was not active.

Punch Ack Fail Rates

Ack stands for "acknowledgment": it means that the log message from Azure Event Hubs has been successfully processed at every step.

This dashboard displays information about:

  • the evolution of acks per second over time, per punchline node
  • the evolution of fails per second over time, per punchline node
  • the fail rate
  • the ack rate
  • a summary of this information by node

You can import this dashboard in Grafana using the following json file: punch_ack_fail_rates_dashboard.json
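
If you prefer the Grafana HTTP API to the UI import, here is a sketch (<GRAFANA_URL>, <GRAFANA_USER> and <GRAFANA_PASSWORD> are placeholders for your own Grafana):

# Wrap the dashboard JSON in the payload expected by the Grafana HTTP API and post it
jq '{dashboard: ., overwrite: true}' punch_ack_fail_rates_dashboard.json \
  | curl -X POST -u '<GRAFANA_USER>:<GRAFANA_PASSWORD>' \
      -H 'Content-Type: application/json' \
      -d @- 'https://<GRAFANA_URL>/api/dashboards/db'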

punch_test_case_grafana_acks-screenshot.png

tip

This dashboard is really useful to quickly get a summary of all the failures that happened and at which node. The spikes we see on the screenshot match the reception of Azure Event Hubs bulk messages.