Websocket в продакшені

10 місяців тому я почав робити браузерну іграшку. Вибір припав на cocos js в якості графіки і websocket як спілкування з сервером. Технологія дуже сподобалася і я на ній організував все спілкування гри з сервером. Використовував для цього цю статтю. Але, на жаль, той код, який наведений в тій статті, не можна використовувати в продакшені. Як з'ясувалося, рівень проблеми навіть не критичний, а блокуючий. Все настільки погано, що мені довелося переписувати все спілкування з сервером з вебсокетов на longpooling. У підсумку я залишив варіант «якщо у нас браузер не сафарі, то використовувати websocket, інакше longpolling» і ще трохи розгалуження на цю тему.

Так що досвід використання вебсокет в продакшені накопичився пристойний. І ось нещодавно сталася подія, яка змусила мене написати першу статтю на Хабре.

Після того, як іграшка була опублікована в соціальній мережі, я поправив всі знайдені критичні/блокуючі баги і почав приводити все в порядок в спокійному режимі. Я хочу звернути увагу на те, що ось приклад — це взагалі єдиний в інтернеті гайд, який містить серверний код, який можна вставити собі в код і використовувати його. Ну от набрати в пошуковику «php websocket server» — спробуйте знайти що-небудь, що можна собі поставити.

Раптово я перечитую зазначену вище статті і на самому початку виявляю посилання на «phpdaemon» і «ratchet». Ну, думаю, давай в спокійному режимі подивлюся на код тамтешній. У PhpDeamon в надрах обробки WebSocket з'єднання невелике, але дуже важливе розгалуження на протоколи WebSocket. І там прямо написано для одного case «Safari5 and many non-browser clients». Сказати, що я офігів — це нічого не сказати. Перед очима промайнуло кілька сотень годин, тонни нервування і страждання, які поставили під питання взагалі проект. Я не повірив, вирішив перевірити.

Протягом ~15 годин я витягнув з PhpDeamon мінімальний код, пов'язаний з WebSocket (який працює у всіх браузерах останньої версії, а сам серверний код може працювати під навантаженням) і його постараюся опублікувати з поясненнями. Щоб інші люди не відчули ті муки, через що мені довелося пройти. Так, шматок коду вийшов не маленький, але вибачте: WebSocket він на клієнтської частини дуже простий, а на стороні сервера все досить об'ємно (скажімо окреме спасибі розробникам Сафарі). Також у зв'язку з тим, що область застосування WebSocket — це в першу чергу ігри, важливе питання неблокірующіх використання серверного сокета — це бонусна складність, яка ніяк тут не розглядається, хоча і дуже важлива.

Тестове додаток я хотів написати без об'єктів, щоб було зрозуміліше. Але, на жаль, такий підхід у даному прикладі розплодить багато повторюваного коду, тому довелося додати 1 клас і 3 його спадкоємця. Решта все без об'єктів.

Для початку клиентсная частина
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
< meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>WebSocket test page</title>
</head>
<body onload="create();">
<script type="text/javascript">
create function() {
// Example
ws = new WebSocket('ws://'+document.domain+':8081/');
ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';}
ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';}
ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';}
}
</script>
<button onclick="create();">Create WebSocket</button>
<button onclick="ws.send('ping');">Send ping</button>
<button onclick="ws.close();">Close WebSocket</button>
<div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div>
</body>
</html>


У моїй грі мені довелося використовувати 3 сокет серверу. Для websocket, для worker'ів і для longpooling. У грі дуже багато математики, тому треба було робити веркеры і видавати їм завдання на обчислення. Так ось до чого це. Що stream_select для них всіх має бути загальний, інакше будуть лаги чи шалене використання процесора. Це знання теж було отримано в обмін купи витрачених нервів.

Основний цикл сервісу
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
stream_set_blocking($master, false); // Щодо цієї команди я не впевнений, тому що майстер з сокетів читає тільки нові сполуки, і для читання використовується "stream_socket_accept". Варіант, що весь сервіс буде підвішений на кілька секунд з-за того, що клієнт не поспішає з'єднаються - категорично неприйнятно.
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
// Зробити вихід з циклу, а не "die", тому що в продакшине скоріше всього цей код буде виконуватися як сервіс і при команді "./etc/init.d/game restart" тут 100% буде цей case, так от треба дати "pcntl" код нормально відпрацювати і не заважати йому.
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Нове з'єднання
continue;
}
// Тут буде обробка повідомлень клієнтів
}
}


З'єднання з новими клієнтами цілком собі стандартний код, але ось з-за того, що сокети у нас в неблокирующем режимі, потрібно написати купу коду, який по шматочках збере всі вхідні дані і, коли даних буде достатньо, обробить їх, зрозуміє який протокол треба використовувати і переключиться на використання цього протоколу. Одна ця задача — вже гора коду, і в PhpDeamon нагородили багато коду, який до WebSocket відношення не має (вони ж там 8 різних серверів вміють піднімати). Вдалося багато відрізати і спростити в цій темі. Залишив тільки те, що відноситься до WebSocket.

Файл урізаний <ws.php>
class ws {
const MAX_BUFFER_SIZE = 1024 * 1024;

protected $socket;

/**
* @var array _SERVER
*/
public $server = [];

protected $headers = [];

protected $closed = false;
protected $unparsed_data = ";
private $current_header;
private $unread_lines = array();

protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)

/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;

/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;

/**
* State: first line
*/
const STATE_FIRSTLINE = 1;

/**
* State: headers
*/
const STATE_HEADERS = 2;

/**
* State: content
*/
const STATE_CONTENT = 3;

/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;

/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;

public function get_state() {
return $this->state;
}

public function closed() {
return $this->closed;
}

protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket) {
stream_set_blocking($socket, false);
$this->socket = $socket;
}

private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data(){
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}

if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ");
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = null;
$srv['REMOTE_PORT'] = null;
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === ") {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, ", $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
// ----------------------------------------------------------
// Discovery Protocol, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of discovery protocol
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string s, String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false)
{
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s $m)
) {
$s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $c, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes buffer to that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}
}


Сенс цього класу в такому урізаному вигляді — в конструкторі встановити неблокуючий режим для з'єднання з клієнтом. Далі в основному циклі, кожен раз, коли приходять дані — відразу їх прочитати і покласти (доповнити) «unparsed_data» змінну (це метод on_receive_data). Важливо розуміти, що якщо ми вийдемо за розміри MAX_BUFFER_SIZE, то взагалі нічого страшного не трапиться. Можна в підсумковому прикладі, що тут буде, поставити його значення, скажімо, «5» і переконається, що все ще працює. Просто дані з буфера на першому кроці будуть проігноровані, адже вони будуть неповні, і з другого або п'ятого чи сотого заходу наберуться, нарешті, всі прийняті дані будуть оброблені. При цьому stream_select в основному циклі чекати не буде навіть мікросекунди, поки дані не будуть вилучені. Константу треба підібрати таку, щоб 95% очікуваних даних читалися
Далі в основному циклі (після отримання чергової порції даних) ми пробуємо накопичені дані обробити (це метод on_read). У класі «ws» метод «on_read» складається по суті з трьох кроків: «читаємо перший рядок і готуємо змінні оточення», «читаємо всі заголовки», «обробляємо всі заголовки». Перші 2 пояснювати не треба, але написані вони досить об'ємно тому, що ми в неблокирующем режимі і треба бути готовим до того, що дані обірвані в будь-якому місці. Обробка заголовків спочатку перевіряє формат запиту правильний чи ні, а потім по заголовкам визначає протокол, за яким буде спілкуватися з клієнтом. В підсумку повинні смикнути метод switch_to_protocol. Цей метод всередині себе сформує екземпляр класу «ws_<протокол>» і підготує його для віддачі в основний цикл.

В основному циклі далі треба власне перевірити: а чи не треба підмінити об'єкт (якщо хтось може запропонувати реалізацію цього місця краще — завжди будь ласка).

Далі в основному циклі треба поставити перевірку: а не закритий сокет. Якщо закрите, то очистити пам'ять (про це далеке в наступному блоці).

Тепер повна версія файлу <deamon.php>
require('ws.php');
require('ws_v0.php');
require('ws_v13.php');
require('ws_ve.php');
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
/**
* @var ws[] $connections
*/
$connections = array();
stream_set_blocking($master, false);
/**
* @param ws $connection
* @param $data
* @param $type
*/
$my_callback = function($connection, $data, $type) {
var_dump('my ws data: ['.$data.'/'.$type.']');
$connection->send_frame('test '.time());
};
while (true) {
$read = $sockets;
$write = $except = array();
if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
var_dump('stream_select error');
break;
}
foreach ($read as $socket) {
$index_socket = array_search($socket, $sockets);
if ($index_socket == 0) {
// Нове з'єднання
if ($socket_new = stream_socket_accept($master, -1)) {
$connection = new ws($socket_new, $my_callback);
$sockets[] = $socket_new;
$index_new_socket = array_search($socket_new, $sockets);
$connections[$index_new_socket] = &$connection;
$index_socket = $index_new_socket;
} else {
// Я так і не зрозумів що в цьому випадку треба робити
error_log('stream_socket_accept');
var_dump('error stream_socket_accept');
continue;
}
}
$connection = &$connections[$index_socket];
$connection->on_receive_data();
$connection->on_read();
if ($connection->get_state() == ws::STATE_PREHANDSHAKE) {
$connection = $connection->get_new_instance();
$connections[$index_socket] = &$connection;
$connection->on_read();
}
if ($connection->closed()) {
unset($sockets[$index_socket]);
unset($connections[$index_socket]);
unset($connection);
var_dump('close '.$index_socket);
}
}
}


Тут доданий "$my_callback" — це наш custom обробник повідомлень від клієнта. Зрозуміло в продакшине можна загорнути це все в об'єкти всякі, а тут щоб було зрозуміліше просто змінна-функція. Про неї трохи пізніше докладніше.

Реалізована обробка нового з'єднання і реалізовано основне тіло циклу, про який я трохи вище писав.

Я хочу звернути увагу на код сервера тут. Якщо прочитані дані з сокета — це порожній рядок (так, звичайно я бачив там update перевірку на порожній рядок), то сокет треба закрити. Ох, я навіть не знаю, скільки цей момет попив мені кровиночки і скількох користувачів я втратив. Внезапнейшим чином Сафарі відправляє пустий рядок і вважає це нормою, а цей код бере і закриває з'єднання користувачеві. Яндекс-браузер іноді веде себе так само. Вже не знаю чому, але в цьому випадку для Сафарі WebSocket залишається завислим, тобто він не закривається, не відкривається — просто висить і все. Ви вже помітили, що я небайдужий до цього чарівного браузеру? Мені пригадується, як я верстав під IE6 — приблизно такі ж відчуття.

Тепер про те, навіщо я використовую array_search і синхронизирую масив $sockets і масив $connections. Справа в тому, що stream_select життєво необхідний чистий масив $sockets і ніяк інакше. Але якось треба пов'язати конкретний сокет з масиву $sockets з об'єктом «ws». Перепробував купу варіантів — в підсумку зупинився на такому варіанті, що є 2 масиву, які постійно синхронізовані по ключам. В одному масиві необхідні чисті сокети для stream_select, а в другому екземпляри класу «ws» або його спадкоємці. Якщо хтось може запропонувати це місце краще — пропонуйте.

Ще окремо треба відзначити випадок, коли stream_socket_accept зафэйлился. Я так розумію, теоретично це може бути тільки в тому випадку, якщо майстер сокет у нас в неблокирующем режимі, і приїхало недостатньо даних для з'єднання клієнта. Тому просто нічого не робимо.

Повна версія файлу <ws.php>
class ws {
private static $hvaltr = ['; ' => '&', ';' => '&', '' => '%20'];

const maxAllowedPacket = 1024 * 1024 * 1024;
const MAX_BUFFER_SIZE = 1024 * 1024;

protected $socket;

/**
* @var array _SERVER
*/
public $server = [];

protected $on_frame_user = null;

protected $handshaked = false;

protected $headers = [];
protected $headers_sent = false;

protected $closed = false;
protected $unparsed_data = ";
private $current_header;
private $unread_lines = array();
/**
* @var ws|null
*/
private $new_instance = null;

protected $extensions = [];
protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

/**
* @var integer Current state
*/
protected $state = 0; // stream state of the connection (application protocol level)

/**
* Alias of STATE_STANDBY
*/
const STATE_ROOT = 0;

/**
* Standby state (default state)
*/
const STATE_STANDBY = 0;

/**
* State: first line
*/
const STATE_FIRSTLINE = 1;

/**
* State: headers
*/
const STATE_HEADERS = 2;

/**
* State: content
*/
const STATE_CONTENT = 3;

/**
* State: prehandshake
*/
const STATE_PREHANDSHAKE = 5;

/**
* State: handshaked
*/
const STATE_HANDSHAKED = 6;

public function get_state() {
return $this->state;
}

public function get_new_instance() {
return $this->new_instance;
}

public function closed() {
return $this->closed;
}

protected function close() {
if ($this->closed) return;
var_dump('self close');
fclose($this->socket);
$this->closed = true;
}
public function __construct($socket, $on_frame_user = null) {
stream_set_blocking($socket, false);
$this->socket = $socket;
$this->on_frame_user = $on_frame_user;
}

private function read_line() {
$lines = explode(PHP_EOL, $this->unparsed_data);
$last_line = $lines[count($lines)-1];
unset($lines[count($lines) - 1]);
foreach ($lines as $line) {
$this->unread_lines[] = $line;
}
$this->unparsed_data = $last_line;
if (count($this->unread_lines) != 0) {
return array_shift($this->unread_lines);
} else {
return null;
}
}
public function on_receive_data() {
if ($this->closed) return;
$data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE);
if (is_string($data)) {
$this->unparsed_data .= $data;
}
}
/**
* Called when new data received.
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_STANDBY) {
$this->state = self::STATE_FIRSTLINE;
}
if ($this->state === self::STATE_FIRSTLINE) {
if (!$this->http_read_first_line()) {
return;
}
$this->state = self::STATE_HEADERS;
}

if ($this->state === self::STATE_HEADERS) {
if (!$this->http_read_headers()) {
return;
}
if (!$this->http_process_headers()) {
$this->close();
return;
}
$this->state = self::STATE_CONTENT;
}
if ($this->state === self::STATE_CONTENT) {
$this->state = self::STATE_PREHANDSHAKE;
}
}
/**
* Read first line of HTTP request
* @return boolean|null Success
*/
protected function http_read_first_line() {
if (($l = $this->read_line()) === null) {
return null;
}
$e = explode(' ', $l);
$u = isset($e[1]) ? parse_url($e[1]) : false;
if ($u === false) {
$this->bad_request();
return false;
}
if (!isset($u['path'])) {
$u['path'] = null;
}
if (isset($u['host'])) {
$this->server['HTTP_HOST'] = $u['host'];
}
$address = explode(':', stream_socket_get_name($this->socket, true)); //отримуємо адресу клієнта
$srv = & $this->server;
$srv['REQUEST_METHOD'] = $e[0];
$srv['REQUEST_TIME'] = time();
$srv['REQUEST_TIME_FLOAT'] = microtime(true);
$srv['REQUEST_URI'] = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : ");
$srv['DOCUMENT_URI'] = $u['path'];
$srv['PHP_SELF'] = $u['path'];
$srv['QUERY_STRING'] = isset($u['query']) ? $u['query'] : null;
$srv['SCRIPT_NAME'] = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
$srv['SERVER_PROTOCOL'] = isset($e[2]) ? $e[2] : 'HTTP/1.1';
$srv['REMOTE_ADDR'] = $address[0];
$srv['REMOTE_PORT'] = $address[1];
return true;
}
/**
* Read headers line-by-line
* @return boolean|null Success
*/
protected function http_read_headers() {
while (($l = $this->read_line()) !== null) {
if ($l === ") {
return true;
}
$e = explode(': ', $l);
if (isset($e[1])) {
$this->current_header = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
$this->server[$this->current_header] = $e[1];
} elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
// multiline header continued
$this->server[$this->current_header] .= $e[0];
} else {
// whatever client speaks is not HTTP anymore
$this->bad_request();
return false;
}
}
}
/**
* Process headers
* @return bool
*/
protected function http_process_headers() {
$this->state = self::STATE_PREHANDSHAKE;
if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
$str = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
$str = preg_replace($this->extensionsCleanRegex, ", $str);
$this->extensions = explode(', ', $str);
}
if (!isset($this->server['HTTP_CONNECTION'])
|| (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
|| !isset($this->server['HTTP_UPGRADE'])
|| (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
) {
$this->close();
return false;
}
/*
if (isset($this->server['HTTP_COOKIE'])) {
self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
}
if (isset($this->server['QUERY_STRING'])) {
self::parse_str($this->server['QUERY_STRING'], $this->get);
}
*/
// ----------------------------------------------------------
// Discovery Protocol, based on HTTP headers...
// ----------------------------------------------------------
if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
$this->switch_to_protocol('v13');
} elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
$this->switch_to_protocol('v13');
} else {
error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
$this->close();
return false;
}
} elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
$this->switch_to_protocol('ve');
} else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
$this->switch_to_protocol('v0');
}
// ----------------------------------------------------------
// End of discovery protocol
// ----------------------------------------------------------
return true;
}
private function switch_to_protocol($protocol) {
$class = 'ws_'.$protocol;
$this->new_instance = new $class($this->socket);
$this->new_instance->state = $this->state;
$this->new_instance->unparsed_data = $this->unparsed_data;
$this->new_instance->server = $this->server;
$this->new_instance->on_frame_user = $this->on_frame_user;
}
/**
* Send Bad request
* @return void
*/
public function bad_request() {
$this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
$this->close();
}
/**
* Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
* @param string s, String to parse
* @param array &$var Reference to the resulting array
* @param boolean $header Header-style string
* @return void
*/
public static function parse_str($s, &$var, $header = false) {
static $cb;
if ($cb === null) {
$cb = function ($m) {
return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
};
}
if ($header) {
$s = strtr($s, self::$hvaltr);
}
if (
(stripos($s, '%u') !== false)
&& preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s $m)
) {
$s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $c, $s);
}
parse_str($s, $var);
}
/**
* Send data to the connection. Note that it just writes buffer to that flushes at every baseloop
* @param string $data Data to send
* @return boolean Success
*/
public function write($data) {
if ($this->closed) return false;
return stream_socket_sendto($this->socket, $data) == 0;
}

/**
* Будьте люб'язні в отнаследованном класі реалізувати цей метод
* @return bool
*/
protected function send_handshake_reply() {
return false;
}
/**
* Called when we're going to handshake.
* @return boolean Handshake status
*/
public function handshake() {
$extra_headers = ";
foreach ($this- > headers as $k => $line) {
if ($k !== 'STATUS') {
$extra_headers .= $line . "\r\n";
}
}

if (!$this->send_handshake_reply($extra_headers)) {
error_log(get_class($this) . '::' . __METHOD__ . ': Handshake protocol failure for client ""'); // $this->addr
$this->close();
return false;
}

$this->handshaked = true;
$this->headers_sent = true;
$this->state = static::STATE_HANDSHAKED;
return true;
}
/**
* Read from buffer without draining
* @param integer $n Number of bytes to read
* @param integer $o Offset
* @return string|false
*/
public function look($n, $o = 0) {
if (strlen($this->unparsed_data) <= $o) {
return ";
}
return substr($this->unparsed_data, $o $n);
}
/**
* Convert bytes into integer
* @param string $str Bytes
* @param boolean $l Little endian? Default is false
* @return integer
*/
public static function bytes2int($str, $l = false) {
if ($l) {
$str = strrev($str);
}
$dec = 0;
$len = strlen($str);
for ($i = 0; $i < $len; ++$i) {
$dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1);
}
return $dec;
}
/**
* Drains buffer
* @param integer $n Numbers of bytes to drain
* @return boolean Success
*/
public function drain($n) {
$ret = substr($this->unparsed_data, 0, $n);
$this->unparsed_data = substr($this->unparsed_data, $n);
return $ret;
}
/**
* Read data from the connection's buffer
* @param integer $n Max. number of bytes to read
* @return string|false Readed data
*/
public function read($n) {
if ($n <= 0) {
return ";
}
$read = $this->drain($n);
if ($read === ") {
return false;
}
return $read;
}
/**
* Reads all data from the connection's buffer
* @return string Readed data
*/
public function read_unlimited() {
$ret = $this->unparsed_data;
$this->unparsed_data = ";
return $ret;
}
/**
* Searches first occurence of the string in input buffer
* @param string $what Needle
* @param integer $start Offset start
* @param integer $end Offset end
* @return integer Position
*/
public function search($what, $start = 0, $end = -1) {
return strpos($this->unparsed_data, $what, $start);
}
/**
* Called when new frame received.
* @param string $data Frame's data.
* @param string $type Frame's type ("STRING" АБО "BINARY").
* @return boolean Success.
*/
public function on_frame($data, $type) {
if (is_callable($this->on_frame_user)) {
call_user_func($this->on_frame_user, $this, $data, $type);
}
return true;
}
public function send_frame($data, $type = null, $cb = null) {
return false;
}
/**
* Get real frame type identificator
* @param $type
* @return integer
*/
public function get_frame_type($type) {
if (is_int($type)) {
return $type;
}
if ($type === null) {
$type = 'STRING';
}
$frametype = @constant(get_class($this) . '::' . $type);
if ($frametype === null){
error_log(__METHOD__ . ': Undefined frametype "' . $type . '"');
}
return $frametype;
}
}


По суті тут додані 3 речі: «з'єднання з клієнтом на рівні веб сокета», «отримання повідомлення від клієнта», «відправка повідомлення клієнту».

Для початку трохи теорії та термінології. «Handshake» — це з точки зору веб сокетів процедура встановлення з'єднання поверх http. Адже треба вирішити купу питань: як пробитися крізь гущу проксі і кэшэй, як захиститися від злих хакерів. І термін «frame» — це шматок даних в розшифрованому вигляді, це повідомлення від клієнта або повідомлення для клієнта. Можливо, про це варто було написати на початку статті, але із-за цих «frame» робити сокет сервер у блокувальному режимі сокетів імхо безглуздо. Те, як зроблено цей момент ось тут — це позбавило мене сну не на одну ніч. В тій статті не розглядається варіант, що frame приїхав не повністю або їх приїхало одразу два. І то і то, між іншим, цілком типова ситуація, як показали логи гри.

Тепер до деталей.

З'єднання з клієнтом на рівні веб сокета — передбачається, що протокол (наприклад, ws_v0) перекриє метод «on_read» і всередині себе смикне «handshake», коли даних буде достатньо. Далі шматок «handshake» у батька. Далі смикається метод «send_handshake_reply», який повинен бути реалізований в протоколі. Цей ось «send_handshake_reply» має таке відповісти клієнту, щоб той зрозумів, що «з'єднання встановлено», нормальним браузерам — нормальний відповідь, а для Сафарі — особливий відповідь.

Отримання повідомлення від клієнта. Звертаю увагу, що дурні клієнти можуть реалізувати такий варіант, що з'єднання не встановлено, а повідомлення від користувача вже настав. Тому треба бережно ставиться до «unparsed_data» змінної. У кожному протоколі метод «on_read» повинен зрозуміти розмір переданого frame, переконатися, що frame цілком приїхав, розшифрувати який приїхав frame повідомлення користувача. У кожному протоколі це робиться дуже по-різному і дуже кучеряво (ми ж не знаємо, приїхав frame повністю або немає, плюс не можна відкусити ні байта наступного frame). Далі всередині «on_read», коли дані клієнта отримані і розшифровані та визначено їх тип (так-так і таке передбачено), смикаємо метод «on_frame», який всередині класу «ws», той, у свою чергу, смикне custom callback (функція $my_callback, перед основним циклом яка). І в підсумку $my_callback отримує повідомлення від клієнта.

Надсилання повідомлення клієнту. Просто смикається метод «send_frame», який повинен бути реалізований всередині протоколу. Тут просто шифруємо повідомлення і відправляємо користувачеві. Різні протоколи шифрують по-різному.

Тепер додаю 3 протоколу «v13», «v0», «ve»:

Файл <ws_v13.php>
class ws_v13 extends ws {
const CONTINUATION = 0;
const STRING = 0x1;
const BINARY = 0x2;
const CONNCLOSE = 0x8;
const PING = 0x9;
const PONG = 0xA;
protected static $opcodes = [
0 => 'CONTINUATION',
0x1 => 'STRING',
0x2 => 'BINARY',
0x8 => 'CONNCLOSE',
0x9 => 'PING',
0xA => 'PONG',
];
protected $outgoingCompression = 0;

protected $framebuf = ";

/**
* Apply mask
* @param $data
* @param string|false $mask
* @return mixed
*/
public function mask($data, $mask) {
for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) {
$data[$i] = $data[$i] ^ $mask[$i % $ml];
}
return $data;
}

/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" АБО "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}

if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}

/*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) {
//$data = gzcompress($data, $this->outgoingCompression);
//$rsv1 = 1;
}*/

$fin = 1;
$rsv1 = 0;
$rsv2 = 0;
$rsv3 = 0;
$this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT))));
$dataLength = strlen($data);
$isMasked = false;
$isMaskedInt = $isMasked ? 128 : 0;
if ($dataLength <= 125) {
$this->write(chr($dataLength + $isMaskedInt));
} elseif ($dataLength <= 65535) {
$this->write(chr(126 + $isMaskedInt) . // 126 + 128
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
} else {
$this->write(chr(127 + $isMaskedInt) . // 127 + 128
chr($dataLength >> 56) .
chr($dataLength >> 48) .
chr($dataLength >> 40) .
chr($dataLength >> 32) .
chr($dataLength >> 24) .
chr($dataLength >> 16) .
chr($dataLength >> 8) .
chr($dataLength & 0xFF));
}
if ($isMasked) {
$mask = chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF)) .
chr(mt_rand(0, 0xFF));
$this->write($mask . $this->mask($data, $mask));
} else {
$this->write($data);
}
if ($cb !== null) {
$cb();
}
return true;
}

/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = ") {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) {
return false;
}
if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') {
return false;
}

if (isset($this->server['HTTP_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN'];
}
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ";
}
$this->write("HTTP/1.1 101 Switching Protocols\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Date: " . date('r') . "\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
. "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}

$this->write($extraHeaders."\r\n");

return true;
}
/**
* Called when new data received
* @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16
* @return void
*/
public function on_read() {
if ($this->closed) return;
if ($this->state === self::STATE_PREHANDSHAKE) {
if (!$this->handshake()) {
return;
}
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$first = ord($this->look(1)); // first byte integer (fin, opcode)
$firstBits = decbin($first);
$opcode = (int)bindec(substr($firstBits, 4, 4));
if ($opcode === 0x8) { // CLOSE
$this->close();
return;
}
$opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false;
if (!$opcodeName) {
error_log(get_class($this) . ': Undefined opcode ' . $opcode);
$this->close();
return;
}
$second = ord($this->look(1, 1)); // second byte integer (masked, payload length)
$fin = (bool)($first >> 7);
$isMasked = (bool)($second >> 7);
$dataLength = $second & 0x7f;
$p = 2;
if ($dataLength === 0x7e) { // 2 bytes-length
if ($buflen < $p + 2) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(2, $p), false);
$p += 2;
} elseif ($dataLength === 0x7f) { // 8 байт-length
if ($buflen < $p + 8) {
return; // not enough data yet
}
$dataLength = self::bytes2int($this->look(8, $p));
$p += 8;
}
if (self::maxAllowedPacket <= $dataLength) {
// Too big packet
$this->close();
return;
}
if ($isMasked) {
if ($buflen < $p + 4) {
return; // not enough data yet
}
$mask = $this->look(4, $p);
$p += 4;
}
if ($buflen < $p + $dataLength) {
return; // not enough data yet
}
$this->drain($p);
$data = $this->read($dataLength);
if ($isMasked) {
$data = $this->mask($data, $mask);
}
//Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data))));
/*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame
$data = gzuncompress($data, $this->pool->maxAllowedPacket);
}*/
if (!$fin) {
$this->framebuf .= $data;
} else {
$this->on_frame($this->framebuf . $data, $opcodeName);
$this->framebuf = ";
}
}
}
}
}


Файл <ws_v0.php>
class ws_v0 extends ws {
const STRING = 0x00;
const BINARY = 0x80;

protected $key;

/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = ") {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
return false;
}
$final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key);
$this->key = null;

if (!$final_key) {
return false;
}

if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ";
}

$this->write("HTTP/1.1 101 Web Socket Handshake Protocol\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n");
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}
$this->write($extraHeaders . "\r\n" . $final_key);
return true;
}

/**
* Computes final key for Sec-WebSocket.
* @param string Key1
* @param string Key2
* @param string Data
* @return Result string
*/
protected function _computeFinalKey($key1, $key2, $data) {
if (strlen($data) < 8) {
error_log(get_class($this) . '::' . __METHOD__ . ': Invalid handshake data for client ""'); // $this->addr
return false;
}
return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true);
}

/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return Result string
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = ";

for ($i = 0, s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);

if ($c === "\x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}

if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}

return pack('N', $result);
}

/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" АБО "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}

if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}
if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb$this);
return true;
}
}

$type = $this->get_frame_type($type);
// Binary
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = ";
$pos = 0;

char:

++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;

if ($pos !== 1) {
$c += 0x80;
}

if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};

$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "\xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}

/**
* Called when new data received
* @return void
*/
public function on_read() {
if ($this->state === self::STATE_PREHANDSHAKE) {
if (strlen($this->unparsed_data) < 8) {
return;
}
$this->key = $this->read_unlimited();
$this->handshake();
}
if ($this->state === self::STATE_HANDSHAKED) {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
// not enough data yet
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);

if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}

if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}
$this->drain($i + 1);
$this->on_frame($this->read($len), 'BINARY');
} else {
if (($p = $this->search("\xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
// not enough data yet
return;
}
}
}
}
}
}


Файл <ws_ve.php>
class ws_ve extends ws {
const STRING = 0x00;
const BINARY = 0x80;

/**
* Sends a handshake message reply
* @param string Received data (no use in this class)
* @return boolean OK?
*/
public function send_handshake_reply($extraHeaders = ") {
if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
$this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = ";
}

$this->write("HTTP/1.1 101 Web Socket Handshake Protocol\r\n"
. "Upgrade: WebSocket\r\n"
. "Connection: Upgrade\r\n"
. "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
. "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
);
if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
$this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
}
$this->write($extraHeaders."\r\n");
return true;
}

/**
* Computes key for Sec-WebSocket.
* @param string Key
* @return Result string
*/
protected function _computeKey($key) {
$spaces = 0;
$digits = ";

for ($i = 0, s = strlen($key); $i < $s; ++$i) {
$c = substr($key, $i, 1);

if ($c === "\x20") {
++$spaces;
} elseif (ctype_digit($c)) {
$digits .= $c;
}
}

if ($spaces > 0) {
$result = (float)floor($digits / $spaces);
} else {
$result = (float)$digits;
}

return pack('N', $result);
}

/**
* Sends a frame.
* @param string $data Frame's data.
* @param string $type Frame's type. ("STRING" АБО "BINARY")
* @param callable $cb Optional. Callback called when the frame is received by client.
* @callback $cb ( )
* @return boolean Success.
*/
public function send_frame($data, $type = null, $cb = null) {
if (!$this->handshaked) {
return false;
}

if ($this->closed && $type !== 'CONNCLOSE') {
return false;
}

if ($type === 'CONNCLOSE') {
if ($cb !== null) {
$cb$this);
return true;
}
}

// Binary
$type = $this->get_frame_type($type);
if (($type & self::BINARY) === self::BINARY) {
$n = strlen($data);
$len = ";
$pos = 0;

char:

++$pos;
$c = $n >> 0 & 0x7F;
$n >>= 7;

if ($pos !== 1) {
$c += 0x80;
}

if ($c !== 0x80) {
$len = chr($c) . $len;
goto char;
};

$this->write(chr(self::BINARY) . $len . $data);
}
// String
else {
$this->write(chr(self::STRING) . $data . "\xFF");
}
if ($cb !== null) {
$cb();
}
return true;
}

/**
* Called when new data received
* @return void
*/
public function on_read() {
while (($buflen = strlen($this->unparsed_data)) >= 2) {
$hdr = $this->look(10);
$frametype = ord(substr($hdr, 0, 1));
if (($frametype & 0x80) === 0x80) {
$len = 0;
$i = 0;
do {
if ($buflen < $i + 1) {
return;
}
$b = ord(substr($hdr, ++$i, 1));
$n = $b & 0x7F;
$len *= 0x80;
$len += $n;
} while ($b > 0x80);

if (self::maxAllowedPacket <= $len) {
// Too big packet
$this->close();
return;
}

if ($buflen < $len + $i + 1) {
// not enough data yet
return;
}

$this->drain($i + 1);
$this->on_frame($this->read($len), $frametype);
} else {
if (($p = $this->search("\xFF")) !== false) {
if (self::maxAllowedPacket <= $p - 1) {
// Too big packet
$this->close();
return;
}
$this->drain(1);
$data = $this->read($p);
$this->drain(1);
$this->on_frame($data, 'STRING');
} else {
if (self::maxAllowedPacket < $buflen - 1) {
// Too big packet
$this->close();
return;
}
}
}
}
}
}


Відразу хочу зазначити, що протокол VE не тестував — поняття не маю хто його використовує. Але сумлінно конвертував і урізав код з PhpDeamon.

Протокол V13 використовують всі нормальні браузери FireFox, Opera, Chrome, Яндекс). Навіть IE його використовує (вибачте, після IE6 — для мене IE ніколи не буде «оглядача», навіть команда розробник IE заявляли, що це «не браузер, а тонкий клієнт»). Протокол V0 використовує браузер «Сафарі».

Замість висновку

Спасибі за увагу, використовуйте на здоров'я весь наведений вище код (зрозуміло, я раджу загорнути його в нормальні об'єкти, тут все спрощено виключно для розуміння. Особливо callback на який прийшов від користувача frame раджу зробити по-нормальному). Якщо ви будете використовувати цей код, напишіть будь-ласка де в коді «Спасибі Anlide і PhpDeamon». У підсумку сокет-сервер, наведений тут, сумісний з усіма сучасними браузерами. Працює без витоків пам'яті і годиться для використання в високонавантажених системах.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.