- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;3 B6 I5 N" U9 m$ D6 x
- require_once __DIR__ . '/Workerman/Autoloader.php';( X& n+ R- [8 `; p1 V4 B6 f
- " T9 U6 |6 i! M1 h/ Q
- $worker = new Worker();, K3 C$ g; A' E& [( Z* x( n5 ?
- // 4个进程
* Z1 Y8 z# H! P# E4 A4 z T& h - $worker->count = 4;
( z/ V& h, _( G$ u! E2 ] - // 每个进程启动后在当前进程新增一个Worker监听
: m/ w1 v5 I2 ] - $worker->onWorkerStart = function($worker)
3 H& f/ P5 w+ a* h - { x/ m7 I. A3 Y* B6 o
- /**# ~4 L" ^( o* a
- * 4个进程启动的时候都创建2016端口的Worker
8 x2 C, V, h: M( N5 P! |! D$ b( d/ Y! Z - * 当执行到worker->listen()时会报Address already in use错误
|( S& @+ n' z+ H - * 如果worker->count=1则不会报错
) T, @6 V# Y; b - */7 a$ m0 c6 R$ s; k
- $inner_worker = new Worker('http://0.0.0.0:2016');! a1 r) ]) { t$ x8 U& q# d
- $inner_worker->onMessage = 'on_message';
; M& u) g5 f* F - // 执行监听。这里会报Address already in use错误- I1 ?. c" H7 D) `2 w: x0 u* X
- $inner_worker->listen();
+ p2 x0 D, x7 f f9 j+ M `. R% B - };( s6 E9 [, O' U. y7 a2 q
5 R0 ]' x. ?' C# W5 G# @- $worker->onMessage = 'on_message';
2 ]# T. v) g) a: J6 b$ _+ r7 ?
( ^0 J7 Y3 b0 Q4 ~- function on_message($connection, $data)
: l* l! I Q# p$ b - {0 }/ v+ h9 B" O- J% | f
- $connection->send("hello\n");# f W b( {2 {1 E. O" p4 ?" {
- }
9 ]0 W6 h3 T5 I* U# |! _0 p
/ l# v6 v; l5 z- X* M$ K2 Q& X- // 运行worker6 W ?) N1 ?4 V& Z& x" p
- Worker::runAll();
4 H' C1 Y' E3 u7 T: P" V# J! R - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
6 u$ b2 `! w9 ~! p( h - ) b) w7 f* M) X C2 }
- use Workerman\Worker;
/ p- Y% Z d2 k! w - require_once './Workerman/Autoloader.php';% J1 ` r5 y! c: {) u8 I
7 L0 Z# a p9 c! I( t7 z" D7 Z- $worker = new Worker('text://0.0.0.0:2015');/ f g& R% v, F+ Y) ~6 `
- // 4个进程
1 D, m; }; X2 k8 U# X j - $worker->count = 4;7 H: I; S0 p4 U6 G. x+ y' y: p
- // 每个进程启动后在当前进程新增一个Worker监听
2 G' v5 w! y8 A - $worker->onWorkerStart = function($worker)
: H2 N' c m. C" J/ ~" _ - {
3 p4 M$ b( U( D! j( F1 b. E8 g- n - $inner_worker = new Worker('http://0.0.0.0:2016');' D, y4 k: v+ r
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
2 R% I( K7 S' `! {$ H - $inner_worker->reusePort = true;. @" A( A) X3 k# h
- $inner_worker->onMessage = 'on_message';
; f1 U7 K- T( I0 H2 T X2 x8 t - // 执行监听。正常监听不会报错7 ?) y) `; s' @5 s |) Z) A
- $inner_worker->listen();
: F9 T. I& c2 ]& l3 @+ u0 E - };
3 J1 S( B! F( _( w! p
. U# ?: {5 f7 \. `' y4 f/ g: B- $worker->onMessage = 'on_message';6 V( `' c/ M9 j% r: y# |, s
- 4 H) H& T2 I7 s! l
- function on_message($connection, $data): p6 I' t+ B/ K& c" v" u
- {1 {6 ]7 x5 T: Q) s, \
- $connection->send("hello\n");5 N, [3 h$ H' Z
- }
" a* f6 M" d/ A! s
6 V2 U( t# g# d7 \3 X- // 运行worker; G; `3 |6 v$ J$ U& g! S$ k) R" V& H
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php3 K+ x$ ]# E ~( }2 M7 ~6 Q; [9 ~$ h
- use Workerman\Worker;( S- d- A1 t( W- _; e( ]4 V
- require_once './Workerman/Autoloader.php';
: w7 E2 r* d" i. O s7 r - // 初始化一个worker容器,监听1234端口* H9 n, B9 g2 _0 N q0 K
- $worker = new Worker('websocket://0.0.0.0:1234');) a. \2 y9 L9 ^
' l d) d0 f! k1 e( w- /*
& W: @" y( ^4 B; j5 \: b: {! D% V# A - * 注意这里进程数必须设置为1,否则会报端口占用错误
/ z- ?! ~& M3 p& |. @ M/ l2 } - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
" _- B9 B6 {+ @- ]4 |8 t - */! n) M# L' Q" ~
- $worker->count = 1;
# Z# ]$ r* s* V. Y2 m/ W) z - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口& q1 @# y k; E x) x5 P3 [
- $worker->onWorkerStart = function($worker)+ a" B7 L1 i: T) G$ ^- p+ ]
- {
% q2 J |% T( {" P$ q" [, G8 N - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ k) j8 X) k# k; S, B2 F
- $inner_text_worker = new Worker('text://0.0.0.0:5678');0 K/ Z3 v$ G; K# f" `
- $inner_text_worker->onMessage = function($connection, $buffer). ?8 }& I M& ^
- {
- K9 u4 s8 c" Z7 N4 j; t# v - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
+ s7 |: R e3 F2 D/ i8 y" z3 s - $data = json_decode($buffer, true);
+ X- M/ ]2 L5 v- P. C - $uid = $data['uid'];
6 q* s! b9 h. O( T6 L. K$ ~+ D - // 通过workerman,向uid的页面推送数据% Z: o( A" S5 F1 o
- $ret = sendMessageByUid($uid, $buffer);# b, I- E4 g0 Y# s( [
- // 返回推送结果
- ~, Z- Z' y: Q; I/ ~; y. t9 S' D7 P - $connection->send($ret ? 'ok' : 'fail');
& Y x- |+ p/ }. @. h2 J1 J. N8 v - };
' G: g3 ]; v" K - // ## 执行监听 ##
6 H# a0 N7 L0 V1 f - $inner_text_worker->listen();
7 O6 _" ^ l0 Y5 m/ {# Y2 I - };) X; J' q# p9 K$ g. s
- // 新增加一个属性,用来保存uid到connection的映射
# ~( s t6 L6 A q8 b) q1 s - $worker->uidConnections = array();. ~0 T0 @) e) @1 F
- // 当有客户端发来消息时执行的回调函数
+ u& H# H* x U; W - $worker->onMessage = function($connection, $data)7 m$ i7 u9 z" Y7 @% r
- {/ d; i# l c5 V/ A) k
- global $worker;
; m: w# E$ I$ c: q8 d2 h - // 判断当前客户端是否已经验证,既是否设置了uid
9 N2 b9 i$ {! Y& j6 {1 W - if(!isset($connection->uid))
/ U, ]6 p; x# V/ A% E" p. [. h- ` - {
0 I C2 m0 Q, F5 s$ U- H - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
8 q, g7 _) \7 n# `7 k! W% U! a - $connection->uid = $data;0 M0 ]+ L' ]( [* j; k) C% @; B
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,% A7 _; T6 s0 a z% _$ _( N
- * 实现针对特定uid推送数据* a$ @: X9 `) w: _% Y
- */- j ^: @0 I2 A \+ N0 H, N
- $worker->uidConnections[$connection->uid] = $connection;
3 p5 y( `! K }9 S - return;
x( K8 n* P- X - }& m- s; N/ R0 w0 D* G
- };
( J: ]# n; a/ L$ u- H& g" \& ] - 6 ]. H% x- P, [: `
- // 当有客户端连接断开时4 ]$ u7 O/ x' Z; X8 w
- $worker->onClose = function($connection)/ e. X( [3 S' r/ H% d* F; @
- {
9 H# ?' \" ?" P - global $worker;
/ K/ a, Z% Y( f( K+ C2 O' ^6 O' L) _& ^% D - if(isset($connection->uid))& r; ~3 n- [5 i d# T
- {
9 y; }/ T8 I; M: `+ I - // 连接断开时删除映射
. N# U2 Q6 w5 T1 N: j/ ] - unset($worker->uidConnections[$connection->uid]);
0 n9 U/ I/ M3 B* o7 B6 O$ c3 @ - }
9 G, r9 g: p+ [2 F& q R - };
+ I0 }2 |" j! }% h: F7 H W4 ? - - u8 |" ?0 I5 _/ \
- // 向所有验证的用户推送数据! _: w# c; O5 }+ Y
- function broadcast($message)
) o% p, t- N. p9 i6 J7 m - {
: y# K9 s0 K. v U - global $worker;
# c' Z2 ^* R) P! r _% X0 h - foreach($worker->uidConnections as $connection)
, ?" X+ `9 M9 I6 A2 N4 f6 W ?' F - {9 O5 T1 h5 e8 Y6 j" {/ z; M
- $connection->send($message);
8 V$ N1 T2 z$ ~3 a! V - }9 p* I* d2 v. n4 R- V$ v( g
- }1 f- P7 Z7 ~6 N$ u2 B( ^
- " v, S d) x0 {: `2 _ i
- // 针对uid推送数据3 E6 i# G# ]) o( [6 ]+ n: K# y% C
- function sendMessageByUid($uid, $message)
/ U9 C7 P" L2 K% K3 g - {
# }1 L: g; M' K" s& B) X9 x; O# T - global $worker;
" ^ g4 J3 }# Q( v" f% |$ @ - if(isset($worker->uidConnections[$uid]))1 T# E/ o E- r/ b( j& i! T
- {( o i& `' |1 U. X
- $connection = $worker->uidConnections[$uid];9 t+ g: ^2 y* e) i0 j# v, {, o- i5 R
- $connection->send($message);1 \" Q. ?0 p) b! t, a; O, g
- return true;
) x5 M8 A3 }8 A3 D% X/ [ - }
e% e* F/ p/ v$ F+ r! D0 C7 R - return false;
. R& X- a+ U _$ ~+ [% I - }* C# g$ ~/ [- Q( L9 _
- 3 d; w& r0 c: V3 O) ~- q
- // 运行所有的worker
% I2 v9 F( z- w6 a* ~; ^2 P1 a1 @% f - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
! P; U1 J6 C( C6 c - ws.onopen = function(){" Y' n- |& J1 W; G3 ?
- var uid = 'uid1';
5 i+ i& K1 Z) |4 O4 P* J2 ~+ W - ws.send(uid);5 \8 a" G! c8 d$ B' \6 |- A
- };* `8 X; P; f! q& F% }+ x- q L
- ws.onmessage = function(e){
5 t# Q, b$ | c* n$ X6 ^: } - alert(e.data);
5 ]9 c U1 w1 _: t# ]/ l# `. T - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口9 v6 c; K( D$ G
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);/ N2 i5 ?" b3 T0 G6 R3 n
- // 推送的数据,包含uid字段,表示是给这个uid推送: o6 B, s; w. @0 @9 P! U6 l
- $data = array('uid'=>'uid1', 'percent'=>'88%');0 y8 B! w! h4 Q" h3 y+ q
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% i5 O+ @: Y6 d9 f. k
- fwrite($client, json_encode($data)."\n");
4 @; x* i7 G$ a6 Y, p - // 读取推送结果% |/ n- z" C2 j- b
- echo fread($client, 8192);
复制代码 & S* Q, d/ j0 i/ l$ v5 @
3 X' i9 F0 e; [- O& c1 f, B5 L0 I
|