Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/kafka/command_topic_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"github.com/confluentinc/cli/v4/pkg/utils"
)

// confluent.*.association values are JSON strings. They must be stored verbatim,
// so keys are passed as rawValueKeys to skip special-character un-escaping.
var rawValueTopicConfigs = []string{"confluent.key.association", "confluent.value.association"}

func (c *command) newCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <topic>",
Expand All @@ -35,7 +39,7 @@ func (c *command) newCreateCommand() *cobra.Command {
}

cmd.Flags().Uint32("partitions", 0, "Number of topic partitions.")
cmd.Flags().StringSlice("config", nil, `A comma-separated list of configuration overrides ("key=value") for the topic being created.`)
pcmd.AddConfigFlag(cmd)
pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddDryRunFlag(cmd)
cmd.Flags().Bool("if-not-exists", false, "Exit gracefully if topic already exists.")
Expand All @@ -58,8 +62,7 @@ func (c *command) create(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}

configMap, err := properties.ConfigFlagToMap(configs)
configMap, err := properties.GetMap(configs, rawValueTopicConfigs...)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/kafka/command_topic_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *command) update(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
configMap, err := properties.GetMap(configs)
configMap, err := properties.GetMap(configs, rawValueTopicConfigs...)
if err != nil {
return err
}
Expand Down
29 changes: 20 additions & 9 deletions pkg/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,35 @@ import (
"bytes"
"fmt"
"os"
"slices"
"sort"
"strings"

"github.com/confluentinc/cli/v4/pkg/utils"
)

// GetMap reads newline-separated configuration files or comma-separated lists of key=value pairs, and supports configuration values containing commas.
func GetMap(config []string) (map[string]string, error) {
// Values whose keys are listed in rawValueKeys are stored verbatim, skipping special characters un-escaping, to preserve values such as JSON strings unchanged.
func GetMap(config []string, rawValueKeys ...string) (map[string]string, error) {
if len(config) == 1 && utils.FileExists(config[0]) {
return fileToMap(config[0])
return fileToMap(config[0], rawValueKeys...)
}

return ConfigFlagToMap(config)
return ConfigFlagToMap(config, rawValueKeys...)
}

// fileToMap reads key=value pairs from a properties file, ignoring comments and empty lines.
func fileToMap(filename string) (map[string]string, error) {
func fileToMap(filename string, rawValueKeys ...string) (map[string]string, error) {
buf, err := os.ReadFile(filename)
if err != nil {
return nil, err
}

return ConfigSliceToMap(ParseLines(string(buf)))
return ConfigSliceToMap(ParseLines(string(buf)), rawValueKeys...)
}

// ConfigSliceToMap converts a list of key=value strings into a map.
func ConfigSliceToMap(configs []string) (map[string]string, error) {
func ConfigSliceToMap(configs []string, rawValueKeys ...string) (map[string]string, error) {
m := make(map[string]string)

for _, config := range configs {
Expand All @@ -39,7 +41,12 @@ func ConfigSliceToMap(configs []string) (map[string]string, error) {
return nil, fmt.Errorf(`failed to parse "key=value" pattern from configuration: %s`, config)
}

m[x[0]] = replaceSpecialCharacters(x[1])
// rawValueKeys are stored as-is, all other values get un-escaped.
if slices.Contains(rawValueKeys, x[0]) {
m[x[0]] = x[1]
} else {
m[x[0]] = replaceSpecialCharacters(x[1])
}
}

return m, nil
Expand All @@ -62,14 +69,18 @@ func ParseLines(content string) []string {
}

// ConfigFlagToMap reads key=values pairs from the --config flag and supports configuration values containing commas.
func ConfigFlagToMap(configs []string) (map[string]string, error) {
func ConfigFlagToMap(configs []string, rawValueKeys ...string) (map[string]string, error) {
m := make(map[string]string)

for i := len(configs) - 1; i >= 0; i-- {
if strings.Contains(configs[i], "=") {
x := strings.SplitN(configs[i], "=", 2)
if _, ok := m[x[0]]; !ok {
m[x[0]] = replaceSpecialCharacters(x[1])
if slices.Contains(rawValueKeys, x[0]) {
m[x[0]] = x[1]
} else {
m[x[0]] = replaceSpecialCharacters(x[1])
}
}
} else {
if i-1 >= 0 {
Expand Down
14 changes: 14 additions & 0 deletions pkg/properties/properties_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package properties

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -124,3 +126,15 @@ func TestCreateKeyValuePairsKeysWithDotsAndSorts(t *testing.T) {
m["connection.mode"] = "OUTBOUND"
require.Equal(t, "\"connection.mode\"=\"OUTBOUND\"\n\"link.mode\"=\"BIDIRECTIONAL\"\n", CreateKeyValuePairs(m))
}

func TestGetMap_FileWithRawValueKeyPreservesJSON(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "topic.properties")
// confluent.value.association is stored verbatim, so its JSON value must be preserved.
json := `{"schema":"{\"type\":\"record\",\"name\":\"TestRecord\",\"doc\":\"Basic test.\\na=b.\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]}]}"}`
require.NoError(t, os.WriteFile(path, []byte("confluent.value.association="+json+"\n"), 0o600))

m, err := GetMap([]string{path}, "confluent.value.association")
require.NoError(t, err)
require.Equal(t, json, m["confluent.value.association"])
}