您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12173|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;3 B6 I5 N" U9 m$ D6 x
  2. require_once __DIR__ . '/Workerman/Autoloader.php';( X& n+ R- [8 `; p1 V4 B6 f
  3. " T9 U6 |6 i! M1 h/ Q
  4. $worker = new Worker();, K3 C$ g; A' E& [( Z* x( n5 ?
  5. // 4个进程
    * Z1 Y8 z# H! P# E4 A4 z  T& h
  6. $worker->count = 4;
    ( z/ V& h, _( G$ u! E2 ]
  7. // 每个进程启动后在当前进程新增一个Worker监听
    : m/ w1 v5 I2 ]
  8. $worker->onWorkerStart = function($worker)
    3 H& f/ P5 w+ a* h
  9. {  x/ m7 I. A3 Y* B6 o
  10.     /**# ~4 L" ^( o* a
  11.      * 4个进程启动的时候都创建2016端口的Worker
    8 x2 C, V, h: M( N5 P! |! D$ b( d/ Y! Z
  12.      * 当执行到worker->listen()时会报Address already in use错误
      |( S& @+ n' z+ H
  13.      * 如果worker->count=1则不会报错
    ) T, @6 V# Y; b
  14.      */7 a$ m0 c6 R$ s; k
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');! a1 r) ]) {  t$ x8 U& q# d
  16.     $inner_worker->onMessage = 'on_message';
    ; M& u) g5 f* F
  17.     // 执行监听。这里会报Address already in use错误- I1 ?. c" H7 D) `2 w: x0 u* X
  18.     $inner_worker->listen();
    + p2 x0 D, x7 f  f9 j+ M  `. R% B
  19. };( s6 E9 [, O' U. y7 a2 q

  20. 5 R0 ]' x. ?' C# W5 G# @
  21. $worker->onMessage = 'on_message';
    2 ]# T. v) g) a: J6 b$ _+ r7 ?

  22. ( ^0 J7 Y3 b0 Q4 ~
  23. function on_message($connection, $data)
    : l* l! I  Q# p$ b
  24. {0 }/ v+ h9 B" O- J% |  f
  25.     $connection->send("hello\n");# f  W  b( {2 {1 E. O" p4 ?" {
  26. }
    9 ]0 W6 h3 T5 I* U# |! _0 p

  27. / l# v6 v; l5 z- X* M$ K2 Q& X
  28. // 运行worker6 W  ?) N1 ?4 V& Z& x" p
  29. Worker::runAll();
    4 H' C1 Y' E3 u7 T: P" V# J! R
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    6 u$ b2 `! w9 ~! p( h
  31. ) b) w7 f* M) X  C2 }
  32. use Workerman\Worker;
    / p- Y% Z  d2 k! w
  33. require_once './Workerman/Autoloader.php';% J1 `  r5 y! c: {) u8 I

  34. 7 L0 Z# a  p9 c! I( t7 z" D7 Z
  35. $worker = new Worker('text://0.0.0.0:2015');/ f  g& R% v, F+ Y) ~6 `
  36. // 4个进程
    1 D, m; }; X2 k8 U# X  j
  37. $worker->count = 4;7 H: I; S0 p4 U6 G. x+ y' y: p
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 G' v5 w! y8 A
  39. $worker->onWorkerStart = function($worker)
    : H2 N' c  m. C" J/ ~" _
  40. {
    3 p4 M$ b( U( D! j( F1 b. E8 g- n
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');' D, y4 k: v+ r
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    2 R% I( K7 S' `! {$ H
  43.     $inner_worker->reusePort = true;. @" A( A) X3 k# h
  44.     $inner_worker->onMessage = 'on_message';
    ; f1 U7 K- T( I0 H2 T  X2 x8 t
  45.     // 执行监听。正常监听不会报错7 ?) y) `; s' @5 s  |) Z) A
  46.     $inner_worker->listen();
    : F9 T. I& c2 ]& l3 @+ u0 E
  47. };
    3 J1 S( B! F( _( w! p

  48. . U# ?: {5 f7 \. `' y4 f/ g: B
  49. $worker->onMessage = 'on_message';6 V( `' c/ M9 j% r: y# |, s
  50. 4 H) H& T2 I7 s! l
  51. function on_message($connection, $data): p6 I' t+ B/ K& c" v" u
  52. {1 {6 ]7 x5 T: Q) s, \
  53.     $connection->send("hello\n");5 N, [3 h$ H' Z
  54. }
    " a* f6 M" d/ A! s

  55. 6 V2 U( t# g# d7 \3 X
  56. // 运行worker; G; `3 |6 v$ J$ U& g! S$ k) R" V& H
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php3 K+ x$ ]# E  ~( }2 M7 ~6 Q; [9 ~$ h
  2. use Workerman\Worker;( S- d- A1 t( W- _; e( ]4 V
  3. require_once './Workerman/Autoloader.php';
    : w7 E2 r* d" i. O  s7 r
  4. // 初始化一个worker容器,监听1234端口* H9 n, B9 g2 _0 N  q0 K
  5. $worker = new Worker('websocket://0.0.0.0:1234');) a. \2 y9 L9 ^

  6. ' l  d) d0 f! k1 e( w
  7. /*
    & W: @" y( ^4 B; j5 \: b: {! D% V# A
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    / z- ?! ~& M3 p& |. @  M/ l2 }
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    " _- B9 B6 {+ @- ]4 |8 t
  10. */! n) M# L' Q" ~
  11. $worker->count = 1;
    # Z# ]$ r* s* V. Y2 m/ W) z
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口& q1 @# y  k; E  x) x5 P3 [
  13. $worker->onWorkerStart = function($worker)+ a" B7 L1 i: T) G$ ^- p+ ]
  14. {
    % q2 J  |% T( {" P$ q" [, G8 N
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ k) j8 X) k# k; S, B2 F
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');0 K/ Z3 v$ G; K# f" `
  17.     $inner_text_worker->onMessage = function($connection, $buffer). ?8 }& I  M& ^
  18.     {
    - K9 u4 s8 c" Z7 N4 j; t# v
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    + s7 |: R  e3 F2 D/ i8 y" z3 s
  20.         $data = json_decode($buffer, true);
    + X- M/ ]2 L5 v- P. C
  21.         $uid = $data['uid'];
    6 q* s! b9 h. O( T6 L. K$ ~+ D
  22.         // 通过workerman,向uid的页面推送数据% Z: o( A" S5 F1 o
  23.         $ret = sendMessageByUid($uid, $buffer);# b, I- E4 g0 Y# s( [
  24.         // 返回推送结果
    - ~, Z- Z' y: Q; I/ ~; y. t9 S' D7 P
  25.         $connection->send($ret ? 'ok' : 'fail');
    & Y  x- |+ p/ }. @. h2 J1 J. N8 v
  26.     };
    ' G: g3 ]; v" K
  27.     // ## 执行监听 ##
    6 H# a0 N7 L0 V1 f
  28.     $inner_text_worker->listen();
    7 O6 _" ^  l0 Y5 m/ {# Y2 I
  29. };) X; J' q# p9 K$ g. s
  30. // 新增加一个属性,用来保存uid到connection的映射
    # ~( s  t6 L6 A  q8 b) q1 s
  31. $worker->uidConnections = array();. ~0 T0 @) e) @1 F
  32. // 当有客户端发来消息时执行的回调函数
    + u& H# H* x  U; W
  33. $worker->onMessage = function($connection, $data)7 m$ i7 u9 z" Y7 @% r
  34. {/ d; i# l  c5 V/ A) k
  35.     global $worker;
    ; m: w# E$ I$ c: q8 d2 h
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    9 N2 b9 i$ {! Y& j6 {1 W
  37.     if(!isset($connection->uid))
    / U, ]6 p; x# V/ A% E" p. [. h- `
  38.     {
    0 I  C2 m0 Q, F5 s$ U- H
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    8 q, g7 _) \7 n# `7 k! W% U! a
  40.        $connection->uid = $data;0 M0 ]+ L' ]( [* j; k) C% @; B
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,% A7 _; T6 s0 a  z% _$ _( N
  42.         * 实现针对特定uid推送数据* a$ @: X9 `) w: _% Y
  43.         */- j  ^: @0 I2 A  \+ N0 H, N
  44.        $worker->uidConnections[$connection->uid] = $connection;
    3 p5 y( `! K  }9 S
  45.        return;
      x( K8 n* P- X
  46.     }& m- s; N/ R0 w0 D* G
  47. };
    ( J: ]# n; a/ L$ u- H& g" \& ]
  48. 6 ]. H% x- P, [: `
  49. // 当有客户端连接断开时4 ]$ u7 O/ x' Z; X8 w
  50. $worker->onClose = function($connection)/ e. X( [3 S' r/ H% d* F; @
  51. {
    9 H# ?' \" ?" P
  52.     global $worker;
    / K/ a, Z% Y( f( K+ C2 O' ^6 O' L) _& ^% D
  53.     if(isset($connection->uid))& r; ~3 n- [5 i  d# T
  54.     {
    9 y; }/ T8 I; M: `+ I
  55.         // 连接断开时删除映射
    . N# U2 Q6 w5 T1 N: j/ ]
  56.         unset($worker->uidConnections[$connection->uid]);
    0 n9 U/ I/ M3 B* o7 B6 O$ c3 @
  57.     }
    9 G, r9 g: p+ [2 F& q  R
  58. };
    + I0 }2 |" j! }% h: F7 H  W4 ?
  59. - u8 |" ?0 I5 _/ \
  60. // 向所有验证的用户推送数据! _: w# c; O5 }+ Y
  61. function broadcast($message)
    ) o% p, t- N. p9 i6 J7 m
  62. {
    : y# K9 s0 K. v  U
  63.    global $worker;
    # c' Z2 ^* R) P! r  _% X0 h
  64.    foreach($worker->uidConnections as $connection)
    , ?" X+ `9 M9 I6 A2 N4 f6 W  ?' F
  65.    {9 O5 T1 h5 e8 Y6 j" {/ z; M
  66.         $connection->send($message);
    8 V$ N1 T2 z$ ~3 a! V
  67.    }9 p* I* d2 v. n4 R- V$ v( g
  68. }1 f- P7 Z7 ~6 N$ u2 B( ^
  69. " v, S  d) x0 {: `2 _  i
  70. // 针对uid推送数据3 E6 i# G# ]) o( [6 ]+ n: K# y% C
  71. function sendMessageByUid($uid, $message)
    / U9 C7 P" L2 K% K3 g
  72. {
    # }1 L: g; M' K" s& B) X9 x; O# T
  73.     global $worker;
    " ^  g4 J3 }# Q( v" f% |$ @
  74.     if(isset($worker->uidConnections[$uid]))1 T# E/ o  E- r/ b( j& i! T
  75.     {( o  i& `' |1 U. X
  76.         $connection = $worker->uidConnections[$uid];9 t+ g: ^2 y* e) i0 j# v, {, o- i5 R
  77.         $connection->send($message);1 \" Q. ?0 p) b! t, a; O, g
  78.         return true;
    ) x5 M8 A3 }8 A3 D% X/ [
  79.     }
      e% e* F/ p/ v$ F+ r! D0 C7 R
  80.     return false;
    . R& X- a+ U  _$ ~+ [% I
  81. }* C# g$ ~/ [- Q( L9 _
  82. 3 d; w& r0 c: V3 O) ~- q
  83. // 运行所有的worker
    % I2 v9 F( z- w6 a* ~; ^2 P1 a1 @% f
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ! P; U1 J6 C( C6 c
  2. ws.onopen = function(){" Y' n- |& J1 W; G3 ?
  3.     var uid = 'uid1';
    5 i+ i& K1 Z) |4 O4 P* J2 ~+ W
  4.     ws.send(uid);5 \8 a" G! c8 d$ B' \6 |- A
  5. };* `8 X; P; f! q& F% }+ x- q  L
  6. ws.onmessage = function(e){
    5 t# Q, b$ |  c* n$ X6 ^: }
  7.     alert(e.data);
    5 ]9 c  U1 w1 _: t# ]/ l# `. T
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口9 v6 c; K( D$ G
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);/ N2 i5 ?" b3 T0 G6 R3 n
  3. // 推送的数据,包含uid字段,表示是给这个uid推送: o6 B, s; w. @0 @9 P! U6 l
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');0 y8 B! w! h4 Q" h3 y+ q
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% i5 O+ @: Y6 d9 f. k
  6. fwrite($client, json_encode($data)."\n");
    4 @; x* i7 G$ a6 Y, p
  7. // 读取推送结果% |/ n- z" C2 j- b
  8. echo fread($client, 8192);
复制代码
& S* Q, d/ j0 i/ l$ v5 @
3 X' i9 F0 e; [- O& c1 f, B5 L0 I
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 21:42 , Processed in 0.114301 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!