您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12168|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    8 Q0 _: c2 P9 B8 j. s# M2 c
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    : p) r" ?8 y$ f  v* a7 U1 x

  3. " D+ Y# l8 e1 x1 @2 W
  4. $worker = new Worker();
    % T- p, D  {6 N
  5. // 4个进程
    . b2 T  [3 t7 `7 G- Q* R5 _1 x7 }
  6. $worker->count = 4;
    / \; _- p5 A+ C' y2 p, @
  7. // 每个进程启动后在当前进程新增一个Worker监听: D  M# |3 Q: D4 l
  8. $worker->onWorkerStart = function($worker)$ H. v: n4 [2 L4 ~9 a5 h$ }
  9. {
    8 Z7 F2 ]+ q7 e* i
  10.     /**  _0 k2 S; v0 u- |2 y" n. F
  11.      * 4个进程启动的时候都创建2016端口的Worker% o' S: Q( X* r5 p& F* |7 r) [/ k
  12.      * 当执行到worker->listen()时会报Address already in use错误7 g7 \4 r7 a: q- V" K$ x
  13.      * 如果worker->count=1则不会报错: M/ e% u- b0 s5 E, n: p( z# W  _
  14.      */
    8 C9 K( P* v- _( F& Y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');6 I! Q$ R8 d$ m1 W' b7 d) [
  16.     $inner_worker->onMessage = 'on_message';" i2 t. ]& S5 G! w* Z  w
  17.     // 执行监听。这里会报Address already in use错误. k: q, l6 S  o& R' _( V* x
  18.     $inner_worker->listen();* ~8 n$ x9 p) f& N* E
  19. };( M7 ?9 i) t. I9 Y" t2 \
  20. 7 v& y- x4 F' n, Q
  21. $worker->onMessage = 'on_message';; F# u* c7 G" k
  22. 2 E3 {) w8 [  {) ~9 U  z1 @! N
  23. function on_message($connection, $data)$ @* r! O; e8 b+ N. H4 @7 e7 P
  24. {
    ( t( P" `* t. h* ?1 s% o- l8 ?
  25.     $connection->send("hello\n");2 C* s$ |+ W: E$ z6 {, @) T( Q
  26. }# f+ \% |" G5 V) _7 J

  27. : [0 C" N/ N4 }0 g- i* Z
  28. // 运行worker( i  t# y' G0 v+ g4 E
  29. Worker::runAll();
    . T; B* j2 _, t, k: R5 ]6 X6 T& e. ]
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    " u, d6 t$ o7 v( J

  31. 2 p: o& b8 }- p( C
  32. use Workerman\Worker;) E9 d9 D/ ^9 V
  33. require_once './Workerman/Autoloader.php';
    ! |# c0 j$ t1 |( Q) l8 o1 r  \
  34. ) D) \; c2 }! K/ g5 B7 d
  35. $worker = new Worker('text://0.0.0.0:2015');, n$ M( Y2 W9 x  r8 S
  36. // 4个进程
    . a( _% M# ~5 ?- o# \
  37. $worker->count = 4;- S& N1 r4 A- w9 Q2 u3 P
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 v7 Y3 X/ a  y2 _' D+ A2 c
  39. $worker->onWorkerStart = function($worker)
    7 G5 a* c5 |1 C  \. [2 b
  40. {, O$ E9 n$ M5 o! q* A0 r/ m
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    8 @4 _7 k  E0 d4 w( u% R( Y9 h- ?
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)% |+ u# |# s* ^. G
  43.     $inner_worker->reusePort = true;9 [4 B$ `1 H5 U6 `% x2 v' |. s
  44.     $inner_worker->onMessage = 'on_message';- t# \; ^8 s- Y! v
  45.     // 执行监听。正常监听不会报错
    + T+ K, z3 i: C7 u% {% n
  46.     $inner_worker->listen();
    - |* q, N0 t3 q  t! Y
  47. };  F5 d6 c- r% u

  48. * C3 K3 W3 R/ y
  49. $worker->onMessage = 'on_message';
    ! ^# S" {, y6 Y7 P2 n! T1 U4 G/ t0 g, z
  50. 1 O" J7 T/ X& W1 I6 h. ~7 _6 G' G
  51. function on_message($connection, $data)0 s# F3 h1 v' t9 o$ N8 B
  52. {
    4 J/ A4 X( l9 [) `3 C; H: ]/ d3 w
  53.     $connection->send("hello\n");
      v) C2 \* h: z0 r5 n
  54. }% y9 P, X$ W- j6 g# o

  55. - a& n& b; o) T3 L$ B
  56. // 运行worker9 i5 V) Q- O& ?. W) v. s
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    4 d) I  A5 Z6 K5 l6 O2 r" W  t
  2. use Workerman\Worker;1 N) v  T' G& q! E/ m
  3. require_once './Workerman/Autoloader.php';
    ; H  K' G+ p) B- T  _" r
  4. // 初始化一个worker容器,监听1234端口1 _& ~% I1 I9 K. ~) ^1 j# J0 ^
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    - u. X- ], @0 l7 Y8 ^# w

  6. , c; ~7 q! b# r2 H; V( c
  7. /*$ N9 n, ~. }' U
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误8 y1 z6 F' h, \3 S* B, h
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)) K% \( a, X- H8 m  @" M
  10. */
    ; K5 ^2 a4 Q% e/ E2 l! O
  11. $worker->count = 1;
    , x: J% c& d0 w+ o) F8 [' x3 W. w
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口. z" o: r# T' U
  13. $worker->onWorkerStart = function($worker)
    , j  p7 t* p. }: a. S: h
  14. {
    : F( A3 A3 W  T: u) o& \! F+ a4 g
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符/ `' U& M/ @  S8 S1 Q6 _4 F- R
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');# L7 {. u9 M+ N; u
  17.     $inner_text_worker->onMessage = function($connection, $buffer)% v- Q  O/ t" ?) p6 Q8 o4 u
  18.     {1 Z7 Y, j( f7 b1 n9 I, c) q+ _
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据/ K7 T5 m7 G/ X* b1 @8 |0 J$ g
  20.         $data = json_decode($buffer, true);: K6 u# x2 @: S; F& u3 F
  21.         $uid = $data['uid'];4 \! W0 I: u: T8 R! l7 H
  22.         // 通过workerman,向uid的页面推送数据8 K8 f6 i; L4 _1 w
  23.         $ret = sendMessageByUid($uid, $buffer);* S" T$ y+ L6 ?% z5 a
  24.         // 返回推送结果
    " k3 l1 X) i6 C5 j, V9 r
  25.         $connection->send($ret ? 'ok' : 'fail');
    ) i8 w+ ^3 c+ \" N) j" g
  26.     };( m. k- b# J+ t
  27.     // ## 执行监听 ##2 p0 i/ N2 \) b5 i) s" P
  28.     $inner_text_worker->listen();
    * _3 l1 _9 z7 {* z
  29. };$ c( m9 L, c; r; m
  30. // 新增加一个属性,用来保存uid到connection的映射0 O5 {4 \- T6 x: p% [/ _" p
  31. $worker->uidConnections = array();
    $ K$ W$ K1 @( W2 M6 b# _
  32. // 当有客户端发来消息时执行的回调函数
    ) P( y+ O! a9 N  F6 ?
  33. $worker->onMessage = function($connection, $data)0 u' C( _% m) g
  34. {6 g; x* J8 ?- o6 o
  35.     global $worker;4 B) n0 W$ i; i
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    2 D( e0 ^. m6 T" N0 t( ]
  37.     if(!isset($connection->uid))
    ; i8 `  [7 m8 j9 }+ O4 p. P" a
  38.     {; v' g4 t) [9 x  }* c
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)2 W9 O5 i0 b- E4 o  _& S
  40.        $connection->uid = $data;
    " e  X5 H. K5 V# ~4 V
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,6 K  P; L8 G5 j/ M
  42.         * 实现针对特定uid推送数据1 M9 z; W! u, w* g4 h% U
  43.         */
    " f  A$ [4 ~0 n6 F: p9 }0 N. w
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ! r# `6 c9 C4 p  B) e# }
  45.        return;: [# O( h- a1 b5 ^
  46.     }
    - P0 \5 H4 L7 D
  47. };, i2 Y$ R8 G, j# ^4 o

  48. 5 S( |5 c8 }$ ^( Q# j
  49. // 当有客户端连接断开时7 E, S. k2 l( H) H
  50. $worker->onClose = function($connection)
    * V9 a% R  u1 s4 E8 s
  51. {
    , a- B/ d; a5 L( O9 P/ b6 k) R& P
  52.     global $worker;7 W/ R4 M: f8 L2 l( ~$ [9 g
  53.     if(isset($connection->uid))
    ! Z# R. P/ G$ Y4 y
  54.     {! f' }9 V% ^( P/ `" y" }
  55.         // 连接断开时删除映射4 S8 c9 {% v& @; p; Q; M- l+ ~
  56.         unset($worker->uidConnections[$connection->uid]);& P* w8 P2 W/ ^( y% m% i) S
  57.     }9 M; C0 |+ i* t/ }5 j! q
  58. };' t. w9 \+ K* n: A' r* N  V3 t- w- @
  59. 5 U3 r) l9 W9 Y2 b
  60. // 向所有验证的用户推送数据/ @7 S" j! N" Z& s1 p, m/ Y
  61. function broadcast($message)
    ' e3 ]& H' C& ]) H3 p/ ^2 p( s& S" \
  62. {8 J0 G' \9 d! Y' ]' V6 ^
  63.    global $worker;1 @6 U, g5 i7 H, d/ _" g
  64.    foreach($worker->uidConnections as $connection)
    5 F0 i/ ^% ], p/ E4 m5 r7 ^# [
  65.    {
    ) v* ^1 F; G" E- ]# \6 y4 J3 Y
  66.         $connection->send($message);
    ! ^, k/ s1 ]8 ]# X: U, }1 U5 N
  67.    }9 i6 z6 m9 G& j' r
  68. }
    : H: V( j9 o8 ]& @6 N4 t

  69. 0 |3 Y' ~% |1 c8 N' O( M
  70. // 针对uid推送数据% c3 ~3 B( x9 M' A% T
  71. function sendMessageByUid($uid, $message)
    " |( V( O3 ?+ S; v/ A
  72. {  ~7 Z1 [% ~. y3 W" P
  73.     global $worker;
    3 \% U4 b* j% J
  74.     if(isset($worker->uidConnections[$uid])), u  S$ ?3 K6 s6 C  [# ?. f
  75.     {
    - L& g2 n" g  k: D: z& u/ I  x% j
  76.         $connection = $worker->uidConnections[$uid];; @( {- F% i5 J( Q: D
  77.         $connection->send($message);
    3 ]* ]2 {! F  _( q& @" a6 N
  78.         return true;
    3 e. |0 M, Q+ }/ h' @/ K
  79.     }
    ; W& p/ i- O) K+ h6 d! q
  80.     return false;
    , A& {$ w& S; E* F7 [6 p
  81. }( Q' x% }8 x, Z( Y
  82. 8 H0 R! g2 x9 `' c0 @$ h
  83. // 运行所有的worker) ^) H8 w, U7 Y" ]: ~& K5 Z! S
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    2 h' `9 E# {" b1 q9 V
  2. ws.onopen = function(){4 }/ u0 n; f2 C; s6 @' [/ }
  3.     var uid = 'uid1';' G. g' W! h1 K7 I9 @  Y2 f* w
  4.     ws.send(uid);
    6 q% S5 I2 n4 D- w6 j: ~* R+ g
  5. };
    $ J9 \% i  b$ A( r6 p9 f# c8 Q4 v) V
  6. ws.onmessage = function(e){
    : B! y& h# g" r" M% u/ y5 ?) L
  7.     alert(e.data);
    3 ~& C7 ^1 D  Z7 `
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    4 b& ?0 X4 X  n+ S' z
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    2 O& N; t/ P' t& s3 k7 k8 [" V8 _
  3. // 推送的数据,包含uid字段,表示是给这个uid推送1 ~$ S: N: T/ p9 r& J
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');( U# V6 Z' F8 g* w& t% O1 z6 b
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ; E% n  E& C$ D7 E/ t( P  ]# r
  6. fwrite($client, json_encode($data)."\n");
    2 ]' _5 @& H. `
  7. // 读取推送结果8 X5 d3 \/ K# _" o/ c% _) C
  8. echo fread($client, 8192);
复制代码

; [/ m1 d9 _! |& }* W3 T1 [3 F* d% K/ R, k  U& U$ |8 ^9 ~
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 20:47 , Processed in 0.158010 second(s), 22 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!