Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
uses: "ramsey/composer-install@v1"

- name: Start Centrifugo
run: docker run -d -p 8000:8000 centrifugo/centrifugo:latest centrifugo --api_insecure
run: docker run -d -p 8000:8000 centrifugo/centrifugo:latest centrifugo --http_api.insecure

- name: Check container status
run: docker ps
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ All available API methods:
```php
$response = $client->publish($channel, $data);
$response = $client->broadcast($channels, $data);
$response = $client->subscribe($channel, $userId);
$response = $client->unsubscribe($channel, $userId);
$response = $client->disconnect($userId);
$response = $client->presence($channel);
Expand All @@ -72,6 +73,7 @@ $response = $client->history($channel);
$response = $client->historyRemove($channel);
$response = $client->channels();
$response = $client->info();
$response = $client->batch($data);
```

To use `assoc` option while decoding JSON in response:
Expand Down
139 changes: 104 additions & 35 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public function setSecret($secret)
}

/**
* @param bool $caPath
* @param bool $safety
* @return Client
*/
public function setSafety($safety)
Expand Down Expand Up @@ -137,15 +137,36 @@ public function forceIpResolveV4()
* @param string $channel
* @param array $data
* @param boolean $skipHistory (optional)
* @param array $tags (optional) map of string key/value tags attached to the publication
* @param string $idempotencyKey (optional) idempotency key for the publication
* @param boolean $delta (optional) enable delta compression for the publication
* @param int $version (optional) publication version (0 means not set)
* @param string $versionEpoch (optional) publication version epoch
* @return mixed
*/
public function publish($channel, $data, $skipHistory = false)
public function publish($channel, $data, $skipHistory = false, $tags = array(), $idempotencyKey = '', $delta = false, $version = 0, $versionEpoch = '')
{
return $this->send('publish', array(
$params = array(
'channel' => $channel,
'data' => $data,
'skip_history' => $skipHistory,
));
);
if (!empty($tags)) {
$params['tags'] = $tags;
}
if ($idempotencyKey !== '') {
$params['idempotency_key'] = $idempotencyKey;
}
if ($delta) {
$params['delta'] = $delta;
}
if ($version) {
$params['version'] = $version;
}
if ($versionEpoch !== '') {
$params['version_epoch'] = $versionEpoch;
}
return $this->send('publish', $params);
}

/**
Expand All @@ -154,15 +175,36 @@ public function publish($channel, $data, $skipHistory = false)
* @param array $channels
* @param array $data
* @param boolean $skipHistory (optional)
* @param array $tags (optional) map of string key/value tags attached to the publication
* @param string $idempotencyKey (optional) idempotency key for the publication
* @param boolean $delta (optional) enable delta compression for the publication
* @param int $version (optional) publication version (0 means not set)
* @param string $versionEpoch (optional) publication version epoch
* @return mixed
*/
public function broadcast($channels, $data, $skipHistory = false)
public function broadcast($channels, $data, $skipHistory = false, $tags = array(), $idempotencyKey = '', $delta = false, $version = 0, $versionEpoch = '')
{
return $this->send('broadcast', array(
$params = array(
'channels' => $channels,
'data' => $data,
'skip_history' => $skipHistory,
));
);
if (!empty($tags)) {
$params['tags'] = $tags;
}
if ($idempotencyKey !== '') {
$params['idempotency_key'] = $idempotencyKey;
}
if ($delta) {
$params['delta'] = $delta;
}
if ($version) {
$params['version'] = $version;
}
if ($versionEpoch !== '') {
$params['version_epoch'] = $versionEpoch;
}
return $this->send('broadcast', $params);
}

/**
Expand All @@ -171,15 +213,40 @@ public function broadcast($channels, $data, $skipHistory = false)
* @param string $channel
* @param string $user
* @param string $client (optional)
* @param int $expireAt (optional) unix seconds in the future when subscription should expire
* @param array $info (optional) channel info attached to the subscription
* @param array $data (optional) custom data delivered to the client on subscribe
* @param array $recoverSince (optional) StreamPosition: ['offset' => int, 'epoch' => string]
* @param array $override (optional) SubscribeOptionOverride, e.g. ['presence' => ['value' => true]]
* @param string $session (optional) target a specific session of the user
* @return mixed
*/
public function subscribe($channel, $user, $client = '')
public function subscribe($channel, $user, $client = '', $expireAt = 0, $info = array(), $data = array(), $recoverSince = array(), $override = array(), $session = '')
{
return $this->send('subscribe', array(
$params = array(
'channel' => $channel,
'user' => $user,
'client' => $client,
));
);
if ($expireAt) {
$params['expire_at'] = $expireAt;
}
if (!empty($info)) {
$params['info'] = $info;
}
if (!empty($data)) {
$params['data'] = $data;
}
if (!empty($recoverSince)) {
$params['recover_since'] = $recoverSince;
}
if (!empty($override)) {
$params['override'] = $override;
}
if ($session !== '') {
$params['session'] = $session;
}
return $this->send('subscribe', $params);
}

/**
Expand All @@ -188,30 +255,48 @@ public function subscribe($channel, $user, $client = '')
* @param string $channel
* @param string $user
* @param string $client (optional)
* @param string $session (optional) target a specific session of the user
* @return mixed
*/
public function unsubscribe($channel, $user, $client = '')
public function unsubscribe($channel, $user, $client = '', $session = '')
{
return $this->send('unsubscribe', array(
$params = array(
'channel' => $channel,
'user' => $user,
'client' => $client,
));
);
if ($session !== '') {
$params['session'] = $session;
}
return $this->send('unsubscribe', $params);
}

/**
* Disconnect user.
*
* @param string $user
* @param string $client (optional)
* @param array $disconnect (optional) Disconnect object: ['code' => int, 'reason' => string]
* @param array $whitelist (optional) channels to keep connected even when disconnecting the user
* @param string $session (optional) target a specific session of the user
* @return mixed
*/
public function disconnect($user, $client = '')
public function disconnect($user, $client = '', $disconnect = array(), $whitelist = array(), $session = '')
{
return $this->send('disconnect', array(
$params = array(
'user' => $user,
'client' => $client,
));
);
if (!empty($disconnect)) {
$params['disconnect'] = $disconnect;
}
if (!empty($whitelist)) {
$params['whitelist'] = $whitelist;
}
if ($session !== '') {
$params['session'] = $session;
}
return $this->send('disconnect', $params);
}

/**
Expand Down Expand Up @@ -372,25 +457,9 @@ public function generateSubscriptionToken($userId, $channel, $exp = 0, $info = a
return implode('.', $segments);
}

/*
* Function added for backward compatibility with PHP version < 5.5
*/
public function _json_last_error_msg()
{
if (function_exists('json_last_error_msg')) {
return json_last_error_msg();
}
static $ERRORS = array(
JSON_ERROR_NONE => 'No error',
JSON_ERROR_DEPTH => 'Maximum stack depth exceeded',
JSON_ERROR_STATE_MISMATCH => 'State mismatch (invalid or malformed JSON)',
JSON_ERROR_CTRL_CHAR => 'Control character error, possibly incorrectly encoded',
JSON_ERROR_SYNTAX => 'Syntax error',
JSON_ERROR_UTF8 => 'Malformed UTF-8 characters, possibly incorrectly encoded'
);

$error = json_last_error();
return isset($ERRORS[$error]) ? $ERRORS[$error] : 'Unknown error';
return json_last_error_msg();
}

private function send($method, $params = array())
Expand Down Expand Up @@ -429,7 +498,7 @@ private function request($method, $params)
if (!$this->safety) {
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0);
} elseif ($this->safety) {
} else {
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, true);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 2);
if ($this->cert) {
Expand Down Expand Up @@ -465,7 +534,7 @@ private function request($method, $params)

private function getUrl($method)
{
return $this->url.'/'.$method;
return rtrim($this->url, '/') . '/' . $method;
}

private function getHeaders()
Expand Down
Loading