Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

51 changes: 0 additions & 51 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v1alpha1

import (
"context"
"encoding/json"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -214,41 +213,6 @@ func (in *Inputs) DeepCopyInto(out *Inputs) {
// Once we figure out the autogenerate story we can replace this
}

// Deprecated: Please use Connections instead
type DeprecatedConnections struct {
DownstreamEdges map[NodeID][]NodeID
UpstreamEdges map[NodeID][]NodeID
}

func (in *DeprecatedConnections) UnmarshalJSON(b []byte) error {
in.DownstreamEdges = map[NodeID][]NodeID{}
err := json.Unmarshal(b, &in.DownstreamEdges)
if err != nil {
return err
}
in.UpstreamEdges = map[NodeID][]NodeID{}
for from, nodes := range in.DownstreamEdges {
for _, to := range nodes {
if _, ok := in.UpstreamEdges[to]; !ok {
in.UpstreamEdges[to] = []NodeID{}
}
in.UpstreamEdges[to] = append(in.UpstreamEdges[to], from)
}
}
return nil
}

func (in *DeprecatedConnections) MarshalJSON() ([]byte, error) {
return json.Marshal(in.DownstreamEdges)
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeprecatedConnections) DeepCopyInto(out *DeprecatedConnections) {
*out = *in
// We do not manipulate the object, so its ok
// Once we figure out the autogenerate story we can replace this
}

// Connections keep track of downstream and upstream dependencies (including data and execution dependencies).
type Connections struct {
Downstream map[NodeID][]NodeID `json:"downstream"`
Expand All @@ -267,12 +231,6 @@ type WorkflowSpec struct {
ID WorkflowID `json:"id"`
Nodes map[NodeID]*NodeSpec `json:"nodes"`

// Defines the set of connections (both data dependencies and execution dependencies) that the graph is
// formed of. The execution engine will respect and follow these connections as it determines which nodes
// can and should be executed.
// Deprecated: Please use Connections
DeprecatedConnections DeprecatedConnections `json:"connections"`

// Defines the set of connections (both data dependencies and execution dependencies) that the graph is
// formed of. The execution engine will respect and follow these connections as it determines which nodes
// can and should be executed.
Expand Down Expand Up @@ -337,15 +295,6 @@ func (in *WorkflowSpec) GetNode(nodeID NodeID) (ExecutableNode, bool) {
}

func (in *WorkflowSpec) GetConnections() *Connections {
// For backward compatibility, if the new Connections field is not yet populated then copy the connections from the
// deprecated field. This will happen in one of two cases:
// 1. If an old Admin generated the CRD
// 2. If new propeller is deployed and is unmarshalling an old CRD.
if len(in.Connections.Upstream) == 0 && len(in.Connections.Downstream) == 0 {
in.Connections.Upstream = in.DeprecatedConnections.UpstreamEdges
in.Connections.Downstream = in.DeprecatedConnections.DownstreamEdges
}

return &in.Connections
}

Expand Down
35 changes: 0 additions & 35 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,15 @@
package v1alpha1_test

import (
"encoding/json"
"io/ioutil"
"testing"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

func TestMarshalUnmarshal_Connections(t *testing.T) {
r, err := ioutil.ReadFile("testdata/connections.json")
assert.NoError(t, err)
o := v1alpha1.DeprecatedConnections{}
err = json.Unmarshal(r, &o)
assert.NoError(t, err)
assert.Equal(t, map[v1alpha1.NodeID][]v1alpha1.NodeID{
"n1": {"n2", "n3"},
"n2": {"n4"},
"n3": {"n4"},
"n4": {"n5"},
}, o.DownstreamEdges)
assert.Equal(t, []v1alpha1.NodeID{"n1"}, o.UpstreamEdges["n2"])
assert.Equal(t, []v1alpha1.NodeID{"n1"}, o.UpstreamEdges["n3"])
assert.Equal(t, []v1alpha1.NodeID{"n4"}, o.UpstreamEdges["n5"])
assert.True(t, sets.NewString(o.UpstreamEdges["n4"]...).Equal(sets.NewString("n2", "n3")))
}

func ReadYamlFileAsJSON(path string) ([]byte, error) {
r, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -38,21 +18,6 @@ func ReadYamlFileAsJSON(path string) ([]byte, error) {
return yaml.YAMLToJSON(r)
}

func TestWorkflowSpec(t *testing.T) {
j, err := ReadYamlFileAsJSON("testdata/workflowspec.yaml")
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{}
err = json.Unmarshal(j, w)
if !assert.NoError(t, err) {
t.FailNow()
}

assert.NotNil(t, w.WorkflowSpec)
assert.Nil(t, w.GetOnFailureNode())
assert.Equal(t, 7, len(w.GetConnections().Downstream))
assert.Equal(t, 8, len(w.GetConnections().Upstream))
}

func TestWorkflowIsInterruptible(t *testing.T) {
w := &v1alpha1.FlyteWorkflow{}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading