您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12154|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;4 ^3 ]- C0 f0 y4 ]
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    1 z6 Y0 m; z; B0 l7 ?
  3. # W! _4 x, ~( o; c
  4. $worker = new Worker();( C+ i! |- ~7 t3 _% l
  5. // 4个进程/ D: J. ^& N6 `* y% X- [; b
  6. $worker->count = 4;4 \5 Y2 p+ k$ O" d7 }# J  w. w
  7. // 每个进程启动后在当前进程新增一个Worker监听! L$ ?' S8 e0 ^. K, T
  8. $worker->onWorkerStart = function($worker)( z( H% @1 P+ B) @, p
  9. {
    # B8 Z+ |  d( \! T! |
  10.     /**
    6 m3 D% `7 s& T0 `3 F3 ~9 w  i8 v1 F
  11.      * 4个进程启动的时候都创建2016端口的Worker
    9 E5 p0 j; f$ j4 Y) Q0 M
  12.      * 当执行到worker->listen()时会报Address already in use错误( ~. }6 J2 N0 }# ]
  13.      * 如果worker->count=1则不会报错
    0 G# l8 {& E" z
  14.      */
    2 R7 t7 F2 Y3 X) F! h
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ! f; u) `0 \; n  J1 |" a$ h2 M8 m7 l3 B
  16.     $inner_worker->onMessage = 'on_message';: T' w) ]# p  g, g# ?4 f
  17.     // 执行监听。这里会报Address already in use错误4 l; d1 z  w  m; F- t
  18.     $inner_worker->listen();2 o8 L0 e  Z! W( d% x0 X  c# Y
  19. };( U% n$ V) Q2 r9 e
  20. : s; t" \8 C1 W) @% f4 B
  21. $worker->onMessage = 'on_message';) n! ^5 e3 ^* x6 g, D3 K3 {! @

  22. ( x6 W) D( n& R. i1 w
  23. function on_message($connection, $data)$ ~' E& X) E1 f! z
  24. {# Z1 Q2 q8 o9 {7 d' [, N' V
  25.     $connection->send("hello\n");/ U; e' ^( @/ Y0 ]
  26. }) G: l9 \3 N, S0 D
  27. + ~/ c8 Y# U% a, z3 C$ C8 A
  28. // 运行worker
    " \  u3 z+ Z' @" f' v
  29. Worker::runAll();
    " @  R$ V  q& }$ o$ m3 x+ }
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    3 F# p0 ~$ j0 S8 g  h+ S
  31. 6 g) ?0 Z$ j+ B$ ?; i, Z4 _: s
  32. use Workerman\Worker;9 @2 z. w, r3 a  w6 U! y
  33. require_once './Workerman/Autoloader.php';! s" V0 t1 t8 R; c% ^/ n

  34. 3 }! [' H* v, Q$ i6 \. L
  35. $worker = new Worker('text://0.0.0.0:2015');( b. M$ l- ~. L/ S
  36. // 4个进程1 E: i1 S3 i$ I3 A- q; f* Z
  37. $worker->count = 4;& j! F4 p  ]  U
  38. // 每个进程启动后在当前进程新增一个Worker监听
      \/ r- s9 J# K1 M) \, j: u
  39. $worker->onWorkerStart = function($worker)
    ) k" J- ~' B1 R1 T$ k6 H
  40. {
    # K0 g1 N" ?" k) S6 l& l# [
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');. u- j  S( d3 t
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
      k8 H+ R: ]" \1 D
  43.     $inner_worker->reusePort = true;
    , `1 W5 s0 n. u8 m$ J! t; C
  44.     $inner_worker->onMessage = 'on_message';
      X; `, E% T$ V3 ]  v& L% H4 S( |
  45.     // 执行监听。正常监听不会报错
    ; V% Q3 x8 s4 E, w& u
  46.     $inner_worker->listen();
    8 V9 g' _, ?' w# T" i9 p
  47. };9 a9 H5 K2 B1 A" u+ @! X4 s

  48. * P  d' e: W8 X" U9 t0 A
  49. $worker->onMessage = 'on_message';
    * j* G2 n6 t9 g  r1 s7 x& |
  50. # I7 w# S) n- @+ O
  51. function on_message($connection, $data)/ O) q- X; [1 E# a3 a$ O& M
  52. {+ Q  H) g7 ^! ~& X
  53.     $connection->send("hello\n");
    5 ]% b/ v6 j, ?& Z! Z8 h
  54. }
    0 U, z) J! I5 C5 L3 z& a; b' k8 |
  55. ' T( I* j  p) c0 y2 P" W" v
  56. // 运行worker
    & f8 s6 I$ e! V. f4 h) B
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php5 ~& ?" C( v0 R% Q
  2. use Workerman\Worker;
    , b: A9 X) A  T
  3. require_once './Workerman/Autoloader.php';
    ! W- Q% \" L  l, b, [& z
  4. // 初始化一个worker容器,监听1234端口1 F3 z! }8 Z5 d8 F/ z
  5. $worker = new Worker('websocket://0.0.0.0:1234');) P- A: Y! m9 T- G' n3 N3 V1 E

  6. + b( k5 ?; m& R! q  q
  7. /*% _7 d  e( }, s& {
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误* j. B3 _7 ^9 b& f- m
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! r: }/ e! }  B. M! f9 o
  10. */  ?$ `+ g5 G0 m" m/ @/ Q% f
  11. $worker->count = 1;
    ' P; a4 m0 P6 f" e7 E. p
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口; X: N/ }- l! Q( q: Q0 H1 C8 q2 n
  13. $worker->onWorkerStart = function($worker)/ `, I6 T& X( _& Q
  14. {3 \. r, t3 j7 ?. _4 ~0 K2 n" b) b
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    " w; R/ N+ O% N  e& }, a9 D4 W! Y
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');3 B  k+ x0 B( [9 e* y( b
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    ; K! o4 E& t/ V9 @5 P. W# G- T
  18.     {
    " ^/ o; v( e! b. C. U
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    , n* h! H6 K# [/ W
  20.         $data = json_decode($buffer, true);5 t  j9 C. H* K3 a- m
  21.         $uid = $data['uid'];
    . ^* }1 n/ l. c% R* T& Z
  22.         // 通过workerman,向uid的页面推送数据
    1 B: ?0 a0 |7 K" d; T  Y  R$ O
  23.         $ret = sendMessageByUid($uid, $buffer);
    6 `. r" T+ L# v( b' y& D5 T% d' h* H/ E
  24.         // 返回推送结果
    & ?: c; Z  Y  r* I/ S8 a6 W( i
  25.         $connection->send($ret ? 'ok' : 'fail');
    * S0 u9 V. k% F' K( b
  26.     };; S+ x4 Q' ~% a
  27.     // ## 执行监听 ##
    / r5 t" ^- |7 g3 C# M2 g8 E- @
  28.     $inner_text_worker->listen();: d. h; {) k, D5 ^, f8 x
  29. };
    " U- `# r0 S4 P9 c: Z0 S( V
  30. // 新增加一个属性,用来保存uid到connection的映射
    " R+ \2 ]) E8 l; Q
  31. $worker->uidConnections = array();
    4 Q6 s7 b" f7 Z* x; b
  32. // 当有客户端发来消息时执行的回调函数+ a6 c" U: p' a. l! ~! y
  33. $worker->onMessage = function($connection, $data)# f4 V& y- I9 \- ]  O
  34. {
    3 [" c1 |% @& v7 R- w/ T' a# Y: [
  35.     global $worker;
    4 \9 f  K! t" e  \% l
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    % Y7 V* }8 w% p; Q  Y
  37.     if(!isset($connection->uid))4 k- k. L1 q( i6 V
  38.     {
    - h" ?' D# H9 T
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)- X' {6 t8 d: W
  40.        $connection->uid = $data;% [/ l4 K* B5 r! K, M- }
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,. s5 x+ S/ G; v2 T" l
  42.         * 实现针对特定uid推送数据
    ; A& F" A) c9 ~( v) N
  43.         *// q: @" B+ I* H4 u8 N! p! D
  44.        $worker->uidConnections[$connection->uid] = $connection;
    . P; O" w! z3 C
  45.        return;2 D7 D1 v" u) c; s" ~
  46.     }
    ! c6 K) X% i# q' T# I/ s$ x
  47. };0 P: U7 v  ~( d
  48. . K- q; e2 S4 [. J
  49. // 当有客户端连接断开时
    ) A% o  \+ J1 _: I4 s/ k9 z
  50. $worker->onClose = function($connection)
    4 i. h8 ^) f! o5 M2 x1 y
  51. {7 b& Q8 k4 I. Q) {  B3 i0 K3 e3 k
  52.     global $worker;
    ( y, Y: y7 @! H- c* G9 ~
  53.     if(isset($connection->uid))
    5 b  F8 G) T" H2 a$ o
  54.     {
    5 Q+ s4 `, Q$ |2 l: B& o
  55.         // 连接断开时删除映射
      E- Q3 a# E8 ^+ A- m; Y1 F$ ]
  56.         unset($worker->uidConnections[$connection->uid]);) k/ y8 k; g  l: N5 a2 {& t: r5 o6 U
  57.     }
      w) E" h6 T& D  |$ p: ^( ]
  58. };9 A, o% J$ u( n* j4 w8 E& H* j
  59. ( b4 X7 P/ H$ l4 E' K7 e
  60. // 向所有验证的用户推送数据
    / D0 O$ p4 b. k& S' e! r
  61. function broadcast($message)0 k0 J5 {2 x$ B4 I
  62. {
    + S3 W& D. J% o& ?# n0 S3 N, r
  63.    global $worker;
    8 N4 d5 h! U# ]5 R7 O- ~
  64.    foreach($worker->uidConnections as $connection)
    & a# q4 Z) A8 v: l1 P+ T5 C# g
  65.    {: G# P' j+ ], s2 U
  66.         $connection->send($message);, [  A8 ]/ d- v' b/ w) H
  67.    }/ G4 b/ d  c- e' p
  68. }1 d$ {; p; t; {& y/ `5 l) L

  69. / a/ Z' M, q' {4 L" B+ T5 K
  70. // 针对uid推送数据% Y. f- O) H5 t/ @* [. m
  71. function sendMessageByUid($uid, $message)+ R; x  ?; Q* r* F9 E( H) N5 H
  72. {& Y, i6 b; l* H
  73.     global $worker;
      @; Q5 J8 c- {6 i* f* g
  74.     if(isset($worker->uidConnections[$uid]))
    $ z4 g: j7 m  z$ G( g
  75.     {
    # @0 a2 d, O; `* b$ D
  76.         $connection = $worker->uidConnections[$uid];
    ! v, X3 v1 ?3 r. l" S
  77.         $connection->send($message);
    ) X$ b5 s4 j# v" _' C: Y
  78.         return true;
    & U( `3 D- @0 R8 I
  79.     }3 [7 U. P% I# e
  80.     return false;
    5 e% i& B7 F4 Q  M3 b2 S/ f
  81. }
    ) c3 P1 f1 q8 R6 E* u
  82. 1 G8 Y) ^# y4 f* e0 Y: C7 Z
  83. // 运行所有的worker
    , o( K8 P! Q0 f) g$ Y7 {# X' O
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');- h* z" x, U- P# F8 e/ v
  2. ws.onopen = function(){$ G5 J  @) J# r0 I3 x" N' d
  3.     var uid = 'uid1';8 Y# U3 q3 T5 \
  4.     ws.send(uid);
      I- \, g0 d& B* ~
  5. };  I+ X0 \3 F( T6 W5 r( `4 Y
  6. ws.onmessage = function(e){
    ' ^; M. d% U! i. z- ]3 m, d
  7.     alert(e.data);/ @0 |3 T  r, x' A9 `' [# {; ?
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口. l1 N  U. G- D  y
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);( c& Y# d. d6 c% w, }
  3. // 推送的数据,包含uid字段,表示是给这个uid推送( q  S1 q% c6 J1 {6 {
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');4 j4 i0 D; T: `$ P9 I
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符* Q% i* w0 W7 [" T4 A2 J
  6. fwrite($client, json_encode($data)."\n");6 f3 U/ i1 C: Y/ u" z/ I
  7. // 读取推送结果
    # R" ?- z% z* x9 d- p
  8. echo fread($client, 8192);
复制代码
' t, a8 b4 B. s4 q. J
8 |9 K* e7 L; y: _# h, _& g
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 15:08 , Processed in 0.111257 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!