Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,42 +135,35 @@ In the future, additional flags may be added to allow you to include tables, add

## Plugins

If you would like to implement your own `valueFuncs`, you can do so by writing a ripoff plugin.

Plugins are local unauthenticated TCP servers that consume and emit newline-separated JSON messages from ripoff.
If you would like to implement your own `valueFuncs`, you can do so by writing a ripoff plugin, which is a local TCP server that sends/receives JSON.

### Writing a plugin

Plugins must listen to a local TCP port and provide a TCP stream (loop of receiving and sending messages) to clients.
Plugins must meet the following requirements:

On startup, plugins must output the string `READY` in their first line of output to indicate to ripoff that they are ready to receive TCP messages.
- Listen to a local TCP port
- Consume newline-separated JSON messages, which come in as a stream
- Output newline-separated JSON responses
- Output `READY` in the first line of standard output when the plugin is ready for TCP connections

Each incoming message will be a single line of JSON in the following types:
Each incoming message will be a single line of JSON in one of the following shapes:

#### Return a value
#### valueFunc

Your plugin must process an arbitrary `valueFunc` and return a string value. You can decide how to handle functions you do not expect/provide, by either returning an empty value or disconnecting the client.

The `id` field is used to support unordered stream messages, so you can return responses at any time and in any order as long as they have the same `id` as the relevant request.

Message from ripoff:

```json
{"type": "valueFunc", "valueFunc": "someFuncName", "args": ["some", "argument", "list"]}
{"id": "some-id", "type": "valueFunc", "valueFunc": "someFuncName", "args": ["some", "argument", "list"]}
```

Response from your TCP server:

```json
{"value": "someString"}
```

#### Exit your process

Ripoff will send a kill signal to your process, but if you'd like to clean up before that an exit message will be sent beforehand.

Request message:

```json
{"type": "exit"}
{"id": "the-same-id-from-the-request", "value": "someString"}
```

#### Example
Expand All @@ -179,7 +172,9 @@ An example plugin can be found at `cmd/helloplugin/helloplugin.go`. although TCP

### Using a plugin

Plugins are defined in your ripoff files, which instruct ripoff to spawn a process to start your TCP server, then later connect to it with a single TCP stream. Here's an example from ripoff's tests:
Plugins are defined in your ripoff files, which instruct ripoff to spawn a process to start your TCP server.

Here's an example from ripoff's tests:

```yml
# A list of plugins to register with ripoff.
Expand Down
8 changes: 3 additions & 5 deletions cmd/helloplugin/helloplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log"
"net"
"os"
)

func main() {
Expand All @@ -33,12 +32,14 @@ func main() {
}

// Request is a single JSON message sent by ripoff to the plugin.
type Request struct {
// Id identifies the request; it is copied into the matching Response.
Id string `json:"id"`
// Type is the kind of message, for example "valueFunc".
Type string `json:"type"`
// ValueFunc is the name of the value function ripoff wants evaluated.
ValueFunc string `json:"valueFunc"`
// Args holds the string arguments supplied to the value function.
Args []string `json:"args"`
}

// Response is the JSON message the plugin writes back for a Request.
type Response struct {
// Id is set to the Id of the Request being answered.
Id string `json:"id"`
// Value is the string result returned to ripoff.
Value string `json:"value"`
}

Expand All @@ -59,10 +60,6 @@ func handleConnection(conn net.Conn) {
log.Println("Error parsing body:", err)
return
}
if r.Type == "exit" {
os.Exit(0)
return
}
Comment on lines -62 to -65

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This didn't really do anything useful, IMO. Mostly I added it before I had process group IDs working

if len(r.Args) == 0 {
log.Println("No args provided")
return
Expand All @@ -78,6 +75,7 @@ func handleConnection(conn net.Conn) {
return
}
resp, err := json.Marshal(Response{
Id: r.Id,
Value: value,
})
if err != nil {
Expand Down
54 changes: 53 additions & 1 deletion cmd/ripoff/ripoff.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"log/slog"
"os"
"path"
"slices"
"strings"

"github.com/jackc/pgx/v5"

Expand All @@ -17,9 +20,54 @@ func errAttr(err error) slog.Attr {
return slog.Any("error", err)
}

// confirmPluginsSafe asks the user to approve any plugin commands that are not
// already recorded in the consent file, and exits the process with status 1
// unless the answer is "y" or "Y".
//
// The consent file is ".ripoff-consent" in the user's home directory, falling
// back to the OS temp directory when the home directory cannot be determined.
// Approved entries are written back to the file, so plugins that were already
// approved do not trigger the prompt again.
func confirmPluginsSafe(plugins map[string]ripoff.RipoffPlugin) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly weird but better than code execution? I would consider this prompt for any ripoff file as well

baseDir, err := os.UserHomeDir()
if err != nil {
baseDir = os.TempDir()
}
consentFilePath := path.Join(baseDir, ".ripoff-consent")
consentFile, err := os.ReadFile(consentFilePath)
// A missing consent file is not reported; any other read failure is logged
// and execution continues.
if err != nil && !os.IsNotExist(err) {
slog.Error("Could not read from consent file", errAttr(err), slog.String("filepath", consentFilePath))
}
consentFileLines := strings.Split(string(consentFile), "\n")
scanner := bufio.NewScanner(os.Stdin)
newConsentLines := []string{}
for _, plugin := range plugins {
// The consent entry is the plugin address, the literal " -> ", and the
// command arguments, all joined with a single space.
cmdJoined := strings.Join(append([]string{plugin.Address, " -> "}, plugin.Command...), " ")
if !slices.Contains(consentFileLines, cmdJoined) {
newConsentLines = append(newConsentLines, cmdJoined)
}
}
// Only prompt when at least one plugin entry is not yet in the consent file.
if len(newConsentLines) > 0 {
fmt.Printf("You have not run these ripoff plugins before, please confirm that the following commands are safe to run on your machine: \n")
fmt.Println()
for _, consentLine := range newConsentLines {
fmt.Printf(" %s\n", consentLine)
}
fmt.Println()
fmt.Println("Run the above? (Y/N)")
// Reads one line from standard input; anything other than "y" or "Y" aborts.
scanner.Scan()
input := scanner.Text()
if input == "y" || input == "Y" {
consentFileLines = append(consentFileLines, newConsentLines...)
err = os.WriteFile(consentFilePath, []byte(strings.Join(consentFileLines, "\n")), 0644)
// A failed write is logged but does not stop the run.
if err != nil {
slog.Error("Could not append to the consent file", errAttr(err), slog.String("filepath", consentFilePath))
}
fmt.Println("Proceeding...")
} else {
fmt.Println("ABORT")
os.Exit(1)
}
}
}

func main() {
verbosePtr := flag.Bool("v", false, "enable verbose output")
softPtr := flag.Bool("s", false, "do not commit generated queries")
maxConcurrencyPtr := flag.Int("c", ripoff.DEFAULT_MAX_CONCURRENCY, "maximum number of rows to generate queries for at one time. defaults at 1000")
unsafePluginPtr := flag.Bool("u", false, "execute new plugin commands without prompting. only for use in CI or trusted environments")
flag.Parse()

if *verbosePtr {
Expand Down Expand Up @@ -77,7 +125,11 @@ func main() {
os.Exit(1)
}

err = ripoff.RunRipoff(ctx, tx, totalRipoff)
if !*unsafePluginPtr && len(totalRipoff.Plugins) > 0 {
confirmPluginsSafe(totalRipoff.Plugins)
}

err = ripoff.RunRipoff(ctx, tx, totalRipoff, *maxConcurrencyPtr)
if err != nil {
slog.Error("Could not run ripoff", errAttr(err))
os.Exit(1)
Expand Down
65 changes: 47 additions & 18 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"regexp"
"slices"
"strings"
"sync"
"time"

"github.com/brianvoe/gofakeit/v7"
Expand All @@ -19,9 +20,11 @@ import (
"github.com/tj/go-naturaldate"
)

const DEFAULT_MAX_CONCURRENCY = 1000

// Runs ripoff from start to finish, without committing the transaction.
func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile) error {
manager, err := NewPluginManager(totalRipoff.Plugins)
func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile, maxConcurrency int) error {
manager, err := NewPluginManager(ctx, totalRipoff.Plugins)
if err != nil {
return err
}
Expand All @@ -32,7 +35,7 @@ func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile) error {
return err
}

queries, err := buildQueriesForRipoff(manager, primaryKeys, totalRipoff)
queries, err := buildQueriesForRipoff(maxConcurrency, manager, primaryKeys, totalRipoff)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,10 +166,11 @@ func prepareValue(manager *PluginManager, rawValue string) (string, error) {
return fakerResult, nil
}

func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, rowId string, row Row, dependencyGraph map[string][]string) (string, error) {
func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, rowId string, row Row) (string, []string, error) {
dependencyResult := []string{}
parts := strings.Split(rowId, ":")
if len(parts) < 2 {
return "", fmt.Errorf("invalid id: %s", rowId)
return "", dependencyResult, fmt.Errorf("invalid id: %s", rowId)
}
table := parts[0]
primaryKeysForTable, hasPrimaryKeysForTable := primaryKeys[table]
Expand Down Expand Up @@ -210,10 +214,10 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
case []string:
dependencies = v
default:
return "", fmt.Errorf("cannot parse ~dependencies value in row %s", rowId)
return "", dependencyResult, fmt.Errorf("cannot parse ~dependencies value in row %s", rowId)
}
dependencyGraph[rowId] = append(dependencyGraph[rowId], dependencies...)
dependencyGraph[rowId] = slices.Compact(dependencyGraph[rowId])
dependencyResult = append(dependencyResult, dependencies...)
dependencyResult = slices.Compact(dependencyResult)
continue
}

Expand All @@ -230,14 +234,14 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
addEdge := referenceRegex.MatchString(value)
// Don't add edges to and from the same row.
if addEdge && rowId != value {
dependencyGraph[rowId] = append(dependencyGraph[rowId], value)
dependencyGraph[rowId] = slices.Compact(dependencyGraph[rowId])
dependencyResult = append(dependencyResult, value)
dependencyResult = slices.Compact(dependencyResult)
}

columns = append(columns, pq.QuoteIdentifier(column))
valuePrepared, err := prepareValue(manager, value)
if err != nil {
return "", err
return "", dependencyResult, err
}
// Assume this column is the primary key.
if rowId == value && onConflictColumn == "" {
Expand All @@ -249,7 +253,7 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
}

if onConflictColumn == "" {
return "", fmt.Errorf("cannot determine column to conflict with for: %s, saw %s", rowId, row)
return "", dependencyResult, fmt.Errorf("cannot determine column to conflict with for: %s, saw %s", rowId, row)
}

// Extremely smart query builder.
Expand All @@ -263,11 +267,11 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
strings.Join(values, ","),
onConflictColumn,
strings.Join(setStatements, ","),
), nil
), dependencyResult, nil
}

// Returns a sorted array of queries to run based on a given ripoff file.
func buildQueriesForRipoff(manager *PluginManager, primaryKeys PrimaryKeysResult, totalRipoff RipoffFile) ([]string, error) {
func buildQueriesForRipoff(maxConcurrency int, manager *PluginManager, primaryKeys PrimaryKeysResult, totalRipoff RipoffFile) ([]string, error) {
dependencyGraph := map[string][]string{}
queries := map[string]string{}

Expand All @@ -277,12 +281,37 @@ func buildQueriesForRipoff(manager *PluginManager, primaryKeys PrimaryKeysResult
}

// Build queries.
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxConcurrency)
type rowChanItem struct {
rowId string
query string
dependencies []string
err error

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered errgroup for this but couldn't get it working

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(or, at least, better than the semaphore)

}
rowChan := make(chan rowChanItem, len(totalRipoff.Rows))
for rowId, row := range totalRipoff.Rows {
query, err := buildQueryForRow(manager, primaryKeys, rowId, row, dependencyGraph)
if err != nil {
return []string{}, err
semaphore <- struct{}{}
wg.Add(1)
go func(rowId string, row Row) {
defer wg.Done()
defer func() { <-semaphore }()
query, dependencies, err := buildQueryForRow(manager, primaryKeys, rowId, row)
rowChan <- rowChanItem{rowId, query, dependencies, err}
}(rowId, row)
}

go func() {
wg.Wait()
close(rowChan)
}()

for rowItem := range rowChan {
if rowItem.err != nil {
return []string{}, rowItem.err
}
queries[rowId] = query
dependencyGraph[rowItem.rowId] = rowItem.dependencies
queries[rowItem.rowId] = rowItem.query
}

// Sort and reverse the graph, so queries are in order of least (hopefully none) to most dependencies.
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ func runTestData(t *testing.T, ctx context.Context, tx pgx.Tx, testDir string) {
require.NoError(t, err)
totalRipoff, err := RipoffFromDirectory(testDir, enums)
require.NoError(t, err)
err = RunRipoff(ctx, tx, totalRipoff)
err = RunRipoff(ctx, tx, totalRipoff, DEFAULT_MAX_CONCURRENCY)
require.NoError(t, err)
// Run again to implicitly test upsert behavior.
err = RunRipoff(ctx, tx, totalRipoff)
err = RunRipoff(ctx, tx, totalRipoff, DEFAULT_MAX_CONCURRENCY)
require.NoError(t, err)
// Try to verify that the number of generated rows matches the ripoff.
tableCount := map[string]int{}
Expand Down
2 changes: 1 addition & 1 deletion export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func runExportTestData(t *testing.T, ctx context.Context, tx pgx.Tx, testDir str
_, err = tx.Exec(ctx, string(truncateFile))
require.NoError(t, err)
// Run generated ripoff.
err = RunRipoff(ctx, tx, ripoffFile)
err = RunRipoff(ctx, tx, ripoffFile, DEFAULT_MAX_CONCURRENCY)
require.NoError(t, err)
// Try to verify that the number of generated rows matches the ripoff.
tableCount := map[string]int{}
Expand Down
Loading
Loading