From 6595ae58e14e22b1df9023af53f783fff77460b8 Mon Sep 17 00:00:00 2001 From: FZambia Date: Thu, 7 May 2026 23:15:06 +0300 Subject: [PATCH 1/2] Extend HTTP API methods to match current proto and clean up Client Extend existing methods with optional parameters introduced in Centrifugo's HTTP API since this client was last updated: * publish/broadcast: tags, idempotency_key, delta, version, version_epoch * subscribe: expire_at, info, data, recover_since, override, session * unsubscribe: session * disconnect: disconnect (code/reason), whitelist, session New parameters are positional optional arguments at the end and only included in the JSON payload when set, so existing callers see byte-identical request bodies. Drive-by fixes: * setSafety docblock: @param was incorrectly named caPath * _json_last_error_msg: drop dead PHP < 5.5 fallback (composer requires >= 7.0); public signature preserved for BC * request(): replace pointless elseif (\$this->safety) with else * getUrl(): rtrim(\$this->url, '/') so a trailing slash on the constructor URL doesn't produce a double slash in the request URL * README: API methods list was missing subscribe and batch --- README.md | 2 + src/Client.php | 139 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 106 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 50fc92e..d530e62 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ All available API methods: ```php $response = $client->publish($channel, $data); $response = $client->broadcast($channels, $data); +$response = $client->subscribe($channel, $userId); $response = $client->unsubscribe($channel, $userId); $response = $client->disconnect($userId); $response = $client->presence($channel); @@ -72,6 +73,7 @@ $response = $client->history($channel); $response = $client->historyRemove($channel); $response = $client->channels(); $response = $client->info(); +$response = $client->batch($data); ``` To use `assoc` option while decoding JSON in response: diff --git a/src/Client.php b/src/Client.php index c194f00..fd4d6c8 100644 --- a/src/Client.php +++ b/src/Client.php @@ -61,7 +61,7 @@ public function setSecret($secret) } /** - * @param bool $caPath + * @param bool $safety * @return Client */ public function setSafety($safety) @@ -137,15 +137,36 @@ public function forceIpResolveV4() * @param string $channel * @param array $data * @param boolean $skipHistory (optional) + * @param array $tags (optional) map of string key/value tags attached to the publication + * @param string $idempotencyKey (optional) idempotency key for the publication + * @param boolean $delta (optional) enable delta compression for the publication + * @param int $version (optional) publication version (0 means not set) + * @param string $versionEpoch (optional) publication version epoch * @return mixed */ - public function publish($channel, $data, $skipHistory = false) + public function publish($channel, $data, $skipHistory = false, $tags = array(), $idempotencyKey = '', $delta = false, $version = 0, $versionEpoch = '') { - return $this->send('publish', array( + $params = array( 'channel' => $channel, 'data' => $data, 'skip_history' => $skipHistory, - )); + ); + if (!empty($tags)) { + $params['tags'] = $tags; + } + if ($idempotencyKey !== '') { + $params['idempotency_key'] = $idempotencyKey; + } + if ($delta) { + $params['delta'] = $delta; + } + if ($version) { + $params['version'] = $version; + } + if ($versionEpoch !== '') { + $params['version_epoch'] = $versionEpoch; + } + return $this->send('publish', $params); } /** @@ -154,15 +175,36 @@ public function publish($channel, $data, $skipHistory = false) * @param array $channels * @param array $data * @param boolean $skipHistory (optional) + * @param array $tags (optional) map of string key/value tags attached to the publication + * @param string $idempotencyKey (optional) idempotency key for the publication + * @param boolean $delta (optional) enable delta compression for the publication + * @param int $version (optional) publication version (0 means not set) + * @param string $versionEpoch (optional) publication version epoch * @return mixed */ - public function broadcast($channels, $data, $skipHistory = false) + public function broadcast($channels, $data, $skipHistory = false, $tags = array(), $idempotencyKey = '', $delta = false, $version = 0, $versionEpoch = '') { - return $this->send('broadcast', array( + $params = array( 'channels' => $channels, 'data' => $data, 'skip_history' => $skipHistory, - )); + ); + if (!empty($tags)) { + $params['tags'] = $tags; + } + if ($idempotencyKey !== '') { + $params['idempotency_key'] = $idempotencyKey; + } + if ($delta) { + $params['delta'] = $delta; + } + if ($version) { + $params['version'] = $version; + } + if ($versionEpoch !== '') { + $params['version_epoch'] = $versionEpoch; + } + return $this->send('broadcast', $params); } /** @@ -171,15 +213,40 @@ public function broadcast($channels, $data, $skipHistory = false) * @param string $channel * @param string $user * @param string $client (optional) + * @param int $expireAt (optional) unix seconds in the future when subscription should expire + * @param array $info (optional) channel info attached to the subscription + * @param array $data (optional) custom data delivered to the client on subscribe + * @param array $recoverSince (optional) StreamPosition: ['offset' => int, 'epoch' => string] + * @param array $override (optional) SubscribeOptionOverride, e.g. ['presence' => ['value' => true]] + * @param string $session (optional) target a specific session of the user * @return mixed */ - public function subscribe($channel, $user, $client = '') + public function subscribe($channel, $user, $client = '', $expireAt = 0, $info = array(), $data = array(), $recoverSince = array(), $override = array(), $session = '') { - return $this->send('subscribe', array( + $params = array( 'channel' => $channel, 'user' => $user, 'client' => $client, - )); + ); + if ($expireAt) { + $params['expire_at'] = $expireAt; + } + if (!empty($info)) { + $params['info'] = $info; + } + if (!empty($data)) { + $params['data'] = $data; + } + if (!empty($recoverSince)) { + $params['recover_since'] = $recoverSince; + } + if (!empty($override)) { + $params['override'] = $override; + } + if ($session !== '') { + $params['session'] = $session; + } + return $this->send('subscribe', $params); } /** @@ -188,15 +255,20 @@ public function subscribe($channel, $user, $client = '') * @param string $channel * @param string $user * @param string $client (optional) + * @param string $session (optional) target a specific session of the user * @return mixed */ - public function unsubscribe($channel, $user, $client = '') + public function unsubscribe($channel, $user, $client = '', $session = '') { - return $this->send('unsubscribe', array( + $params = array( 'channel' => $channel, 'user' => $user, 'client' => $client, - )); + ); + if ($session !== '') { + $params['session'] = $session; + } + return $this->send('unsubscribe', $params); } /** @@ -204,14 +276,27 @@ public function unsubscribe($channel, $user, $client = '') * * @param string $user * @param string $client (optional) + * @param array $disconnect (optional) Disconnect object: ['code' => int, 'reason' => string] + * @param array $whitelist (optional) channels to keep connected even when disconnecting the user + * @param string $session (optional) target a specific session of the user * @return mixed */ - public function disconnect($user, $client = '') + public function disconnect($user, $client = '', $disconnect = array(), $whitelist = array(), $session = '') { - return $this->send('disconnect', array( + $params = array( 'user' => $user, 'client' => $client, - )); + ); + if (!empty($disconnect)) { + $params['disconnect'] = $disconnect; + } + if (!empty($whitelist)) { + $params['whitelist'] = $whitelist; + } + if ($session !== '') { + $params['session'] = $session; + } + return $this->send('disconnect', $params); } /** @@ -372,25 +457,9 @@ public function generateSubscriptionToken($userId, $channel, $exp = 0, $info = a return implode('.', $segments); } - /* - * Function added for backward compatibility with PHP version < 5.5 - */ public function _json_last_error_msg() { - if (function_exists('json_last_error_msg')) { - return json_last_error_msg(); - } - static $ERRORS = array( - JSON_ERROR_NONE => 'No error', - JSON_ERROR_DEPTH => 'Maximum stack depth exceeded', - JSON_ERROR_STATE_MISMATCH => 'State mismatch (invalid or malformed JSON)', - JSON_ERROR_CTRL_CHAR => 'Control character error, possibly incorrectly encoded', - JSON_ERROR_SYNTAX => 'Syntax error', - JSON_ERROR_UTF8 => 'Malformed UTF-8 characters, possibly incorrectly encoded' - ); - - $error = json_last_error(); - return isset($ERRORS[$error]) ? $ERRORS[$error] : 'Unknown error'; + return json_last_error_msg(); } private function send($method, $params = array()) @@ -429,7 +498,7 @@ private function request($method, $params) if (!$this->safety) { curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0); - } elseif ($this->safety) { + } else { curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, true); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 2); if ($this->cert) { @@ -465,7 +534,7 @@ private function request($method, $params) private function getUrl($method) { - return $this->url.'/'.$method; + return rtrim($this->url, '/') . '/' . $method; } private function getHeaders() From 7c73f5a6ea149c383b6dceaa9b6bf6ac7afc3e1e Mon Sep 17 00:00:00 2001 From: FZambia Date: Thu, 7 May 2026 23:21:37 +0300 Subject: [PATCH 2/2] ci: rename --api_insecure to --http_api.insecure The flag was renamed in Centrifugo and centrifugo/centrifugo:latest now exits immediately with "unknown flag: --api_insecure", which made the "Wait for server to be ready" step hang indefinitely. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52c59c6..2b64dfc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: uses: "ramsey/composer-install@v1" - name: Start Centrifugo - run: docker run -d -p 8000:8000 centrifugo/centrifugo:latest centrifugo --api_insecure + run: docker run -d -p 8000:8000 centrifugo/centrifugo:latest centrifugo --http_api.insecure - name: Check container status run: docker ps