Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 191 additions & 37 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ package sip

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/netip"
"strconv"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"
"golang.org/x/exp/maps"

esip "github.com/emiago/sipgo/sip"
Comment thread
dennwc marked this conversation as resolved.

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
Expand Down Expand Up @@ -175,32 +181,179 @@ func (c *Client) getActiveCall(tag LocalTag) *outboundCall {
return c.activeCalls[tag]
}

func setUriTransport(p *sip.Uri, tr livekit.SIPTransport) {
if tr != livekit.SIPTransport_SIP_TRANSPORT_AUTO {
p.UriParams.Add("transport", tr.Name())
}
}

func buildLegacyURI(user, addr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if user == "" {
return nil, fmt.Errorf("number must be set")
} else if strings.Contains(user, "@") {
return nil, fmt.Errorf("should be a phone number or SIP user, not a full SIP URI")
}
if addr == "" {
return nil, fmt.Errorf("address must be set")
}
if strings.HasPrefix(addr, "sip:") || strings.HasPrefix(addr, "sips:") {
return nil, fmt.Errorf("address must be a hostname without 'sip:' prefix")
} else if strings.Contains(addr, "transport=") {
return nil, fmt.Errorf("legacy address must not contain parameters; use transport field")
} else if strings.ContainsAny(addr, ";=") {
return nil, fmt.Errorf("legacy address must not contain parameters")
}
p := &sip.Uri{Scheme: "sip"}
setUriTransport(p, tr)

p.User = user
if host, sport, err := net.SplitHostPort(addr); err == nil && sport != "" {
p.Host = host
p.Port, err = strconv.Atoi(sport)
if err != nil {
return nil, fmt.Errorf("invalid port in hostname: %q", sport)
}
} else {
p.Host = addr
}
return p, nil
}

func buildRawURI(raw string, tr livekit.SIPTransport) (*sip.Uri, error) {
p := &sip.Uri{Scheme: "sip"}
if n := len(raw); n != 0 && raw[0] == '<' && raw[n-1] == '>' {
raw = raw[1 : n-1]
}
if err := esip.ParseUri(raw, p); err != nil {
Comment thread
dennwc marked this conversation as resolved.
return nil, errors.New("invalid request URI")
}
setUriTransport(p, tr)
return p, nil
}

func buildValuesURI(u *livekit.SIPUri, tr livekit.SIPTransport) (*sip.Uri, error) {
if tr != u.Transport {
if u.Transport == livekit.SIPTransport_SIP_TRANSPORT_AUTO {
//tr = tr
} else if tr == livekit.SIPTransport_SIP_TRANSPORT_AUTO {
tr = u.Transport
} else {
return nil, fmt.Errorf("different transports specified: %v vs %v", tr, u.Transport)
}
}
p := &sip.Uri{Scheme: "sip"}
setUriTransport(p, tr)
if u.User == "" {
return nil, fmt.Errorf("username or number must be set")
}
if u.Host == "" && u.Ip == "" {
return nil, fmt.Errorf("host or ip must be set")
}
p.User = u.User
p.Host = u.Host
if p.Host == "" {
p.Host = u.Ip
}
if _, sport, err := net.SplitHostPort(p.Host); err == nil && sport != "" {
return nil, fmt.Errorf("host or ip must not contain port")
}
p.Port = int(u.Port)
return p, nil
}

func buildRequestURI(u *livekit.SIPRequestDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if u == nil {
return buildLegacyURI(legacyUser, legacyAddr, tr)
}
switch u := u.Uri.(type) {
default:
case *livekit.SIPRequestDest_Raw:
return buildRawURI(u.Raw, tr)
case *livekit.SIPRequestDest_Values:
return buildValuesURI(u.Values, tr)
}
return nil, fmt.Errorf("invalid request URI type")
}

func buildFromToURI(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if u == nil {
return buildLegacyURI(legacyUser, legacyAddr, tr)
}
switch u := u.Uri.(type) {
default:
case *livekit.SIPNamedDest_Raw:
return buildRawURI(u.Raw, tr)
case *livekit.SIPNamedDest_Values:
return buildValuesURI(u.Values, tr)
}
return nil, fmt.Errorf("invalid URI type")
}

func buildFromHeader(u *livekit.SIPNamedDest, legacyName *string, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.FromHeader, error) {
su, err := buildFromToURI(u, legacyUser, legacyAddr, tr)
if err != nil {
return nil, err
}
h := &sip.FromHeader{
Address: *su,
}
if u != nil {
h.DisplayName = u.DisplayName
} else if legacyName != nil {
h.DisplayName = *legacyName
} else {
// Nothing specified, preserve legacy behavior
h.DisplayName = su.User
}
return h, nil
}

func buildToHeader(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.ToHeader, error) {
if u != nil && legacyUser != "" {
return nil, errors.New("cannot use both CallTo and SipToHeader")
}
su, err := buildFromToURI(u, legacyUser, legacyAddr, tr)
if err != nil {
return nil, err
}
h := &sip.ToHeader{
Address: *su,
}
if u != nil {
h.DisplayName = u.DisplayName
}
return h, nil
}

func buildOutboundHeaders(req *rpc.InternalCreateSIPParticipantRequest, defaultHost string) (*sip.Uri, *sip.FromHeader, *sip.ToHeader, error) {
uri, err := buildRequestURI(req.SipRequestUri, req.CallTo, req.Address, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid request URI: %w", err))
}
to, err := buildToHeader(req.SipToHeader, req.CallTo, req.Address, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid To header: %w", err))
}
fromHost := req.Hostname
if fromHost == "" {
fromHost = defaultHost
}
from, err := buildFromHeader(req.SipFromHeader, req.DisplayName, req.Number, fromHost, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid From header: %w", err))
}
return uri, from, to, nil
}

func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (resp *rpc.InternalCreateSIPParticipantResponse, retErr error) {
if c.mon.Health() != stats.HealthOK {
return nil, siperrors.ErrUnavailable
}
req.Upgrade()
if req.CallTo == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call-to number must be set")
} else if req.Address == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk adresss must be set")
} else if req.Number == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk outbound number must be set")
} else if req.RoomName == "" {
if req.RoomName == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "room name must be set")
}
if strings.Contains(req.CallTo, "@") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call_to should be a phone number or SIP user, not a full SIP URI")
}
if strings.HasPrefix(req.Address, "sip:") || strings.HasPrefix(req.Address, "sips:") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must be a hostname without 'sip:' prefix")
}
if strings.Contains(req.Address, "transport=") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters; use transport field")
}
if strings.ContainsAny(req.Address, ";=") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters")
}
defaultHost := c.ContactURI(TransportFrom(req.Transport)).GetHost()
log := c.log
if req.ProjectId != "" {
log = log.WithValues("projectID", req.ProjectId)
Expand All @@ -212,22 +365,28 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
if err != nil {
return nil, err
}
uri, from, to, err := buildOutboundHeaders(req, defaultHost)
if err != nil {
return nil, err
}
tid := traceid.FromGUID(req.SipCallId)
log = log.WithValues(
"callID", req.SipCallId,
"traceID", tid.String(),
"room", req.RoomName,
"participant", req.ParticipantIdentity,
"participantName", req.ParticipantName,
"fromHost", req.Hostname,
"fromUser", req.Number,
"toHost", req.Address,
"toUser", req.CallTo,
"fromHost", from.Address.Host,
"fromUser", from.Address.User,
"toHost", to.Address.Host,
"toUser", to.Address.User,
"reqHost", uri.Host,
"reqUser", uri.User,
"direction", "outbound",
)

req.ParticipantAttributes = maps.Clone(req.ParticipantAttributes) // shallow clone - string/string map. Needed to avoid mutating psrpc req
initial := c.createSIPCallInfo(req)
initial := c.createSIPCallInfo(uri, from, to, req)
state := NewCallState(c.getStateHandler(req.ProjectId, req.Observability, initial), initial)

defer func() {
Expand Down Expand Up @@ -255,11 +414,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
},
}
sipConf := sipOutboundConfig{
address: req.Address,
transport: req.Transport,
host: req.Hostname,
from: req.Number,
to: req.CallTo,
uri: uri,
from: from,
to: to,
user: req.Username,
pass: req.Password,
dtmf: req.Dtmf,
Expand All @@ -273,7 +431,6 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
enabledFeatures: req.EnabledFeatures,
featureFlags: req.FeatureFlags,
mediaConfig: mconf,
displayName: req.DisplayName,
}
log.Infow("Creating SIP participant")
call, err := c.newCall(ctx, tid, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
Expand All @@ -299,13 +456,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
return info, nil
}

func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo {
toUri := CreateURIFromUserAndAddress(req.CallTo, req.Address, TransportFrom(req.Transport))
fromiUri := URI{
User: req.Number,
Host: req.Hostname,
Addr: netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)),
}
func (c *Client) createSIPCallInfo(uri *sip.Uri, from *sip.FromHeader, to *sip.ToHeader, req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo {
toUri := ConvertURI(&to.Address)
fromUri := ConvertURI(&from.Address)
fromUri.Addr = netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort))

callInfo := &livekit.SIPCallInfo{
CallId: req.SipCallId,
Expand All @@ -316,7 +470,7 @@ func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest)
ParticipantAttributes: req.ParticipantAttributes,
CallDirection: livekit.SIPCallDirection_SCD_OUTBOUND,
ToUri: toUri.ToSIPUri(),
FromUri: fromiUri.ToSIPUri(),
FromUri: fromUri.ToSIPUri(),
CreatedAtNs: time.Now().UnixNano(),
MediaEncryption: req.MediaEncryption.String(),
EnabledFeatures: req.EnabledFeatures,
Expand Down
Loading
Loading