- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;) |: ^ i8 Y9 p
- require_once __DIR__ . '/Workerman/Autoloader.php';
8 a2 V) w7 o/ d6 m, @5 \
3 A5 E% V( \# U& {) U- $worker = new Worker();9 D" J2 s( V/ P; d6 A
- // 4个进程9 l$ {$ ~; U2 H2 v0 z
- $worker->count = 4;) J5 {. l. v/ \% _" T8 m
- // 每个进程启动后在当前进程新增一个Worker监听
# [1 ?3 r2 ]# l5 s/ V; W - $worker->onWorkerStart = function($worker)
8 l6 w7 [' ?+ }3 [# _ - {
% {) J7 a) E c - /**
8 H$ i6 S) Q4 _: J - * 4个进程启动的时候都创建2016端口的Worker& X7 P$ [4 _# l9 ~2 ?' L4 L3 O
- * 当执行到worker->listen()时会报Address already in use错误
7 Q( M( \. o% |6 ~' J5 i1 W - * 如果worker->count=1则不会报错+ W' c, N! X1 }1 l1 k6 v8 P
- */
6 ~1 d9 v& ]: l! p5 c& K2 t - $inner_worker = new Worker('http://0.0.0.0:2016');0 p+ h; V3 k3 Q' @
- $inner_worker->onMessage = 'on_message';
, }# Z7 Q# X, b/ Y - // 执行监听。这里会报Address already in use错误( F6 O0 H+ ?+ H* w) O$ `: {, r9 P
- $inner_worker->listen();6 b4 I9 i2 o" _
- };7 q8 M" S$ x/ B. X& R2 n$ v& L+ B* @
- 2 j' u" ~0 e% O: p( s5 Q( g( l
- $worker->onMessage = 'on_message';* j6 L6 v+ A1 y4 @1 J$ |: G
- : Y' B0 p i+ v; Z" x
- function on_message($connection, $data)
* L' V8 j+ |. w2 h( ~ - { o# W( ^+ m! y: @# f' v8 J& V2 x E+ x8 s
- $connection->send("hello\n");6 N% l* v& a; p, Z8 f; C' }) |
- }
% U3 ]9 D& `* _ n0 v/ t9 D# S - ! [7 U* M* |' l
- // 运行worker
. A) Q2 K @! b- M6 r - Worker::runAll();
0 I u, ~) B. |7 m, @8 V - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 f3 W4 u6 D9 y1 _8 d
- 1 X7 I6 W: F" U/ q' ~
- use Workerman\Worker;
. U2 @! l! K' U5 ~- w* H W - require_once './Workerman/Autoloader.php';1 z' S" A/ O7 S2 \$ t' j1 i# h7 m
- 5 y0 N$ J' K3 s6 u. x+ D
- $worker = new Worker('text://0.0.0.0:2015');* a: |9 D& l$ ^, n3 E7 D' ~ u
- // 4个进程 i( S5 w# \$ i- C* A6 t8 g; }( a
- $worker->count = 4;( D$ z& T6 J I) |& n; W
- // 每个进程启动后在当前进程新增一个Worker监听& y- b. o6 t1 K# l9 B
- $worker->onWorkerStart = function($worker)
" r) Q: @9 a( }# n0 b8 i/ _) f - {
. F4 s+ Q) y8 `- t% o - $inner_worker = new Worker('http://0.0.0.0:2016');3 V8 X# _8 ^2 G0 k/ z
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)6 e! U! q" B o: h6 c: F
- $inner_worker->reusePort = true;
( w' @0 D# \" f8 F& i+ j" {# i - $inner_worker->onMessage = 'on_message';
* h% f& }9 X) z+ o7 H, G" Y8 Q, p - // 执行监听。正常监听不会报错6 y7 u( N% g' q; P; ]7 E4 h% E
- $inner_worker->listen();
$ A/ l$ d- R7 d4 U9 y - };0 g$ b! X. _& \. N8 ?2 A$ v
- 4 e" T! v! `6 T2 g: b- ^
- $worker->onMessage = 'on_message';
4 r; _8 c) p. h
" J1 J$ R1 M Q- function on_message($connection, $data), P) a! }" w* ?7 R; q% ?) m2 s o
- {
% N% E7 |; A/ k6 h) o - $connection->send("hello\n");* w3 @1 |+ L; m; l* l
- }
) Z1 ~2 z2 u5 K! [% |% P! {% K! p1 ]+ G - 6 N2 R7 k. |5 u- G1 _9 t% w- z
- // 运行worker
3 I4 J7 W) n" k# M - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php: h2 {$ `' V8 P7 l% Y
- use Workerman\Worker;
4 ~! m1 K) s0 B8 Y, R& ]' j5 P - require_once './Workerman/Autoloader.php';6 U& n# D5 N$ {$ w" T x
- // 初始化一个worker容器,监听1234端口
, W9 L0 N7 t) {$ m8 h8 J0 x - $worker = new Worker('websocket://0.0.0.0:1234');
6 O' \! ~* N5 W' K: d6 c6 D
; `! j( k8 D* H2 J+ \- /** ]! y* e" k2 e4 f' t7 K7 e
- * 注意这里进程数必须设置为1,否则会报端口占用错误
# m3 q) w. O: C# R4 t - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
7 ~5 _( J5 [4 S: Q - */
( n. }! s/ s+ V1 | - $worker->count = 1;
, E3 [7 W1 a: \% o7 \. W - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, k& ]) {) f0 s5 D- C0 K. j9 g' \
- $worker->onWorkerStart = function($worker)
- S* u, U; f# }5 p7 a$ G - {
+ Z0 F% E8 }7 n9 r - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ {' q8 w5 X c% ]- p3 ~
- $inner_text_worker = new Worker('text://0.0.0.0:5678');1 |6 i% E: `& H( B& N, ]' ` v' q
- $inner_text_worker->onMessage = function($connection, $buffer)
1 A2 `% ]1 \, x+ [' O$ |7 B* o - {
* e2 l) U2 c& G - // $data数组格式,里面有uid,表示向那个uid的页面推送数据, {; d" O6 C- V3 u$ A- g. z
- $data = json_decode($buffer, true);, G; D5 a6 @ ?6 g
- $uid = $data['uid'];) u6 H; W7 c$ c1 ~( `* i
- // 通过workerman,向uid的页面推送数据% l3 \4 V# g+ ]& ^+ t. t! h/ o0 H! G4 @
- $ret = sendMessageByUid($uid, $buffer);$ U5 T5 U4 m3 o! _
- // 返回推送结果
, \5 {% k$ G" Y* S$ H - $connection->send($ret ? 'ok' : 'fail');' \* o8 v4 x! {* ]6 G
- };
d6 B; J1 r; e" Q% @; G- ]$ Y - // ## 执行监听 ##! S8 o6 M7 Z9 k& {7 H: y8 o! f2 Y# q. X
- $inner_text_worker->listen();5 Y: b& T3 E5 o9 s
- };+ R* Z) T2 d- t* ]! r+ L, ]- y
- // 新增加一个属性,用来保存uid到connection的映射
( K, [5 d! ^/ P( T! Z' l - $worker->uidConnections = array();2 t7 t' Q/ U* g. j
- // 当有客户端发来消息时执行的回调函数. W# d* Q) E9 I& N1 x! Y
- $worker->onMessage = function($connection, $data)
( A; L& F' x S3 H, M6 y0 C - {
" u/ e8 Z# j, i& J - global $worker;5 X0 z4 R- Y% `6 G) w( I! s
- // 判断当前客户端是否已经验证,既是否设置了uid3 w4 k* T) X6 |8 @ ]( |: o) R- {4 M
- if(!isset($connection->uid))
) j# K. _+ P3 V& f% z - {
/ k2 r/ ?0 l6 r0 P - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
% h; F0 [ R/ P/ N Q9 I9 U - $connection->uid = $data;8 N" q7 m' l9 E
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! g0 m2 c9 ~: O+ n; X: ^
- * 实现针对特定uid推送数据
3 A& V, d" u2 X8 m; o% u - */
( d; G, j4 Y" W6 ~, F7 r0 u5 f% W - $worker->uidConnections[$connection->uid] = $connection;0 h/ R9 S" @2 ]7 [8 ?- `5 N
- return;
: m+ X( d* S/ {9 K - }
/ ~( C# K* [4 Z# ~! l - };; I5 P$ ?4 V$ G3 |
6 \1 n" z& o h+ T" f, s( M/ N9 s$ d- // 当有客户端连接断开时. F" G8 @* `- R! |. t+ S9 X
- $worker->onClose = function($connection)
7 D1 |+ ]9 M6 B5 F2 L) v - {
5 l" N3 w: L* E E6 \7 i& y @: A - global $worker;# g& s; z$ I$ s0 |: e4 |
- if(isset($connection->uid))) u' F" ^1 s" j' a6 f
- {6 U0 U1 x4 L+ {* d a1 s
- // 连接断开时删除映射; j( `1 F" |$ f, Z
- unset($worker->uidConnections[$connection->uid]);
) u8 g) Y: F" r- m# F9 r - }
: ]% _: Y; D& w: | - };
8 q* i# L5 i' H4 \4 D* U
) }9 n; E% ~6 E$ V% Q- // 向所有验证的用户推送数据: y/ k+ }; | n5 N- a
- function broadcast($message)$ y# U% i1 h3 {/ ^2 T0 l! k, i# X: Y( t
- {
) o- D5 A, s% Z, b% ~$ t) | - global $worker;
1 Q# r" B! ?6 x/ _$ H - foreach($worker->uidConnections as $connection)4 n% s% u9 g6 n4 `7 ?
- {; u9 W! K/ E5 V: R: f+ Y+ E
- $connection->send($message);2 Y4 i: [6 K" }1 v
- }
' Q) a8 N/ n# B) o9 j" K: z - }% D6 T9 U4 k6 b8 O: s
- " p3 G% M2 t; M6 c& t0 |8 P4 ]" ^
- // 针对uid推送数据
/ p7 H! Z' e( y% c* c- P" D0 P. J - function sendMessageByUid($uid, $message)
% U A) t, ?) v0 R3 E) Q: ] - {6 T: L4 ~% p3 ?/ M! M
- global $worker;+ }4 D& |: D/ E+ R% \0 Q2 C
- if(isset($worker->uidConnections[$uid]))
1 g9 d' }- z0 j" U x, k - {
/ H6 y; \. k# D& n: Q5 U - $connection = $worker->uidConnections[$uid];+ M0 ~1 f1 r- J# }
- $connection->send($message);7 d/ v/ B+ T1 _- P
- return true;6 y. K! X3 s2 p! P1 f8 F
- }+ F' N( Y2 U- e+ k: w
- return false;
' D+ |' ^ w/ J: L, J3 R9 S0 H9 p - }$ u m" ]$ w5 `) o+ c2 t+ u
$ v6 z8 L h6 U- // 运行所有的worker+ F4 l% ] O, C( J
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234'); S" m" X: o# P9 G+ X* f8 y
- ws.onopen = function(){& Z! r, x/ o" t3 O7 d1 N
- var uid = 'uid1';% k5 Q, I% R6 t# i8 L6 P
- ws.send(uid);
3 B2 d: `2 O O% J$ h - };; I2 a6 w0 S# h% m1 u! g9 Y. v/ g7 p
- ws.onmessage = function(e){
* A1 V" T. z' e - alert(e.data);
* I' D1 a. c' W7 G- T) C! C# W - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口# z& h: ?. C( t* F
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
3 {. s. e5 M; h3 L& u - // 推送的数据,包含uid字段,表示是给这个uid推送 }6 S6 P3 P- ^0 S% L; H1 M
- $data = array('uid'=>'uid1', 'percent'=>'88%');
) o/ G7 ^: T/ D2 j v: q$ m0 h - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
" X, A( m2 ^5 \: Q - fwrite($client, json_encode($data)."\n");
7 J1 D& l4 z, T/ ], `4 K - // 读取推送结果
5 S4 U; d: x, O( J" j3 w/ P - echo fread($client, 8192);
复制代码
% X0 D5 l& S q* a$ N' M( B: D. L, l
|