From 7cd32545c6fcc69925ec5507eaa746e27ae10953 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 30 Nov 2022 14:35:23 +0100 Subject: [PATCH 01/21] Squashed commit of the following: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit commit 541cd83529626e596d9c891c0889ce08cdf22e8f Author: Haris Osmanagic Date: Tue Nov 29 14:32:49 2022 +0100 simplify tests commit 6737ab407fbeead2d8e737f5ef8e8ed25534d44e Author: Haris Osmanagic Date: Tue Nov 29 13:57:14 2022 +0100 cosmetics commit 9dbc8b1f7ae2a5bf3f7dd764ded8e0535bede1ed Author: Haris Osmanagic Date: Tue Nov 29 13:36:32 2022 +0100 rename commit f218ed229c1661286f179b1ead63a9d85809827e Author: Haris Osmanagic Date: Tue Nov 29 13:18:46 2022 +0100 tests commit b7f2baf5335bc50af51784b4b18e9accc394e1c8 Author: Haris Osmanagic Date: Tue Nov 29 11:40:25 2022 +0100 simplify commit fd699c1a9053625cecfc4eed157e7bb1a35cdb0f Author: Haris Osmanagic Date: Mon Nov 28 13:02:55 2022 +0100 improvements commit 27bf6ec9a5bd85637ee4953b12d0f908153c881c Author: Haris Osmanagic Date: Mon Nov 28 12:47:54 2022 +0100 rename method commit 7ec409d76a5d8b8becc21374b980901dadc36de0 Author: Haris Osmanagic Date: Mon Nov 28 12:09:34 2022 +0100 linter commit 359328286414f24e4cc764842f945afcda493163 Author: Haris Osmanagic Date: Mon Nov 28 12:02:16 2022 +0100 init copy commit 52ffbc7626857cf8c661fff511a156fcf2991157 Author: Haris Osmanagic Date: Tue Nov 22 20:24:40 2022 +0100 setup commit 623ca25cb7db84be1ecc4689f0c631e9ad20ab3e Author: Haris Osmanagic Date: Tue Nov 22 12:10:43 2022 +0100 pr feedback 3 commit 83c6152588fac5cd55be60b6acb0cfb0ad98c22d Author: Haris Osmanagic Date: Tue Nov 22 11:34:29 2022 +0100 Revert "Update pkg/web/api/connector_v1_test.go" This reverts commit 8f40c222cfe965a7f4639df6a4de266836194b50. commit 6df36fc37013ca785de23185df78ce2ce4f71f03 Merge: dd8b1b4 8f40c22 Author: Haris Osmanagic Date: Tue Nov 22 11:29:21 2022 +0100 Merge branch 'haris/stream-inspector-destination' of github.com:ConduitIO/conduit into haris/stream-inspector-destination commit dd8b1b48c8a2c721989a349dfa1168396c460cf5 Author: Haris Osmanagic Date: Tue Nov 22 11:28:27 2022 +0100 linter commit 7865b60463dae19df0261b6abab4109eacdfffe7 Author: Haris Osmanagic Date: Tue Nov 22 11:26:07 2022 +0100 pr feedback 2 commit 8f40c222cfe965a7f4639df6a4de266836194b50 Author: Haris Osmanagić Date: Tue Nov 22 11:18:58 2022 +0100 Update pkg/web/api/connector_v1_test.go Co-authored-by: Lovro Mažgon commit 5067241b30c6168e6bce63a8983b813c202f3e72 Author: Haris Osmanagić Date: Tue Nov 22 11:08:57 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon commit e7b3306af1a1b6815746d1b6c950cde39b5facad Author: Haris Osmanagić Date: Tue Nov 22 11:08:17 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon commit 3a0b31bd20f3d4ded935e656f74419f72ca97248 Author: Haris Osmanagić Date: Tue Nov 22 10:58:40 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon commit 1a944582e6ebad730221383f8b8a1bbb9c93a6e6 Author: Haris Osmanagic Date: Mon Nov 21 14:06:27 2022 +0100 linter commit ae1408f319ac44adebb6d0d9a3e1dc4821b1d3b5 Author: Haris Osmanagic Date: Mon Nov 21 14:05:46 2022 +0100 linter commit 1848a057295d9b9ea3f0ef33ea0c9db7abf1ff5b Author: Haris Osmanagic Date: Mon Nov 21 14:05:01 2022 +0100 Stream inspector for destinations --- go.mod | 4 +- go.sum | 9 +- pkg/conduit/runtime.go | 1 + pkg/foundation/grpcutil/gateway.go | 5 +- pkg/foundation/grpcutil/websocket.go | 131 ++++++++++++++++++++++ pkg/foundation/grpcutil/websocket_test.go | 88 +++++++++++++++ 6 files changed, 225 insertions(+), 13 deletions(-) create mode 100644 pkg/foundation/grpcutil/websocket.go create mode 100644 pkg/foundation/grpcutil/websocket_test.go diff --git a/go.mod b/go.mod index a9fae9359..0ec6080ab 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 + github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 github.com/hashicorp/go-hclog v1.3.1 github.com/hashicorp/go-plugin v1.4.6 @@ -32,7 +33,6 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.37.0 github.com/rs/zerolog v1.28.0 - github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3 go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50 golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 @@ -84,7 +84,6 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v2.0.0+incompatible // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.13.0 // indirect @@ -109,7 +108,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/segmentio/kafka-go v0.4.35 // indirect - github.com/sirupsen/logrus v1.8.1 // indirect github.com/xdg/scram v1.0.5 // indirect github.com/xdg/stringprep v1.0.3 // indirect github.com/xitongsys/parquet-go v1.6.2 // indirect diff --git a/go.sum b/go.sum index 7e10d50b4..c322f7f2e 100644 --- a/go.sum +++ b/go.sum @@ -308,8 +308,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA= @@ -532,8 +532,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -559,8 +557,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE= -github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= @@ -710,7 +706,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 3de42a7c8..36c48c4ba 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -445,6 +445,7 @@ func (r *Runtime) serveHTTPAPI( grpcutil.WithDefaultGatewayMiddleware( r.logger, allowCORS(gwmux, "http://localhost:4200"), ), + r.logger, ) return r.serveHTTP( diff --git a/pkg/foundation/grpcutil/gateway.go b/pkg/foundation/grpcutil/gateway.go index 77c9c6275..8cc0a36af 100644 --- a/pkg/foundation/grpcutil/gateway.go +++ b/pkg/foundation/grpcutil/gateway.go @@ -21,7 +21,6 @@ import ( "github.com/conduitio/conduit/pkg/foundation/log" "github.com/google/uuid" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - "github.com/tmc/grpc-websocket-proxy/wsproxy" "google.golang.org/protobuf/encoding/protojson" ) @@ -110,8 +109,8 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler { }) } -func WithWebsockets(h http.Handler) http.Handler { - return wsproxy.WebsocketProxy(h) +func WithWebsockets(h http.Handler, l log.CtxLogger) http.Handler { + return newWebSocketProxy(h, l) } func extractEndpoint(r *http.Request) string { diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go new file mode 100644 index 000000000..699ea84d8 --- /dev/null +++ b/pkg/foundation/grpcutil/websocket.go @@ -0,0 +1,131 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcutil + +import ( + "bufio" + "context" + "io" + "net/http" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/gorilla/websocket" +) + +type inMemoryResponseWriter struct { + io.Writer + header http.Header + code int + closed chan bool +} + +func newInMemoryResponseWriter(writer io.Writer) *inMemoryResponseWriter { + return &inMemoryResponseWriter{ + Writer: writer, + header: http.Header{}, + closed: make(chan bool, 1), + } +} + +func (w *inMemoryResponseWriter) Write(b []byte) (int, error) { + return w.Writer.Write(b) +} +func (w *inMemoryResponseWriter) Header() http.Header { + return w.header +} +func (w *inMemoryResponseWriter) WriteHeader(code int) { + w.code = code +} +func (w *inMemoryResponseWriter) CloseNotify() <-chan bool { + return w.closed +} +func (w *inMemoryResponseWriter) Flush() {} + +// wsProxy is a proxy around a http.Handler which +// redirects the response data from the http.Handler +// to a WebSocket connection. +type wsProxy struct { + handler http.Handler + logger log.CtxLogger + upgrader websocket.Upgrader +} + +func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *wsProxy { + return &wsProxy{ + handler: handler, + logger: logger.WithComponent("grpcutil.websocket"), + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + }, + } +} + +func (p *wsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !websocket.IsWebSocketUpgrade(r) { + p.handler.ServeHTTP(w, r) + return + } + p.proxy(w, r) +} + +func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) { + ctx, cancelFn := context.WithCancel(r.Context()) + defer cancelFn() + + // upgrade connection to WebSocket + conn, err := p.upgrader.Upgrade(w, r, http.Header{}) + if err != nil { + p.logger.Err(ctx, err).Msg("error upgrading websocket") + return + } + defer conn.Close() + + // We use a pipe to read the data + // being written to the underlying http.Handler + responseR, responseW := io.Pipe() + response := newInMemoryResponseWriter(responseW) + go func() { + <-ctx.Done() + p.logger.Debug(ctx).Err(ctx.Err()).Msg("closing pipes") + responseW.CloseWithError(io.EOF) + response.closed <- true + }() + + go func() { + defer cancelFn() + p.handler.ServeHTTP(response, r) + }() + + scanner := bufio.NewScanner(responseR) + + for scanner.Scan() { + if len(scanner.Bytes()) == 0 { + p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") + continue + } + + p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) + if err := conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") + return + } + } + // todo properly communicate the error to the client + // and close the connection + if sErr := scanner.Err(); sErr != nil { + p.logger.Err(ctx, sErr).Msg("scanner err") + } +} diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go new file mode 100644 index 000000000..9440650e0 --- /dev/null +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -0,0 +1,88 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcutil + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/gorilla/websocket" + "github.com/matryer/is" +) + +type testHandler struct { + is *is.I + response string +} + +func (h *testHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + _, err := w.Write([]byte(h.response)) + h.is.NoErr(err) +} + +func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { + is := is.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := &testHandler{ + is: is, + response: "hi there", + } + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + req, err := http.NewRequestWithContext(ctx, "GET", s.URL, nil) + is.NoErr(err) + + resp, err := http.DefaultClient.Do(req) + is.NoErr(err) + is.True(resp.Body != nil) // expected response to have a body + defer resp.Body.Close() + + bytes, err := io.ReadAll(resp.Body) + is.NoErr(err) + is.Equal(h.response, string(bytes)) +} + +func TestWebSocket_UpgradeToWebSocket(t *testing.T) { + is := is.New(t) + + h := &testHandler{ + is: is, + response: "hi there", + } + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + // Convert http to ws + wsURL := "ws" + strings.TrimPrefix(s.URL, "http") + + // Connect to the server + ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + is.NoErr(err) + defer ws.Close() + defer resp.Body.Close() + + msgType, bytes, err := ws.ReadMessage() + is.NoErr(err) + is.Equal(h.response, string(bytes)) + is.Equal(websocket.TextMessage, msgType) +} From b51b9d04149867b1defa8ad3287cad557399f001 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 30 Nov 2022 15:19:38 +0100 Subject: [PATCH 02/21] send scanner error --- pkg/foundation/grpcutil/websocket.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 699ea84d8..d59f76fe6 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -27,7 +27,6 @@ import ( type inMemoryResponseWriter struct { io.Writer header http.Header - code int closed chan bool } @@ -45,8 +44,8 @@ func (w *inMemoryResponseWriter) Write(b []byte) (int, error) { func (w *inMemoryResponseWriter) Header() http.Header { return w.header } -func (w *inMemoryResponseWriter) WriteHeader(code int) { - w.code = code +func (w *inMemoryResponseWriter) WriteHeader(int) { + // we don't have a use for the code } func (w *inMemoryResponseWriter) CloseNotify() <-chan bool { return w.closed @@ -93,8 +92,8 @@ func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - // We use a pipe to read the data - // being written to the underlying http.Handler + // We use a pipe to read the data being written to the underlying http.Handler + // and then write it to the WebSocket connection. responseR, responseW := io.Pipe() response := newInMemoryResponseWriter(responseW) go func() { @@ -123,9 +122,11 @@ func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) { return } } - // todo properly communicate the error to the client - // and close the connection + if sErr := scanner.Err(); sErr != nil { p.logger.Err(ctx, sErr).Msg("scanner err") + if err := conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") + } } } From 596753a5482dbc44566e5bcfe15319aa6558e5bd Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 30 Nov 2022 15:19:59 +0100 Subject: [PATCH 03/21] send scanner error --- pkg/foundation/grpcutil/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index d59f76fe6..21c12d708 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -124,7 +124,7 @@ func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) { } if sErr := scanner.Err(); sErr != nil { - p.logger.Err(ctx, sErr).Msg("scanner err") + p.logger.Err(ctx, sErr).Msg("failed reading data from original response") if err := conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") } From 80c504dcc33a5561e438014e457d3401ba18eee8 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Mon, 19 Dec 2022 13:21:20 +0100 Subject: [PATCH 04/21] rename --- pkg/foundation/grpcutil/websocket.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 21c12d708..d3bbf6c21 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -52,19 +52,19 @@ func (w *inMemoryResponseWriter) CloseNotify() <-chan bool { } func (w *inMemoryResponseWriter) Flush() {} -// wsProxy is a proxy around a http.Handler which +// webSocketProxy is a proxy around a http.Handler which // redirects the response data from the http.Handler // to a WebSocket connection. -type wsProxy struct { +type webSocketProxy struct { handler http.Handler logger log.CtxLogger upgrader websocket.Upgrader } -func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *wsProxy { - return &wsProxy{ +func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { + return &webSocketProxy{ handler: handler, - logger: logger.WithComponent("grpcutil.websocket"), + logger: logger.WithComponent("grpcutil.webSocketProxy"), upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -72,7 +72,7 @@ func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *wsProxy { } } -func (p *wsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !websocket.IsWebSocketUpgrade(r) { p.handler.ServeHTTP(w, r) return @@ -80,7 +80,7 @@ func (p *wsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { p.proxy(w, r) } -func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) { +func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { ctx, cancelFn := context.WithCancel(r.Context()) defer cancelFn() From 692a9882a048c261e153e69dd5c235092089eca1 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 11:53:23 +0100 Subject: [PATCH 05/21] read loop --- pkg/foundation/grpcutil/websocket.go | 67 +++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 6 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index d3bbf6c21..ba3db4b20 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -19,6 +19,8 @@ import ( "context" "io" "net/http" + "strings" + "time" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/gorilla/websocket" @@ -56,13 +58,17 @@ func (w *inMemoryResponseWriter) Flush() {} // redirects the response data from the http.Handler // to a WebSocket connection. type webSocketProxy struct { - handler http.Handler - logger log.CtxLogger - upgrader websocket.Upgrader + handler http.Handler + logger log.CtxLogger + upgrader websocket.Upgrader + conn *websocket.Conn + pingInterval time.Duration + pongWait time.Duration + pingWait time.Duration } func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { - return &webSocketProxy{ + proxy := &webSocketProxy{ handler: handler, logger: logger.WithComponent("grpcutil.webSocketProxy"), upgrader: websocket.Upgrader{ @@ -70,6 +76,11 @@ func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketPro WriteBufferSize: 1024, }, } + proxy.pingInterval = 30 * time.Second + proxy.pongWait = (proxy.pingInterval * 10) / 9 + proxy.pingWait = proxy.pongWait / 6 + + return proxy } func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -92,6 +103,8 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { } defer conn.Close() + p.conn = conn + // We use a pipe to read the data being written to the underlying http.Handler // and then write it to the WebSocket connection. responseR, responseW := io.Pipe() @@ -103,11 +116,14 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { response.closed <- true }() + // Start the "underlying" http.Handler go func() { defer cancelFn() p.handler.ServeHTTP(response, r) }() + go p.startReadLoop(ctx) + scanner := bufio.NewScanner(responseR) for scanner.Scan() { @@ -117,7 +133,7 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { } p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) - if err := conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { + if err := p.conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") return } @@ -125,8 +141,47 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { if sErr := scanner.Err(); sErr != nil { p.logger.Err(ctx, sErr).Msg("failed reading data from original response") - if err := conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { + if err := p.conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") } } } + +// startReadLoop starts a read loop on the proxy's WebSocket connection +func (p *webSocketProxy) startReadLoop(ctx context.Context) { + p.conn.SetReadLimit(512) + // todo handle errors + p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + p.conn.SetPongHandler(func(string) error { + p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + return nil + }) + + for { + // The only use we have for reads right now + // is for ping, pong and close messages. + // https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages + // Also, a read loop can detect client disconnects much quicker: + // https://groups.google.com/g/golang-nuts/c/FFzQO26jEoE/m/mYhcsK20EwAJ + _, _, err := p.conn.ReadMessage() + if err != nil { + if p.isClosedConnErr(err) { + p.logger.Warn(ctx).Err(err).Msg("read error") + } + break + } + } +} + +func (p *webSocketProxy) isClosedConnErr(err error) bool { + str := err.Error() + if strings.Contains(str, "use of closed network connection") { + return true + } + return websocket.IsCloseError( + err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseAbnormalClosure, + ) +} From e3a87a15dc202352d5cddcd7019b9fb0158ad1d3 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 12:04:22 +0100 Subject: [PATCH 06/21] refactor + ping write loop --- pkg/foundation/grpcutil/websocket.go | 84 ++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index ba3db4b20..2c28c02b1 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -122,33 +122,13 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { p.handler.ServeHTTP(response, r) }() - go p.startReadLoop(ctx) - - scanner := bufio.NewScanner(responseR) - - for scanner.Scan() { - if len(scanner.Bytes()) == 0 { - p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") - continue - } - - p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) - if err := p.conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { - p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") - return - } - } - - if sErr := scanner.Err(); sErr != nil { - p.logger.Err(ctx, sErr).Msg("failed reading data from original response") - if err := p.conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { - p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") - } - } + go p.startReadLoop(ctx, cancelFn) + go p.pingWriteLoop(ctx) + p.startWriteLoop(ctx, responseR) } // startReadLoop starts a read loop on the proxy's WebSocket connection -func (p *webSocketProxy) startReadLoop(ctx context.Context) { +func (p *webSocketProxy) startReadLoop(ctx context.Context, cancelFn context.CancelFunc) { p.conn.SetReadLimit(512) // todo handle errors p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) @@ -157,7 +137,19 @@ func (p *webSocketProxy) startReadLoop(ctx context.Context) { return nil }) + // The read loop will stop only if the request context was cancelled, + // or if there's been an error reading a message. + // Because of the latter, we need to cancel the request context. + defer func() { + cancelFn() + }() for { + select { + case <-ctx.Done(): + p.logger.Debug(ctx).Msg("read loop done because request context was cancelled") + return + default: + } // The only use we have for reads right now // is for ping, pong and close messages. // https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages @@ -185,3 +177,47 @@ func (p *webSocketProxy) isClosedConnErr(err error) bool { websocket.CloseAbnormalClosure, ) } + +func (p *webSocketProxy) startWriteLoop(ctx context.Context, responseReader *io.PipeReader) { + scanner := bufio.NewScanner(responseReader) + + for scanner.Scan() { + if len(scanner.Bytes()) == 0 { + p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") + continue + } + + p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) + if err := p.conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") + return + } + } + + if sErr := scanner.Err(); sErr != nil { + p.logger.Err(ctx, sErr).Msg("failed reading data from original response") + if err := p.conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") + } + } +} + +func (p *webSocketProxy) pingWriteLoop(ctx context.Context) { + ticker := time.NewTicker(p.pingInterval) + defer func() { + ticker.Stop() + p.conn.Close() + }() + for { + select { + case <-ctx.Done(): + p.logger.Debug(ctx).Msg("stopped pinging write loop because request context was cancelled") + return + case <-ticker.C: + p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) + if err := p.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} From 2072f7d85a878e4422e690de72f1f01be963e2f8 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 12:26:41 +0100 Subject: [PATCH 07/21] refactor --- pkg/foundation/grpcutil/websocket.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 2c28c02b1..221321fd7 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -129,20 +129,26 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { // startReadLoop starts a read loop on the proxy's WebSocket connection func (p *webSocketProxy) startReadLoop(ctx context.Context, cancelFn context.CancelFunc) { + // The read loop will stop only if the request context was cancelled, + // or if there's been an error reading a message. + // Because of the latter, we need to cancel the request context. + defer cancelFn() + p.conn.SetReadLimit(512) - // todo handle errors - p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + err := p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + if err != nil { + p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) + return + } p.conn.SetPongHandler(func(string) error { - p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + err := p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) + if err != nil { + // todo return err? + p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) + } return nil }) - // The read loop will stop only if the request context was cancelled, - // or if there's been an error reading a message. - // Because of the latter, we need to cancel the request context. - defer func() { - cancelFn() - }() for { select { case <-ctx.Done(): @@ -214,6 +220,7 @@ func (p *webSocketProxy) pingWriteLoop(ctx context.Context) { p.logger.Debug(ctx).Msg("stopped pinging write loop because request context was cancelled") return case <-ticker.C: + // todo check error p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) if err := p.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return From a16ca053c76e560673cb664dc52ccf506b2d9f02 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 13:04:30 +0100 Subject: [PATCH 08/21] fix pingWriteLoop --- pkg/foundation/grpcutil/websocket.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 221321fd7..d960c4d82 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -123,7 +123,7 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { }() go p.startReadLoop(ctx, cancelFn) - go p.pingWriteLoop(ctx) + go p.pingWriteLoop(cancelFn) p.startWriteLoop(ctx, responseR) } @@ -208,17 +208,14 @@ func (p *webSocketProxy) startWriteLoop(ctx context.Context, responseReader *io. } } -func (p *webSocketProxy) pingWriteLoop(ctx context.Context) { +func (p *webSocketProxy) pingWriteLoop(cancelFn context.CancelFunc) { ticker := time.NewTicker(p.pingInterval) defer func() { ticker.Stop() - p.conn.Close() + cancelFn() }() for { select { - case <-ctx.Done(): - p.logger.Debug(ctx).Msg("stopped pinging write loop because request context was cancelled") - return case <-ticker.C: // todo check error p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) From 2ae9538edab0ceb0c5e4442f95d8f11485b3de32 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 13:06:49 +0100 Subject: [PATCH 09/21] fixes --- pkg/foundation/grpcutil/websocket.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index d960c4d82..fb9e91b19 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -214,14 +214,11 @@ func (p *webSocketProxy) pingWriteLoop(cancelFn context.CancelFunc) { ticker.Stop() cancelFn() }() - for { - select { - case <-ticker.C: - // todo check error - p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) - if err := p.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } + for range ticker.C { + // todo check error + p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) //nolint:errcheck // gorilla/websocket always returns nil here + if err := p.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return } } } From eb7e9863dde45c9889b6df23067960f7e6d080f4 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 14:05:43 +0100 Subject: [PATCH 10/21] comments --- pkg/foundation/grpcutil/websocket.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index fb9e91b19..050ecf281 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -58,13 +58,17 @@ func (w *inMemoryResponseWriter) Flush() {} // redirects the response data from the http.Handler // to a WebSocket connection. type webSocketProxy struct { - handler http.Handler - logger log.CtxLogger - upgrader websocket.Upgrader - conn *websocket.Conn + handler http.Handler + logger log.CtxLogger + upgrader websocket.Upgrader + conn *websocket.Conn + + // pingInterval is the interval at which the proxy sends pings pingInterval time.Duration - pongWait time.Duration - pingWait time.Duration + // pongWait is the maximum allowed wait for a pong response + pongWait time.Duration + // pingWait is the maximum allowed wait for writing a ping message + pingWait time.Duration } func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { From b584fcf2c7fa19b0330eeaba117b331e2ada66aa Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 21 Dec 2022 14:34:25 +0100 Subject: [PATCH 11/21] remove ping-pong --- pkg/foundation/grpcutil/websocket.go | 42 ---------------------------- 1 file changed, 42 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 050ecf281..8d5968122 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -20,7 +20,6 @@ import ( "io" "net/http" "strings" - "time" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/gorilla/websocket" @@ -62,13 +61,6 @@ type webSocketProxy struct { logger log.CtxLogger upgrader websocket.Upgrader conn *websocket.Conn - - // pingInterval is the interval at which the proxy sends pings - pingInterval time.Duration - // pongWait is the maximum allowed wait for a pong response - pongWait time.Duration - // pingWait is the maximum allowed wait for writing a ping message - pingWait time.Duration } func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { @@ -80,9 +72,6 @@ func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketPro WriteBufferSize: 1024, }, } - proxy.pingInterval = 30 * time.Second - proxy.pongWait = (proxy.pingInterval * 10) / 9 - proxy.pingWait = proxy.pongWait / 6 return proxy } @@ -127,7 +116,6 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { }() go p.startReadLoop(ctx, cancelFn) - go p.pingWriteLoop(cancelFn) p.startWriteLoop(ctx, responseR) } @@ -138,21 +126,6 @@ func (p *webSocketProxy) startReadLoop(ctx context.Context, cancelFn context.Can // Because of the latter, we need to cancel the request context. defer cancelFn() - p.conn.SetReadLimit(512) - err := p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) - if err != nil { - p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) - return - } - p.conn.SetPongHandler(func(string) error { - err := p.conn.SetReadDeadline(time.Now().Add(p.pongWait)) - if err != nil { - // todo return err? - p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) - } - return nil - }) - for { select { case <-ctx.Done(): @@ -211,18 +184,3 @@ func (p *webSocketProxy) startWriteLoop(ctx context.Context, responseReader *io. } } } - -func (p *webSocketProxy) pingWriteLoop(cancelFn context.CancelFunc) { - ticker := time.NewTicker(p.pingInterval) - defer func() { - ticker.Stop() - cancelFn() - }() - for range ticker.C { - // todo check error - p.conn.SetWriteDeadline(time.Now().Add(p.pingWait)) //nolint:errcheck // gorilla/websocket always returns nil here - if err := p.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } - } -} From 600ef2fdd02638235c8d19190c8ebe51263a27ca Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 22 Dec 2022 14:19:16 +0100 Subject: [PATCH 12/21] Squashed commit of the following: commit 5dad6312565c5d84f3053c512e1c9d721e9ecb4f Author: Haris Osmanagic Date: Thu Dec 22 13:34:34 2022 +0100 linter commit 3c9fef2b4a51d1ca08e2b76946a1122c6763cdbe Author: Haris Osmanagic Date: Thu Dec 22 13:07:45 2022 +0100 ctx check commit 7368327a2617fd6d61525261b348dfaf1101ca7d Author: Haris Osmanagic Date: Thu Dec 22 11:55:49 2022 +0100 ping pongs working commit 088852ff664b335e8777b54a90cbaf32ac4e61d0 Author: Haris Osmanagic Date: Wed Dec 21 14:55:49 2022 +0100 Revert "remove ping-pong" This reverts commit b584fcf2c7fa19b0330eeaba117b331e2ada66aa. --- pkg/foundation/grpcutil/websocket.go | 123 +++++++++++++++++----- pkg/foundation/grpcutil/websocket_test.go | 27 +++++ 2 files changed, 122 insertions(+), 28 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 8d5968122..41f790b7b 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -20,6 +20,7 @@ import ( "io" "net/http" "strings" + "time" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/gorilla/websocket" @@ -60,7 +61,13 @@ type webSocketProxy struct { handler http.Handler logger log.CtxLogger upgrader websocket.Upgrader - conn *websocket.Conn + + // Time allowed to write a message to the peer. + writeWait time.Duration + // Time allowed to read the next pong message from the peer. + pongWait time.Duration + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod time.Duration } func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { @@ -72,6 +79,9 @@ func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketPro WriteBufferSize: 1024, }, } + proxy.writeWait = 10 * time.Second + proxy.pongWait = 60 * time.Second + proxy.pingPeriod = (proxy.pongWait * 9) / 10 return proxy } @@ -96,8 +106,6 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - p.conn = conn - // We use a pipe to read the data being written to the underlying http.Handler // and then write it to the WebSocket connection. responseR, responseW := io.Pipe() @@ -115,16 +123,33 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { p.handler.ServeHTTP(response, r) }() - go p.startReadLoop(ctx, cancelFn) - p.startWriteLoop(ctx, responseR) + go p.startWebSocketRead(ctx, conn, cancelFn) + messages := p.readFromHTTPResponse(ctx, responseR) + p.startWebSocketWrite(ctx, messages, conn, cancelFn) } -// startReadLoop starts a read loop on the proxy's WebSocket connection -func (p *webSocketProxy) startReadLoop(ctx context.Context, cancelFn context.CancelFunc) { - // The read loop will stop only if the request context was cancelled, - // or if there's been an error reading a message. - // Because of the latter, we need to cancel the request context. - defer cancelFn() +// startWebSocketRead starts a read loop on the proxy's WebSocket connection +// The read loop will stop: +// 1. if the request context was cancelled, or +// 2. if there's been an error reading a message. +func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, onDone func()) { + defer onDone() + + conn.SetReadLimit(512) + err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) + if err != nil { + p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) + return + } + + conn.SetPongHandler(func(string) error { + err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) + if err != nil { + // todo return err? + p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) + } + return nil + }) for { select { @@ -133,16 +158,19 @@ func (p *webSocketProxy) startReadLoop(ctx context.Context, cancelFn context.Can return default: } + // The only use we have for reads right now // is for ping, pong and close messages. // https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages // Also, a read loop can detect client disconnects much quicker: // https://groups.google.com/g/golang-nuts/c/FFzQO26jEoE/m/mYhcsK20EwAJ - _, _, err := p.conn.ReadMessage() + _, _, err := conn.ReadMessage() if err != nil { if p.isClosedConnErr(err) { - p.logger.Warn(ctx).Err(err).Msg("read error") + p.logger.Debug(ctx).Err(err).Msg("closed connection") } + + p.logger.Warn(ctx).Err(err).Msg("read error") break } } @@ -161,26 +189,65 @@ func (p *webSocketProxy) isClosedConnErr(err error) bool { ) } -func (p *webSocketProxy) startWriteLoop(ctx context.Context, responseReader *io.PipeReader) { - scanner := bufio.NewScanner(responseReader) +func (p *webSocketProxy) readFromHTTPResponse(ctx context.Context, responseReader *io.PipeReader) chan []byte { + c := make(chan []byte) + go func() { + scanner := bufio.NewScanner(responseReader) + + for scanner.Scan() { + if len(scanner.Bytes()) == 0 { + p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") + continue + } - for scanner.Scan() { - if len(scanner.Bytes()) == 0 { - p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") - continue + p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) + c <- scanner.Bytes() } - p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) - if err := p.conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil { - p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") - return + if sErr := scanner.Err(); sErr != nil { + p.logger.Err(ctx, sErr).Msg("failed reading data from original response") + c <- []byte(sErr.Error()) } - } - if sErr := scanner.Err(); sErr != nil { - p.logger.Err(ctx, sErr).Msg("failed reading data from original response") - if err := p.conn.WriteMessage(websocket.TextMessage, []byte(sErr.Error())); err != nil { - p.logger.Warn(ctx).Err(err).Msg("[write] failed writing scanner error") + p.logger.Debug(ctx).Msg("scanner reached end of input data") + close(c) + }() + + return c +} + +func (p *webSocketProxy) startWebSocketWrite(ctx context.Context, messages chan []byte, conn *websocket.Conn, onDone func()) { + ticker := time.NewTicker(p.pingPeriod) + defer func() { + ticker.Stop() + onDone() + }() + + for { + select { + case <-ctx.Done(): + p.logger.Debug(ctx).Msg("write loop done because request context was cancelled") + return + case message, ok := <-messages: + conn.SetWriteDeadline(time.Now().Add(p.writeWait)) //nolint:errcheck // always returns nil + if !ok { + // readFromHTTPResponse closed the channel. + err := conn.WriteMessage(websocket.CloseMessage, []byte{}) + if err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] failed sending close message") + } + return + } + + if err := conn.WriteMessage(websocket.TextMessage, message); err != nil { + p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") + return + } + case <-ticker.C: + conn.SetWriteDeadline(time.Now().Add(p.writeWait)) //nolint:errcheck // always returns nil + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } } } } diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index 9440650e0..807cc268a 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -86,3 +86,30 @@ func TestWebSocket_UpgradeToWebSocket(t *testing.T) { is.Equal(h.response, string(bytes)) is.Equal(websocket.TextMessage, msgType) } + +func TestWebSocket_PingPong(t *testing.T) { + is := is.New(t) + + h := &testHandler{ + is: is, + response: "hi there", + } + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + // Convert http to ws + wsURL := "ws" + strings.TrimPrefix(s.URL, "http") + + // Connect to the server + ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + is.NoErr(err) + defer ws.Close() + defer resp.Body.Close() + + pinged := false + ws.SetPingHandler(func(appData string) error { + pinged = true + return nil + }) + is.True(pinged) +} From b55a906bbfc53a40f0cbe78c3b0ecb7a812e0624 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 22 Dec 2022 18:56:25 +0100 Subject: [PATCH 13/21] remove tests --- pkg/foundation/grpcutil/websocket_test.go | 54 ----------------------- 1 file changed, 54 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index 807cc268a..a90b18e0e 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -19,11 +19,9 @@ import ( "io" "net/http" "net/http/httptest" - "strings" "testing" "github.com/conduitio/conduit/pkg/foundation/log" - "github.com/gorilla/websocket" "github.com/matryer/is" ) @@ -61,55 +59,3 @@ func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { is.NoErr(err) is.Equal(h.response, string(bytes)) } - -func TestWebSocket_UpgradeToWebSocket(t *testing.T) { - is := is.New(t) - - h := &testHandler{ - is: is, - response: "hi there", - } - s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) - defer s.Close() - - // Convert http to ws - wsURL := "ws" + strings.TrimPrefix(s.URL, "http") - - // Connect to the server - ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) - is.NoErr(err) - defer ws.Close() - defer resp.Body.Close() - - msgType, bytes, err := ws.ReadMessage() - is.NoErr(err) - is.Equal(h.response, string(bytes)) - is.Equal(websocket.TextMessage, msgType) -} - -func TestWebSocket_PingPong(t *testing.T) { - is := is.New(t) - - h := &testHandler{ - is: is, - response: "hi there", - } - s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) - defer s.Close() - - // Convert http to ws - wsURL := "ws" + strings.TrimPrefix(s.URL, "http") - - // Connect to the server - ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) - is.NoErr(err) - defer ws.Close() - defer resp.Body.Close() - - pinged := false - ws.SetPingHandler(func(appData string) error { - pinged = true - return nil - }) - is.True(pinged) -} From 47f6221899209482132b97622b19a79c84d66f1a Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 22 Dec 2022 18:58:53 +0100 Subject: [PATCH 14/21] refactor --- pkg/foundation/grpcutil/websocket.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 41f790b7b..18ef3a23b 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -54,6 +54,11 @@ func (w *inMemoryResponseWriter) CloseNotify() <-chan bool { } func (w *inMemoryResponseWriter) Flush() {} +var ( + defaultWriteWait = 10 * time.Second + defaultPongWait = 60 * time.Second +) + // webSocketProxy is a proxy around a http.Handler which // redirects the response data from the http.Handler // to a WebSocket connection. @@ -78,10 +83,10 @@ func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketPro ReadBufferSize: 1024, WriteBufferSize: 1024, }, + writeWait: defaultWriteWait, + pongWait: defaultPongWait, + pingPeriod: (defaultPongWait * 9) / 10, } - proxy.writeWait = 10 * time.Second - proxy.pongWait = 60 * time.Second - proxy.pingPeriod = (proxy.pongWait * 9) / 10 return proxy } From b0ff9581fbf00ec96e69da9cce863ed9ec54b45d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 23 Dec 2022 14:37:46 +0100 Subject: [PATCH 15/21] fix concurrency --- pkg/foundation/grpcutil/websocket.go | 79 +++++++++-------------- pkg/foundation/grpcutil/websocket_test.go | 45 +++++++++++++ 2 files changed, 75 insertions(+), 49 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 18ef3a23b..d8f31a8d2 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -29,14 +29,12 @@ import ( type inMemoryResponseWriter struct { io.Writer header http.Header - closed chan bool } func newInMemoryResponseWriter(writer io.Writer) *inMemoryResponseWriter { return &inMemoryResponseWriter{ Writer: writer, header: http.Header{}, - closed: make(chan bool, 1), } } @@ -49,9 +47,6 @@ func (w *inMemoryResponseWriter) Header() http.Header { func (w *inMemoryResponseWriter) WriteHeader(int) { // we don't have a use for the code } -func (w *inMemoryResponseWriter) CloseNotify() <-chan bool { - return w.closed -} func (w *inMemoryResponseWriter) Flush() {} var ( @@ -102,6 +97,7 @@ func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { ctx, cancelFn := context.WithCancel(r.Context()) defer cancelFn() + r = r.WithContext(ctx) // upgrade connection to WebSocket conn, err := p.upgrader.Upgrade(w, r, http.Header{}) @@ -115,21 +111,17 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { // and then write it to the WebSocket connection. responseR, responseW := io.Pipe() response := newInMemoryResponseWriter(responseW) - go func() { - <-ctx.Done() - p.logger.Debug(ctx).Err(ctx.Err()).Msg("closing pipes") - responseW.CloseWithError(io.EOF) - response.closed <- true - }() // Start the "underlying" http.Handler go func() { - defer cancelFn() p.handler.ServeHTTP(response, r) + p.logger.Debug(ctx).Err(ctx.Err()).Msg("closing pipes") + responseW.CloseWithError(io.EOF) }() + messages := make(chan []byte) go p.startWebSocketRead(ctx, conn, cancelFn) - messages := p.readFromHTTPResponse(ctx, responseR) + go p.readFromHTTPResponse(ctx, responseR, messages) p.startWebSocketWrite(ctx, messages, conn, cancelFn) } @@ -137,8 +129,8 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { // The read loop will stop: // 1. if the request context was cancelled, or // 2. if there's been an error reading a message. -func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, onDone func()) { - defer onDone() +func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, cancelFn func()) { + defer cancelFn() conn.SetReadLimit(512) err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) @@ -157,13 +149,6 @@ func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket }) for { - select { - case <-ctx.Done(): - p.logger.Debug(ctx).Msg("read loop done because request context was cancelled") - return - default: - } - // The only use we have for reads right now // is for ping, pong and close messages. // https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages @@ -194,45 +179,41 @@ func (p *webSocketProxy) isClosedConnErr(err error) bool { ) } -func (p *webSocketProxy) readFromHTTPResponse(ctx context.Context, responseReader *io.PipeReader) chan []byte { - c := make(chan []byte) - go func() { - scanner := bufio.NewScanner(responseReader) +func (p *webSocketProxy) readFromHTTPResponse(ctx context.Context, responseReader io.Reader, c chan []byte) { + defer close(c) + scanner := bufio.NewScanner(responseReader) - for scanner.Scan() { - if len(scanner.Bytes()) == 0 { - p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") - continue - } - - p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) - c <- scanner.Bytes() + for scanner.Scan() { + if len(scanner.Bytes()) == 0 { + p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") + continue } - if sErr := scanner.Err(); sErr != nil { - p.logger.Err(ctx, sErr).Msg("failed reading data from original response") - c <- []byte(sErr.Error()) - } + p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) + c <- scanner.Bytes() + } - p.logger.Debug(ctx).Msg("scanner reached end of input data") - close(c) - }() + if sErr := scanner.Err(); sErr != nil { + p.logger.Err(ctx, sErr).Msg("failed reading data from original response") + c <- []byte(sErr.Error()) + } - return c + p.logger.Debug(ctx).Msg("scanner reached end of input data") } -func (p *webSocketProxy) startWebSocketWrite(ctx context.Context, messages chan []byte, conn *websocket.Conn, onDone func()) { - ticker := time.NewTicker(p.pingPeriod) +func (p *webSocketProxy) startWebSocketWrite(ctx context.Context, messages chan []byte, conn *websocket.Conn, cancelFn func()) { defer func() { - ticker.Stop() - onDone() + for range messages { + // throw away + } }() + defer cancelFn() + + ticker := time.NewTicker(p.pingPeriod) + defer ticker.Stop() for { select { - case <-ctx.Done(): - p.logger.Debug(ctx).Msg("write loop done because request context was cancelled") - return case message, ok := <-messages: conn.SetWriteDeadline(time.Now().Add(p.writeWait)) //nolint:errcheck // always returns nil if !ok { diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index a90b18e0e..abaca212b 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -19,9 +19,11 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/gorilla/websocket" "github.com/matryer/is" ) @@ -59,3 +61,46 @@ func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { is.NoErr(err) is.Equal(h.response, string(bytes)) } + +func TestWebSocket_UpgradeToWebSocket(t *testing.T) { + is := is.New(t) + // ctx := context.Background() + + // handlerDone := make(chan struct{}) + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // defer close(handlerDone) + _, err := w.Write([]byte("hi there\n")) + is.NoErr(err) + // <-r.Context().Done() + }) + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + // Convert http to ws + wsURL := "ws" + strings.TrimPrefix(s.URL, "http") + + // Connect to the server + ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + is.NoErr(err) + defer ws.Close() + defer resp.Body.Close() + + msgType, bytes, err := ws.ReadMessage() + is.NoErr(err) + is.Equal("hi there", string(bytes)) + is.Equal(websocket.TextMessage, msgType) + + // _, _, err = cchan.Chan[struct{}](handlerDone).RecvTimeout(ctx, time.Millisecond*10) + // is.Equal(err, context.DeadlineExceeded) + + _, _, err = ws.ReadMessage() + is.True(err != nil) + t.Log(err) + + // err = ws.Close() + // is.NoErr(err) + + // _, ok, err := cchan.Chan[struct{}](handlerDone).RecvTimeout(ctx, time.Second) + // is.True(!ok) // expected channel to be closed + // is.NoErr(err) +} From 7d4b410ebbfc19a1b61f3471b15853ff51bd7dd5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 23 Dec 2022 15:39:28 +0100 Subject: [PATCH 16/21] comments --- pkg/foundation/grpcutil/websocket.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index d8f31a8d2..558bad24c 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -94,12 +94,26 @@ func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { p.proxy(w, r) } +// proxy creates a "pipeline" from the underlying response +// to a WebSocket connection. The pipeline is constructed in +// the following way: +// +// underlying response +// -> inMemoryResponseWrite +// -> scanner +// -> messages channel +// -> connection writer +// +// In the case of an error due to which we need to abort the request +// and close the WebSocket connection, we need to cancel the request context +// and stop writing any data to the WebSocket connection. This will +// automatically halt all the "pipeline nodes" after the underlying response. func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { ctx, cancelFn := context.WithCancel(r.Context()) defer cancelFn() r = r.WithContext(ctx) - // upgrade connection to WebSocket + // Upgrade connection to WebSocket conn, err := p.upgrader.Upgrade(w, r, http.Header{}) if err != nil { p.logger.Err(ctx, err).Msg("error upgrading websocket") @@ -120,17 +134,17 @@ func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { }() messages := make(chan []byte) + // startWebSocketRead and startWebSocketWrite need to cancel the context + // if they encounter an error reading from or writing to the WS connection go p.startWebSocketRead(ctx, conn, cancelFn) go p.readFromHTTPResponse(ctx, responseR, messages) p.startWebSocketWrite(ctx, messages, conn, cancelFn) } -// startWebSocketRead starts a read loop on the proxy's WebSocket connection -// The read loop will stop: -// 1. if the request context was cancelled, or -// 2. if there's been an error reading a message. -func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, cancelFn func()) { - defer cancelFn() +// startWebSocketRead starts a read loop on the proxy's WebSocket connection. +// The read loop will stop if there's been an error reading a message. +func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, onDone func()) { + defer onDone() conn.SetReadLimit(512) err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) From 9fac27d22a0730fac1870489ec2ee436ad202bcb Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 23 Dec 2022 16:31:36 +0100 Subject: [PATCH 17/21] test --- pkg/foundation/grpcutil/websocket_test.go | 70 +++++++++++++---------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index abaca212b..f9c1018f5 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -16,36 +16,29 @@ package grpcutil import ( "context" + "github.com/conduitio/conduit/pkg/foundation/cchan" "io" "net/http" "net/http/httptest" "strings" "testing" + "time" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/gorilla/websocket" "github.com/matryer/is" ) -type testHandler struct { - is *is.I - response string -} - -func (h *testHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { - _, err := w.Write([]byte(h.response)) - h.is.NoErr(err) -} - func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { is := is.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - h := &testHandler{ - is: is, - response: "hi there", - } + msg := "hi there" + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte(msg)) + is.NoErr(err) + }) s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) defer s.Close() @@ -59,19 +52,16 @@ func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { bytes, err := io.ReadAll(resp.Body) is.NoErr(err) - is.Equal(h.response, string(bytes)) + is.Equal(msg, string(bytes)) } -func TestWebSocket_UpgradeToWebSocket(t *testing.T) { +func TestWebSocket_Read(t *testing.T) { is := is.New(t) - // ctx := context.Background() - // handlerDone := make(chan struct{}) h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // defer close(handlerDone) + // Data written to a WebSocket is new-line delimited _, err := w.Write([]byte("hi there\n")) is.NoErr(err) - // <-r.Context().Done() }) s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) defer s.Close() @@ -90,17 +80,39 @@ func TestWebSocket_UpgradeToWebSocket(t *testing.T) { is.Equal("hi there", string(bytes)) is.Equal(websocket.TextMessage, msgType) - // _, _, err = cchan.Chan[struct{}](handlerDone).RecvTimeout(ctx, time.Millisecond*10) - // is.Equal(err, context.DeadlineExceeded) - _, _, err = ws.ReadMessage() is.True(err != nil) - t.Log(err) - // err = ws.Close() - // is.NoErr(err) + err = ws.Close() + is.NoErr(err) +} + +func TestWebSocket_Read_ClientClosed(t *testing.T) { + is := is.New(t) - // _, ok, err := cchan.Chan[struct{}](handlerDone).RecvTimeout(ctx, time.Second) - // is.True(!ok) // expected channel to be closed - // is.NoErr(err) + handlerDone := make(chan struct{}) + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer close(handlerDone) + // Data written to a WebSocket is new-line delimited + _, err := w.Write([]byte("hi there\n")) + is.NoErr(err) + }) + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + // Convert http to ws + wsURL := "ws" + strings.TrimPrefix(s.URL, "http") + + // Connect to the server + ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + is.NoErr(err) + defer ws.Close() + defer resp.Body.Close() + + err = ws.Close() + is.NoErr(err) + + _, ok, err := cchan.Chan[struct{}](handlerDone).RecvTimeout(context.Background(), time.Second) + is.True(!ok) // expected channel to be closed + is.NoErr(err) } From b5da901b828d67a351e07bae5e4b98afbb61d0fa Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 23 Dec 2022 16:54:10 +0100 Subject: [PATCH 18/21] tests --- pkg/foundation/grpcutil/websocket_test.go | 46 +++++++++++++++++++++-- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index f9c1018f5..1959aba51 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -55,7 +55,7 @@ func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) { is.Equal(msg, string(bytes)) } -func TestWebSocket_Read(t *testing.T) { +func TestWebSocket_Read_Single(t *testing.T) { is := is.New(t) h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -87,15 +87,53 @@ func TestWebSocket_Read(t *testing.T) { is.NoErr(err) } +func TestWebSocket_Read_Multiple(t *testing.T) { + is := is.New(t) + + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Data written to a WebSocket is new-line delimited + _, err := w.Write([]byte("first message\n")) + is.NoErr(err) + + _, err = w.Write([]byte("second message\n")) + is.NoErr(err) + }) + s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) + defer s.Close() + + // Convert http to ws + wsURL := "ws" + strings.TrimPrefix(s.URL, "http") + + // Connect to the server + ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + is.NoErr(err) + defer ws.Close() + defer resp.Body.Close() + + msgType, bytes, err := ws.ReadMessage() + is.NoErr(err) + is.Equal("first message", string(bytes)) + is.Equal(websocket.TextMessage, msgType) + + msgType, bytes, err = ws.ReadMessage() + is.NoErr(err) + is.Equal("second message", string(bytes)) + is.Equal(websocket.TextMessage, msgType) + + _, _, err = ws.ReadMessage() + is.True(err != nil) + + err = ws.Close() + is.NoErr(err) +} + func TestWebSocket_Read_ClientClosed(t *testing.T) { is := is.New(t) handlerDone := make(chan struct{}) h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer close(handlerDone) - // Data written to a WebSocket is new-line delimited - _, err := w.Write([]byte("hi there\n")) - is.NoErr(err) + <-r.Context().Done() }) s := httptest.NewServer(newWebSocketProxy(h, log.Nop())) defer s.Close() From 2959affffa6a825d902d31e794e839df245c4249 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Fri, 23 Dec 2022 16:57:28 +0100 Subject: [PATCH 19/21] lint --- pkg/foundation/grpcutil/websocket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/foundation/grpcutil/websocket_test.go b/pkg/foundation/grpcutil/websocket_test.go index 1959aba51..b37a16955 100644 --- a/pkg/foundation/grpcutil/websocket_test.go +++ b/pkg/foundation/grpcutil/websocket_test.go @@ -16,7 +16,6 @@ package grpcutil import ( "context" - "github.com/conduitio/conduit/pkg/foundation/cchan" "io" "net/http" "net/http/httptest" @@ -24,6 +23,7 @@ import ( "testing" "time" + "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/gorilla/websocket" "github.com/matryer/is" From b8f1110727ad85f2d7e542ae63aa4e983aa5466f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Fri, 23 Dec 2022 20:15:56 +0100 Subject: [PATCH 20/21] Update pkg/foundation/grpcutil/websocket.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/foundation/grpcutil/websocket.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 558bad24c..7e506bdf6 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -216,15 +216,14 @@ func (p *webSocketProxy) readFromHTTPResponse(ctx context.Context, responseReade } func (p *webSocketProxy) startWebSocketWrite(ctx context.Context, messages chan []byte, conn *websocket.Conn, cancelFn func()) { + ticker := time.NewTicker(p.pingPeriod) defer func() { + ticker.Stop() + cancelFn() for range messages { // throw away } }() - defer cancelFn() - - ticker := time.NewTicker(p.pingPeriod) - defer ticker.Stop() for { select { From 2601ad89130f58cd71ecf108284010e542f9f2b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Fri, 23 Dec 2022 20:16:11 +0100 Subject: [PATCH 21/21] Update pkg/foundation/grpcutil/websocket.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lovro Mažgon --- pkg/foundation/grpcutil/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/foundation/grpcutil/websocket.go b/pkg/foundation/grpcutil/websocket.go index 7e506bdf6..f2b1e360a 100644 --- a/pkg/foundation/grpcutil/websocket.go +++ b/pkg/foundation/grpcutil/websocket.go @@ -99,7 +99,7 @@ func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // the following way: // // underlying response -// -> inMemoryResponseWrite +// -> inMemoryResponseWriter // -> scanner // -> messages channel // -> connection writer