- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
, T! N: _2 k. G- Q: b9 g; Z - require_once __DIR__ . '/Workerman/Autoloader.php';
7 w2 p' i% n! y0 D' \' q& N
2 {' r2 w* Q. P" e- n; a% s7 Y" ^& C- $worker = new Worker();6 U# h2 d. _1 r8 Z
- // 4个进程
2 N3 d9 ?( Q3 p* s) [' p - $worker->count = 4;
, D6 \, e# T/ Q: D( V% u1 C - // 每个进程启动后在当前进程新增一个Worker监听
7 B% E- T- o0 d* d+ P$ O4 t - $worker->onWorkerStart = function($worker)
; m$ f$ }9 H. r5 a! }6 V2 x - {
" T' Y, v9 L |6 P7 z - /**# }. Q" ]( n2 |8 k
- * 4个进程启动的时候都创建2016端口的Worker: a `- D. V+ s& l
- * 当执行到worker->listen()时会报Address already in use错误, x# O) R5 W+ L1 M+ r0 W* ~) [. t
- * 如果worker->count=1则不会报错
6 c/ s3 [! }9 f0 [ Y/ D3 B - */* i5 z8 C: \- J4 c' z2 b! N9 B7 v6 V
- $inner_worker = new Worker('http://0.0.0.0:2016');# J9 {0 s, \% R; d& |) a- Z& G
- $inner_worker->onMessage = 'on_message';
( n: e! y1 \0 F% c; ~& Y - // 执行监听。这里会报Address already in use错误
0 z3 J5 O) e6 O' |' l% s - $inner_worker->listen();
' y7 p+ o! |0 h) O - };
2 I( t+ E& C( e. P1 h; m3 p9 b - 3 h* T! U' H7 c
- $worker->onMessage = 'on_message';; d' h0 K: F1 o# l% ?1 k1 _
- % ?4 E/ x' u& J% E
- function on_message($connection, $data)
& v0 Y8 R& b- |0 z# Z - {
: m o4 p9 j+ X6 g1 E0 K - $connection->send("hello\n");
* ~! s# F' I8 ]) ?# _9 j - }* J! n) x; t( t. n
- % A' z2 `$ E2 C
- // 运行worker
" f( e# G7 L$ q. M - Worker::runAll();
/ m: g7 ~& r2 l8 B% e - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:" e* J9 O4 ]( x& e: p& p" Q
2 ~6 R# n8 Z8 b9 x; {- use Workerman\Worker;
0 c4 e" w D3 s2 D( m0 }: G: g, }. M - require_once './Workerman/Autoloader.php';
$ x z3 [ n! y: u+ Q/ S
# [6 a/ R+ z. s# t- $worker = new Worker('text://0.0.0.0:2015');6 c" J/ K6 g0 Y' W% {
- // 4个进程
9 @$ X" K, @! [# P; D - $worker->count = 4;+ f. g/ [5 h8 i! W$ G
- // 每个进程启动后在当前进程新增一个Worker监听+ p, Y7 q" L4 X; @5 c
- $worker->onWorkerStart = function($worker), w, [ k' u" G) g
- {
' p, g: f5 b3 w! d - $inner_worker = new Worker('http://0.0.0.0:2016');. \" ~8 k0 u4 ?7 N8 ^) G( u
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0); Z# w, {9 F% |1 G- r
- $inner_worker->reusePort = true;
% Z) N4 R4 q) u# n6 i7 w - $inner_worker->onMessage = 'on_message';
+ p+ [) T3 j' Q" R; }4 x3 G0 H - // 执行监听。正常监听不会报错
~1 @2 I# g2 R) O - $inner_worker->listen();) f% l4 i. {3 g e Z
- };
: ~8 f" W( X- |5 \! c1 }# h
' m, f2 B+ q3 A$ X% z1 C2 {- M- $worker->onMessage = 'on_message';
6 x( ~3 L" n$ d2 C: S( K - " Y/ K# a5 n. `
- function on_message($connection, $data) R1 ~5 f. w( F! V/ [: l6 Z
- {, w, B7 _1 t/ S0 ^; U6 o% Q
- $connection->send("hello\n");- V$ p3 f# S0 j8 d1 c
- }! ~& w A" q, `9 q
% E `; n \) l6 e! l- l- // 运行worker
- A# E/ _) N6 t! s$ a# u" [ - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
1 u& H5 g- v- S6 l; I: z - use Workerman\Worker;3 g# ?8 V# u8 c( Q
- require_once './Workerman/Autoloader.php';; C W) e1 s( H/ U% ]6 Y
- // 初始化一个worker容器,监听1234端口
& j# h& _6 t" v5 Z$ q" G- x - $worker = new Worker('websocket://0.0.0.0:1234');
3 }) T+ y) w& [8 G6 r - ; o, ^2 P" V$ G* g
- /*2 p3 h& Y$ i8 A$ s- o& ]% f3 o4 e
- * 注意这里进程数必须设置为1,否则会报端口占用错误% ~& @9 q+ m2 k
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). K2 W7 f, ` r$ g" A+ Y
- */- W. i0 I: l( J* W0 V* E0 J; w; f
- $worker->count = 1;+ y7 E+ q% M' t: l; @1 [) L
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口5 p5 ^- V% R- Y) k4 A
- $worker->onWorkerStart = function($worker)
8 G9 G& I/ D$ p, I' p: e - {
% j+ b) I7 x8 {; }, _( |9 o - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
' Y9 s5 R, Q: E; `" f1 m* G - $inner_text_worker = new Worker('text://0.0.0.0:5678');+ s3 w* h) z! v# F
- $inner_text_worker->onMessage = function($connection, $buffer)9 C2 t3 c9 f6 J" B. _8 g6 [- b/ x
- {/ b1 D* L! O: t/ k
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据: a1 k! R. i1 R& ^
- $data = json_decode($buffer, true);
5 v8 L; P+ ^1 j/ b2 R! ^ - $uid = $data['uid'];) w- M# E5 |5 r# m( r
- // 通过workerman,向uid的页面推送数据
M: m* s; V( R" c: s3 e - $ret = sendMessageByUid($uid, $buffer);3 S% d; P6 p5 p( V
- // 返回推送结果
) V) }6 n0 C- B2 L* g - $connection->send($ret ? 'ok' : 'fail');) O3 M! r. T$ c6 y0 C2 x9 _
- };* V6 j1 I0 r7 L. `* l
- // ## 执行监听 ##
+ C* m8 H1 n1 n: M6 n* r0 ? - $inner_text_worker->listen(); N. d* Q; ]9 {
- };- w x9 p7 Z0 @$ x2 m
- // 新增加一个属性,用来保存uid到connection的映射
: }. u2 w x0 m+ z1 O; M5 l - $worker->uidConnections = array();7 i) g$ o- A7 f7 `
- // 当有客户端发来消息时执行的回调函数
. L/ _2 k$ u0 q: R+ `& U* d - $worker->onMessage = function($connection, $data)8 X/ J" e2 j8 v7 c
- {0 B( y9 i: y/ a" ?
- global $worker;
9 A" m: R3 p5 c4 F4 n1 ^3 ^ - // 判断当前客户端是否已经验证,既是否设置了uid: i3 E, y/ F7 b
- if(!isset($connection->uid))' c. n) Y7 }, A7 z
- {
" @9 x" T; m) E+ K9 S- s - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
f: E1 N* O, B) d - $connection->uid = $data;
5 @- P7 ?+ J/ x9 q+ Y - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
5 L! E1 t& u( f* t9 } - * 实现针对特定uid推送数据8 ]" r5 E$ L# k; w5 |. {0 M; {' z
- */
3 b: P7 @# g4 q5 r1 R( y - $worker->uidConnections[$connection->uid] = $connection;4 i4 t& P# Y# Y5 I* s5 ]0 B
- return;& m2 }! X5 f: I7 ]
- }- l: x( a- c4 ~+ X5 m
- };0 M7 H/ m. i/ k& {! h
' D1 z5 r* v; D* h' L" D; v( }- // 当有客户端连接断开时" I/ E2 t, A& g
- $worker->onClose = function($connection), {# a3 m I+ M
- {
/ t/ d7 _3 w" z) `, S: E - global $worker;
2 b) O# r6 B" i; ~- W7 l - if(isset($connection->uid))
+ d( K8 m" Y+ t$ a$ l' w - {
) t) g7 Q! F4 y: I, d7 N6 g9 J - // 连接断开时删除映射 h. N9 |* n: j& F! G
- unset($worker->uidConnections[$connection->uid]);! J3 b+ H% `4 ^ Y9 ~- \3 p
- }
& G. y% A) }! v: x" q, V0 @/ |) | - };& s% d/ f0 {9 o" w x& b
6 }8 y* j1 w K- Q- // 向所有验证的用户推送数据3 K, M5 s! p5 T- `/ |
- function broadcast($message)/ d2 g9 Z2 }: L& \( f
- {1 X# [ H1 T. a- P3 j
- global $worker;
2 |9 b7 F& b! _9 b' U - foreach($worker->uidConnections as $connection)
6 C1 O3 z/ O2 }7 q9 D! p5 @ - {8 H: L& H: Z6 v& X0 `) z/ v
- $connection->send($message);
" H" ^) h4 z c% G6 L% R2 } - }
\& v4 ]' U% |+ C: e$ I - }
0 r, |) H; I6 l3 W$ `0 V* E
, t8 m! }4 Q' v1 {# g- // 针对uid推送数据. n; e/ a% K# ?% Y# w) o; a
- function sendMessageByUid($uid, $message)
3 _5 B1 l# c9 \* v H - {2 S0 y) [8 l7 p+ m
- global $worker;
2 L8 K, [0 ^8 r2 [ A3 ]5 o, I - if(isset($worker->uidConnections[$uid]))
" W' T) c4 Z8 S4 T5 M8 E) X - {
, I1 B. C! D$ Q$ W% w& K$ d' K - $connection = $worker->uidConnections[$uid];' M L& ^5 c- J; Y) u& }9 B
- $connection->send($message);
2 J9 O5 ^+ Z9 R9 i* v ?, _ - return true;
4 B$ r) r/ h# d# w - }
" Q6 b# V$ M4 W: G) U% H. d - return false;. a2 o2 E/ v- o4 r! b( D) R
- }2 N. {/ F; J0 | V+ B; W
) |/ l' U* R6 L8 V6 V% O0 _* _- f- // 运行所有的worker
* i( B9 r( j7 f" [, A2 | - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
6 L3 m0 b! z! d3 ~1 u* q! g% e - ws.onopen = function(){6 y0 m* m/ r+ V
- var uid = 'uid1';
% q% T5 O! v" U% `% C* W. o - ws.send(uid);) w% [% L, g& f7 q
- };. y) @! @! |- E" i' u
- ws.onmessage = function(e){
! n5 Q, ^3 N; ?3 M( X$ A: j - alert(e.data);& G$ U% t% q& W
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
) i) s2 \. Q5 ]' u% i' M7 o% v - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
1 Q5 n1 G* z+ U" x( Z4 ^ - // 推送的数据,包含uid字段,表示是给这个uid推送! d* q8 j+ o) F$ f" z! G
- $data = array('uid'=>'uid1', 'percent'=>'88%');
6 I+ Y0 |& k4 z- b3 b+ n - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 j9 ], C/ K3 u( x" ]* ]' ^! O
- fwrite($client, json_encode($data)."\n"); W1 s5 e5 w) b" Y6 f/ c; Q! b% k
- // 读取推送结果
/ N' C* K5 D3 f" {% ]5 k C8 S - echo fread($client, 8192);
复制代码
* H+ z$ [% `% J: B3 f- ?
+ T% I F& t# i* | |