Pulsar 配置Function runtime
配置thread runtime
我们可以在conf/functions_worker.yml文件中使用线程运行时的默认配置
如果需要自定义更多参数,如线程组名称,请参考以下示例:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
threadGroupName: "Your Function Container Group"
要设置线程运行时的客户机内存限制,可以配置pulsarClientMemoryLimit:
functionRuntimeFactoryConfigs:
# pulsarClientMemoryLimit
# # the max memory in bytes the pulsar client can use
# absoluteValue:
# # the max memory the pulsar client can use as a percentage of max direct memory set for JVM
# percentOfMaxDirectMemory:
如果同时设置了absoluteValue和percentOfMaxDirectMemory,则使用较小的值.
配置process runtime
我们可以在配置文件conf/functions_worker.yml文件中使用流程运行时的默认配置
如果需要自定义更多参数,请参考以下示例:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
# the directory for storing the function logs
logDirectory:
# change the jar location only when you put the java instance jar in a different location
javaInstanceJarLocation:
# change the python instance location only when you put the python instance jar in a different location
pythonInstanceLocation:
# change the extra dependencies location:
extraFunctionDependenciesDir:
配置Kubernetes runtime
当函数工作者生成并应用Kubernetes清单时,Kubernetes运行时工作。由函数工作者生成的清单包括:
- 默认情况下,StatefulSet清单有一个带有多个副本的pod。这个数由函数的并行度决定。pod在pod启动时下载函数有效负载(通过函数工作者REST API)。如果配置了函数运行时,pod的容器映像是可配置的。
- a Service(用于与pod通信)
- 用于验证凭证的Secret(当适用时)。Kubernetes运行时支持秘钥。我们可以创建一个Kubernetes秘钥,并将其作为pod中的环境变量公开
配置基本设置
为了快速配置Kubernetes运行时,可以在conf/functions_worker.yml文件中使用KubernetesRuntimeFactoryConfig的默认设置
如果你已经使用Helm chart在Kubernetes上建立了一个Pulsar集群,这意味着函数工作者也已经在Kubernetes上建立了,你可以使用与函数工作者运行的pod关联的serviceAccount。 否则,我们可以通过将functionRuntimeFactoryConfigs设置为k8Uri来配置函数工作者与Kubernetes集群通信
集成 Kubernetes 密钥
Kubernetes中的Secret是一个保存一些机密数据(如密码、令牌或密钥)的对象。当我们在部署函数的Kubernetes名称空间中创建一个Secret时,函数可以安全地引用和分发它。 要启用该特性,请将配置文件conf/functions-worker.yml的secretsProviderConfiguratorClassName设置为org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator
例如,我们将一个函数部署到pulsar-func Kubernetes名称空间,并且有一个名为database-creds的密钥名称和一个字段名password,我们希望将其作为一个名为DATABASE_PASSWORD的环境变量挂载到pod中。下面的配置允许函数引用密钥并将该值作为pod中的环境变量挂载
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
inputs: [ "persistent://mytenant/mynamespace/myfuncinput" ]
className: "com.company.pulsar.myfunction"
secrets:
# the secret will be mounted from the `password` field in the `database-creds` secret as an env var called `DATABASE_PASSWORD`
DATABASE_PASSWORD:
path: "database-creds"
key: "password"
启用token认证
当我们使用令牌身份验证、TLS加密或自定义身份验证来保护与Pulsar集群的通信时,Pulsar将我们的证书颁发机构(CA)传递给客户端,以便客户端可以使用我们的签名证书对集群进行身份验证
要为Pulsar集群启用身份验证,需要通过实现org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider接口,为运行函数的pod指定一种机制来对代理进行身份验证
对于令牌身份验证,Pulsar包含了上述接口的实现,用于分发CA。函数工作者捕获部署(或更新)函数的令牌,将其保存为密钥,并将其装入pod
在配置文件conf/function-worker.yml中配置functionAuthProviderClassName:
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
对于TLS或自定义身份验证,我们可以实现org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider接口或使用替代机制
如果用于部署函数的令牌有到期日期,则可能需要在到期后重新部署函数
自定义 Kubernetes 运行时
自定义Kubernetes运行时允许我们自定义由运行时创建的Kubernetes资源,包括如何生成清单,如何将经过身份验证的数据传递到pods,以及如何集成密钥secrets
需要在配置文件conf/functions-worker.yml配置runtimeCustomizerClassName,使用全限定类名
函数API提供了一个名为customRuntimeOptions的标志,该标志被传递给org.apache.pulsar.functions.runtime.KubernetesManifestCustomizer 接口。
去实例化KubernetesManifestCustomizer,可以在配置文件conf/functions-worker.yml中设置runtimeCustomizerConfig
runtimeCustomizerConfig在所有函数中都是相同的。如果同时提供runtimeCustomizerConfig和customRuntimeOptions,则需要决定如何在KubernetesManifestCustomizer接口的实现中管理这两个配置
Pulsar包含一个用runtimeCustomizerConfig初始化的内置实现。它允许我们将JSON文档作为customRuntimeOptions传递,并添加某些属性。 要使用这个内置实现,将runtimeCustomizerClassName设置为org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer
如果同时提供了runtimeCustomizerConfig和customRuntimeOptions并且存在冲突,BasicKubernetesManifestCustomizer使用customRuntimeOptions覆盖runtimeCustomizerConfig
customRuntimeOptions配置实例:
{
"jobName": "jobname", // the k8s pod name to run this function instance
"jobNamespace": "namespace", // the k8s namespace to run this function in
"extractLabels": { // extra labels to attach to the statefulSet, service, and pods
"extraLabel": "value"
},
"extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
"extraAnnotation": "value"
},
"nodeSelectorLabels": { // node selector labels to add on to the pod spec
"customLabel": "value"
},
"tolerations": [ // tolerations to add to the pod spec
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}
如何定义pulsar资源名称时运行pulsar在Kubernetes
如果我们在Kubernetes上运行Pulsar Functions或连接器,则需要遵循Kubernetes命名约定来定义Pulsar资源的名称,无论我们使用哪个管理界面
Kubernetes需要一个RFC 1123中定义的可以用作DNS子域名的名称。Pulsar支持比Kubernetes命名惯例更多的合法字符 如果我们使用Kubernetes不支持的特殊字符创建Pulsar资源名称(例如,在Pulsar名称空间名称中包含冒号),Kubernetes运行时将Pulsar对象名称转换为符合RFC 1123格式的Kubernetes资源标签。 因此,我们可以使用Kubernetes运行时运行函数或连接器。Pulsar对象名称转换为Kubernetes资源标签的规则如下:
- 截短至63个字符
- 将以下字符替换为破折号(-):
- 非字母数字字符
- 下划线
- .
- 将开头和结尾的非字母数字字符替换为0
自定义 Java 运行时选项
此设置仅适用于进程运行时和Kubernetes运行时
为函数工作者启动的每个进程向JVM命令行传递附加参数,我们可以配置additionalJavaRuntimeArguments 参数在conf/functions_worker.yml配置文件中
- 添加 JVM 参数, 如: -XX:+ExitOnOutOfMemoryError
- 传递自定义系统属性,如:-Dlog4j2.formatMsgNoLookups
additionalJavaRuntimeArguments: ['-XX:+ExitOnOutOfMemoryError','-Dfoo=bar']