- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;+ q# ]$ h: v/ o
- require_once __DIR__ . '/Workerman/Autoloader.php';
& x1 e3 M# e2 k0 e5 ^" v& a& _" x - ; b5 v. B9 M7 ]7 E2 ^( o9 q
- $worker = new Worker();
3 T# E; g* p. x4 ~% x: k - // 4个进程4 y* b( U5 a! h- D* |* a1 b
- $worker->count = 4;4 Y/ K- d+ h1 ] w: R+ {+ A: R
- // 每个进程启动后在当前进程新增一个Worker监听
5 h' Y! b X% M% f - $worker->onWorkerStart = function($worker); r4 n7 \) a. f. [; G1 _
- {
" l2 G5 E& H8 b# w# s, u - /**
9 P$ _3 \" p% q# Y! A+ i- } - * 4个进程启动的时候都创建2016端口的Worker0 ^3 z3 V2 o0 d( c
- * 当执行到worker->listen()时会报Address already in use错误
8 H7 x: K8 R9 x- g. D" ? o - * 如果worker->count=1则不会报错
2 M6 Q$ P$ Q( C( ~' O S2 ` - */, o* l. z5 m$ ]7 [3 k. x' Y; a
- $inner_worker = new Worker('http://0.0.0.0:2016');- @. e6 p: v% S0 i8 e
- $inner_worker->onMessage = 'on_message';
; ~/ b; |: J$ k9 u- |- a" { - // 执行监听。这里会报Address already in use错误
; G/ [; ^8 ~5 f - $inner_worker->listen();
: D; b3 J& [% P: |/ I - };. Z4 c& D2 A2 Q6 E0 O1 o
- 2 c& E: g: O2 J4 n: V! G
- $worker->onMessage = 'on_message';
m9 ^6 v0 |) T6 {+ N% X3 X! _ - 7 O9 E! {5 g3 A( {& `. N
- function on_message($connection, $data)7 A. i" h6 e3 t4 X
- {
8 [ s- A0 @" ^ j- P - $connection->send("hello\n");: \9 |$ g+ G7 M" ? e
- }
, C6 }( I' [$ Y. M# t$ O, _
/ j" W$ o9 G& I' F- // 运行worker
5 ]/ C% ]) ~: S4 T) H+ N - Worker::runAll();: q# L- o, ?" n# E: {
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 K! E: F4 j8 ~8 Z8 c2 B3 n1 G
- ) I, _5 O, l: F' c. u* |
- use Workerman\Worker;9 u- Q7 k6 w7 [3 O1 o* Y+ }
- require_once './Workerman/Autoloader.php';
! I4 P* s% f4 T+ t" C+ g" E9 H - 4 i: l& w. [( S
- $worker = new Worker('text://0.0.0.0:2015');
) N4 w8 f3 o# Y9 o4 e; [ - // 4个进程: L- @5 l- K; | B% r
- $worker->count = 4;
' S7 Q L3 n' X - // 每个进程启动后在当前进程新增一个Worker监听; g7 P+ U% C# E) w$ N
- $worker->onWorkerStart = function($worker)& \' y6 r: S; k f2 V$ u
- {, d8 u# q0 z0 {$ p; \) y* o8 _
- $inner_worker = new Worker('http://0.0.0.0:2016');/ M( J0 t# v. ~2 W. N9 P$ [
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)- z2 ^- e2 P/ {( x, @$ h
- $inner_worker->reusePort = true;' x" ^& Q# E# B% E, B
- $inner_worker->onMessage = 'on_message';& y$ m! P, Y6 D! ? J, h( Q
- // 执行监听。正常监听不会报错
9 X/ p, S [, B. ?8 o. \. o - $inner_worker->listen();
" K8 D/ E1 ^! p3 u - };
6 N3 [7 K( W/ X
# n( L2 \( i7 h- $worker->onMessage = 'on_message';' Q0 o" b! y f
* F2 n1 k- { D6 F+ J- function on_message($connection, $data) ?0 Z: D2 ?# b& o
- {* H- j G0 N1 E; C' s* C
- $connection->send("hello\n");1 |& O( S [1 w- U8 S4 |
- }
+ [3 N2 E$ g1 N1 q( z
. X7 `: d7 f+ N9 @- // 运行worker+ O; |- B3 t- n1 F$ h" X" w
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
# \6 \" z( O i; n( p$ g - use Workerman\Worker;# Z/ I9 s6 d0 {% p4 o2 L
- require_once './Workerman/Autoloader.php';
( L( u# o6 Q/ x3 h) w+ ?8 n! m - // 初始化一个worker容器,监听1234端口
8 B$ ]% b) o5 J& p - $worker = new Worker('websocket://0.0.0.0:1234');
' I' I5 x2 m( m5 d3 I9 V - $ [% f' g. U' r# x& L
- /*
( t8 n9 `4 v6 E" h - * 注意这里进程数必须设置为1,否则会报端口占用错误
6 c1 `% e# K0 D* ]& o# d- `+ h# A - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); d; o$ Q) _- i* k
- */
2 f: p) Q4 u& `8 Z' J/ f: ? - $worker->count = 1;
9 i3 j+ S6 B2 O+ S - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口$ [3 C5 @9 M8 p: u( m) n( ]
- $worker->onWorkerStart = function($worker)
! N1 k r. I. s+ l& p6 h - {' }% T# H% B4 r
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 ]3 v: E; P+ M! K6 ~- t
- $inner_text_worker = new Worker('text://0.0.0.0:5678');* b% w" N* \: ^8 v
- $inner_text_worker->onMessage = function($connection, $buffer)
0 Z- B T2 f5 X: J: w - {
5 I; I+ w+ t. S" e" e - // $data数组格式,里面有uid,表示向那个uid的页面推送数据2 ~* d, b, l9 H
- $data = json_decode($buffer, true);5 ^7 d6 u- n5 k; U$ O
- $uid = $data['uid'];6 ?- \1 b8 S1 n+ y4 P& X$ z
- // 通过workerman,向uid的页面推送数据7 k$ H9 ^9 u: z) e m
- $ret = sendMessageByUid($uid, $buffer);
4 q. ~3 D, z) }; t5 [0 `' t - // 返回推送结果& W1 p& U6 u) b
- $connection->send($ret ? 'ok' : 'fail');
. E# P0 F4 A- n - };5 l- P1 T; U/ S: B
- // ## 执行监听 ##
+ q+ r. k9 @+ I% G- r- a - $inner_text_worker->listen();+ r* c' a8 O0 {$ H4 `/ ^, R5 n( N
- };
# y2 z& M! A1 R3 X - // 新增加一个属性,用来保存uid到connection的映射
1 u. Z3 n1 g' O! G6 {% b - $worker->uidConnections = array();
& z$ O7 @9 r( E - // 当有客户端发来消息时执行的回调函数
4 M' B3 }) z4 f" B, u3 X - $worker->onMessage = function($connection, $data)
1 r8 M b- Y5 n' C% I: ] - {
! p2 y& @" c+ S, m2 a+ V" z - global $worker;
7 J9 E0 a1 p1 t$ x - // 判断当前客户端是否已经验证,既是否设置了uid+ ~0 W |, J# G, I
- if(!isset($connection->uid))5 D" K& U8 Z# f5 i! C* r) H- P7 H7 {% c
- {
4 I: z; y+ b9 j3 I5 X - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 i }2 |% J% F! A& O+ s
- $connection->uid = $data; L2 ]; C/ G) Z4 _& f
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,2 R1 C5 X* U2 f9 j4 W! K
- * 实现针对特定uid推送数据# L% p8 _* @5 K. B
- */8 N& J; \* m# g4 @# r
- $worker->uidConnections[$connection->uid] = $connection;. u* o8 P _4 C3 W) e
- return;
8 s5 T7 j" g6 W" H - }
2 \- o9 o W( L- e7 f$ I& c - };
& P. C0 @( D w& g/ j0 h
. \9 ~ ~9 Z7 ]% ~- `: U; u- // 当有客户端连接断开时, h3 Q4 [4 t2 e+ E
- $worker->onClose = function($connection)% y N2 t0 n. |: u) z% {# \% h. h
- {# S; {1 m7 h! J# N: [
- global $worker;
2 Y) U; a- ^: a6 m/ t1 I( d - if(isset($connection->uid))) T$ z% e$ Y: K
- {
; j. X+ T0 _0 O0 |+ A- W4 Z& B - // 连接断开时删除映射
* S- |& N8 m* o" w) j4 B+ Q7 ^ - unset($worker->uidConnections[$connection->uid]);
, v! Y# {/ F$ A& o& Q2 a1 \0 p - }
: U7 Z2 t9 I# k! u; ~# C - };6 r% o/ s* t- v
( {3 E# U" }# v+ i# @2 b t* I- // 向所有验证的用户推送数据
! b) O" L7 h. w4 M! f: c - function broadcast($message)
1 l* S1 r5 R& W2 s" U0 b - {( l* `7 P% p) S
- global $worker;
7 I @5 Q; z. G/ G, K& J - foreach($worker->uidConnections as $connection)
: l: T$ ^8 H: Q4 d - {
$ a) H! H6 z. Z6 x( G* m- X' g+ ~, M - $connection->send($message);
( `# g d) h9 A( s1 o - }
1 D8 {. j+ _' n1 [ - }- B3 g) d' l/ U
- ! y1 ^* }+ F6 z4 K
- // 针对uid推送数据
% t+ p+ J, i# V% H% v/ b - function sendMessageByUid($uid, $message)5 H- w/ V: W* |/ U) g" h
- {
* s4 g7 z- }( E" P. H4 ~ - global $worker;3 Z9 k+ i% c, i' L% V3 q2 U
- if(isset($worker->uidConnections[$uid]))/ U3 W, W$ S, m1 S% Y0 l
- {+ Q9 y1 J' k9 a/ i4 {) Y
- $connection = $worker->uidConnections[$uid];
7 C5 I" J- w/ r* q) t( ]/ c; d - $connection->send($message);
1 R0 k/ Y: F4 M7 g3 }8 }8 ] - return true;
; v# F( h4 Q4 q3 }/ l; l7 R - }+ q |: h: m; ^% C4 w
- return false;* H& d( ]! M: E x6 K/ n, s
- }! c% [2 [% E/ ~$ h! C4 N$ W8 b0 f
/ w6 B- w8 O, q: P( F4 s$ `# {- // 运行所有的worker$ n; j4 W# u, O) [9 D
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
6 j2 i7 q3 J9 f2 y' z6 b" k& b - ws.onopen = function(){
9 W* f$ ?3 h* E- s5 E* T: i - var uid = 'uid1';6 K0 F8 e2 R+ Y; J
- ws.send(uid);
& X9 \9 d) Q- z' H - };
- Z2 C+ C) X/ s) p - ws.onmessage = function(e){
4 Q( S( S, g/ B7 F5 e( M - alert(e.data);
2 }( C! T/ p I3 ? - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
. P3 S. k0 {9 t0 Y# ^+ S7 @7 a - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
8 L Z, C, M, U' T) m. B4 h, A" r - // 推送的数据,包含uid字段,表示是给这个uid推送: a: E4 L$ M, k0 i4 G7 {
- $data = array('uid'=>'uid1', 'percent'=>'88%');
8 w3 G3 r4 i# E - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
( F1 _1 h4 ~6 U- i3 J1 g4 l8 N - fwrite($client, json_encode($data)."\n");
' z5 _8 b/ D& P' }' f3 ?& I - // 读取推送结果
' w( {& J2 W( T8 b& Q - echo fread($client, 8192);
复制代码 ( f! \& Z- G% E
1 v5 n3 S+ a; W! F4 } |