- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
8 Q0 _: c2 P9 B8 j. s# M2 c - require_once __DIR__ . '/Workerman/Autoloader.php';
: p) r" ?8 y$ f v* a7 U1 x
" D+ Y# l8 e1 x1 @2 W- $worker = new Worker();
% T- p, D {6 N - // 4个进程
. b2 T [3 t7 `7 G- Q* R5 _1 x7 } - $worker->count = 4;
/ \; _- p5 A+ C' y2 p, @ - // 每个进程启动后在当前进程新增一个Worker监听: D M# |3 Q: D4 l
- $worker->onWorkerStart = function($worker)$ H. v: n4 [2 L4 ~9 a5 h$ }
- {
8 Z7 F2 ]+ q7 e* i - /** _0 k2 S; v0 u- |2 y" n. F
- * 4个进程启动的时候都创建2016端口的Worker% o' S: Q( X* r5 p& F* |7 r) [/ k
- * 当执行到worker->listen()时会报Address already in use错误7 g7 \4 r7 a: q- V" K$ x
- * 如果worker->count=1则不会报错: M/ e% u- b0 s5 E, n: p( z# W _
- */
8 C9 K( P* v- _( F& Y - $inner_worker = new Worker('http://0.0.0.0:2016');6 I! Q$ R8 d$ m1 W' b7 d) [
- $inner_worker->onMessage = 'on_message';" i2 t. ]& S5 G! w* Z w
- // 执行监听。这里会报Address already in use错误. k: q, l6 S o& R' _( V* x
- $inner_worker->listen();* ~8 n$ x9 p) f& N* E
- };( M7 ?9 i) t. I9 Y" t2 \
- 7 v& y- x4 F' n, Q
- $worker->onMessage = 'on_message';; F# u* c7 G" k
- 2 E3 {) w8 [ {) ~9 U z1 @! N
- function on_message($connection, $data)$ @* r! O; e8 b+ N. H4 @7 e7 P
- {
( t( P" `* t. h* ?1 s% o- l8 ? - $connection->send("hello\n");2 C* s$ |+ W: E$ z6 {, @) T( Q
- }# f+ \% |" G5 V) _7 J
: [0 C" N/ N4 }0 g- i* Z- // 运行worker( i t# y' G0 v+ g4 E
- Worker::runAll();
. T; B* j2 _, t, k: R5 ]6 X6 T& e. ] - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
" u, d6 t$ o7 v( J
2 p: o& b8 }- p( C- use Workerman\Worker;) E9 d9 D/ ^9 V
- require_once './Workerman/Autoloader.php';
! |# c0 j$ t1 |( Q) l8 o1 r \ - ) D) \; c2 }! K/ g5 B7 d
- $worker = new Worker('text://0.0.0.0:2015');, n$ M( Y2 W9 x r8 S
- // 4个进程
. a( _% M# ~5 ?- o# \ - $worker->count = 4;- S& N1 r4 A- w9 Q2 u3 P
- // 每个进程启动后在当前进程新增一个Worker监听
2 v7 Y3 X/ a y2 _' D+ A2 c - $worker->onWorkerStart = function($worker)
7 G5 a* c5 |1 C \. [2 b - {, O$ E9 n$ M5 o! q* A0 r/ m
- $inner_worker = new Worker('http://0.0.0.0:2016');
8 @4 _7 k E0 d4 w( u% R( Y9 h- ? - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)% |+ u# |# s* ^. G
- $inner_worker->reusePort = true;9 [4 B$ `1 H5 U6 `% x2 v' |. s
- $inner_worker->onMessage = 'on_message';- t# \; ^8 s- Y! v
- // 执行监听。正常监听不会报错
+ T+ K, z3 i: C7 u% {% n - $inner_worker->listen();
- |* q, N0 t3 q t! Y - }; F5 d6 c- r% u
* C3 K3 W3 R/ y- $worker->onMessage = 'on_message';
! ^# S" {, y6 Y7 P2 n! T1 U4 G/ t0 g, z - 1 O" J7 T/ X& W1 I6 h. ~7 _6 G' G
- function on_message($connection, $data)0 s# F3 h1 v' t9 o$ N8 B
- {
4 J/ A4 X( l9 [) `3 C; H: ]/ d3 w - $connection->send("hello\n");
v) C2 \* h: z0 r5 n - }% y9 P, X$ W- j6 g# o
- a& n& b; o) T3 L$ B- // 运行worker9 i5 V) Q- O& ?. W) v. s
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
4 d) I A5 Z6 K5 l6 O2 r" W t - use Workerman\Worker;1 N) v T' G& q! E/ m
- require_once './Workerman/Autoloader.php';
; H K' G+ p) B- T _" r - // 初始化一个worker容器,监听1234端口1 _& ~% I1 I9 K. ~) ^1 j# J0 ^
- $worker = new Worker('websocket://0.0.0.0:1234');
- u. X- ], @0 l7 Y8 ^# w
, c; ~7 q! b# r2 H; V( c- /*$ N9 n, ~. }' U
- * 注意这里进程数必须设置为1,否则会报端口占用错误8 y1 z6 F' h, \3 S* B, h
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)) K% \( a, X- H8 m @" M
- */
; K5 ^2 a4 Q% e/ E2 l! O - $worker->count = 1;
, x: J% c& d0 w+ o) F8 [' x3 W. w - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口. z" o: r# T' U
- $worker->onWorkerStart = function($worker)
, j p7 t* p. }: a. S: h - {
: F( A3 A3 W T: u) o& \! F+ a4 g - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ `' U& M/ @ S8 S1 Q6 _4 F- R
- $inner_text_worker = new Worker('text://0.0.0.0:5678');# L7 {. u9 M+ N; u
- $inner_text_worker->onMessage = function($connection, $buffer)% v- Q O/ t" ?) p6 Q8 o4 u
- {1 Z7 Y, j( f7 b1 n9 I, c) q+ _
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据/ K7 T5 m7 G/ X* b1 @8 |0 J$ g
- $data = json_decode($buffer, true);: K6 u# x2 @: S; F& u3 F
- $uid = $data['uid'];4 \! W0 I: u: T8 R! l7 H
- // 通过workerman,向uid的页面推送数据8 K8 f6 i; L4 _1 w
- $ret = sendMessageByUid($uid, $buffer);* S" T$ y+ L6 ?% z5 a
- // 返回推送结果
" k3 l1 X) i6 C5 j, V9 r - $connection->send($ret ? 'ok' : 'fail');
) i8 w+ ^3 c+ \" N) j" g - };( m. k- b# J+ t
- // ## 执行监听 ##2 p0 i/ N2 \) b5 i) s" P
- $inner_text_worker->listen();
* _3 l1 _9 z7 {* z - };$ c( m9 L, c; r; m
- // 新增加一个属性,用来保存uid到connection的映射0 O5 {4 \- T6 x: p% [/ _" p
- $worker->uidConnections = array();
$ K$ W$ K1 @( W2 M6 b# _ - // 当有客户端发来消息时执行的回调函数
) P( y+ O! a9 N F6 ? - $worker->onMessage = function($connection, $data)0 u' C( _% m) g
- {6 g; x* J8 ?- o6 o
- global $worker;4 B) n0 W$ i; i
- // 判断当前客户端是否已经验证,既是否设置了uid
2 D( e0 ^. m6 T" N0 t( ] - if(!isset($connection->uid))
; i8 ` [7 m8 j9 }+ O4 p. P" a - {; v' g4 t) [9 x }* c
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)2 W9 O5 i0 b- E4 o _& S
- $connection->uid = $data;
" e X5 H. K5 V# ~4 V - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,6 K P; L8 G5 j/ M
- * 实现针对特定uid推送数据1 M9 z; W! u, w* g4 h% U
- */
" f A$ [4 ~0 n6 F: p9 }0 N. w - $worker->uidConnections[$connection->uid] = $connection;
! r# `6 c9 C4 p B) e# } - return;: [# O( h- a1 b5 ^
- }
- P0 \5 H4 L7 D - };, i2 Y$ R8 G, j# ^4 o
5 S( |5 c8 }$ ^( Q# j- // 当有客户端连接断开时7 E, S. k2 l( H) H
- $worker->onClose = function($connection)
* V9 a% R u1 s4 E8 s - {
, a- B/ d; a5 L( O9 P/ b6 k) R& P - global $worker;7 W/ R4 M: f8 L2 l( ~$ [9 g
- if(isset($connection->uid))
! Z# R. P/ G$ Y4 y - {! f' }9 V% ^( P/ `" y" }
- // 连接断开时删除映射4 S8 c9 {% v& @; p; Q; M- l+ ~
- unset($worker->uidConnections[$connection->uid]);& P* w8 P2 W/ ^( y% m% i) S
- }9 M; C0 |+ i* t/ }5 j! q
- };' t. w9 \+ K* n: A' r* N V3 t- w- @
- 5 U3 r) l9 W9 Y2 b
- // 向所有验证的用户推送数据/ @7 S" j! N" Z& s1 p, m/ Y
- function broadcast($message)
' e3 ]& H' C& ]) H3 p/ ^2 p( s& S" \ - {8 J0 G' \9 d! Y' ]' V6 ^
- global $worker;1 @6 U, g5 i7 H, d/ _" g
- foreach($worker->uidConnections as $connection)
5 F0 i/ ^% ], p/ E4 m5 r7 ^# [ - {
) v* ^1 F; G" E- ]# \6 y4 J3 Y - $connection->send($message);
! ^, k/ s1 ]8 ]# X: U, }1 U5 N - }9 i6 z6 m9 G& j' r
- }
: H: V( j9 o8 ]& @6 N4 t
0 |3 Y' ~% |1 c8 N' O( M- // 针对uid推送数据% c3 ~3 B( x9 M' A% T
- function sendMessageByUid($uid, $message)
" |( V( O3 ?+ S; v/ A - { ~7 Z1 [% ~. y3 W" P
- global $worker;
3 \% U4 b* j% J - if(isset($worker->uidConnections[$uid])), u S$ ?3 K6 s6 C [# ?. f
- {
- L& g2 n" g k: D: z& u/ I x% j - $connection = $worker->uidConnections[$uid];; @( {- F% i5 J( Q: D
- $connection->send($message);
3 ]* ]2 {! F _( q& @" a6 N - return true;
3 e. |0 M, Q+ }/ h' @/ K - }
; W& p/ i- O) K+ h6 d! q - return false;
, A& {$ w& S; E* F7 [6 p - }( Q' x% }8 x, Z( Y
- 8 H0 R! g2 x9 `' c0 @$ h
- // 运行所有的worker) ^) H8 w, U7 Y" ]: ~& K5 Z! S
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
2 h' `9 E# {" b1 q9 V - ws.onopen = function(){4 }/ u0 n; f2 C; s6 @' [/ }
- var uid = 'uid1';' G. g' W! h1 K7 I9 @ Y2 f* w
- ws.send(uid);
6 q% S5 I2 n4 D- w6 j: ~* R+ g - };
$ J9 \% i b$ A( r6 p9 f# c8 Q4 v) V - ws.onmessage = function(e){
: B! y& h# g" r" M% u/ y5 ?) L - alert(e.data);
3 ~& C7 ^1 D Z7 ` - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
4 b& ?0 X4 X n+ S' z - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
2 O& N; t/ P' t& s3 k7 k8 [" V8 _ - // 推送的数据,包含uid字段,表示是给这个uid推送1 ~$ S: N: T/ p9 r& J
- $data = array('uid'=>'uid1', 'percent'=>'88%');( U# V6 Z' F8 g* w& t% O1 z6 b
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
; E% n E& C$ D7 E/ t( P ]# r - fwrite($client, json_encode($data)."\n");
2 ]' _5 @& H. ` - // 读取推送结果8 X5 d3 \/ K# _" o/ c% _) C
- echo fread($client, 8192);
复制代码
; [/ m1 d9 _! |& }* W3 T1 [3 F* d% K/ R, k U& U$ |8 ^9 ~
|