krot

rabbitmq_bottel_recv.php

Sep 18th, 2020
487
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 9.25 KB | None | 0 0
  1. #!/bin/bash
  2.  
  3. CMD="$(basename $BASH_SOURCE .sh)"
  4.  
  5. PROC=rabbitmq_bottel_recv$1
  6. if pidof -s $PROC >/dev/null; then
  7.     echo "$CMD $PROC already run"
  8. else
  9.     echo "$CMD $PROC not run"
  10.     php rabbitmq_bottel_recv.php $PROC
  11. fi
  12.  
  13. <?php
  14. ignore_user_abort(true);
  15. set_time_limit(0);
  16. ini_set('memory_limit', -1);
  17.  
  18. if(empty($argv[1]))exit(1);
  19. define('PROC_NAME', $argv[1]);
  20. define('LOCK_FILE', "/var/run/" . basename(PROC_NAME, ".php") . ".lock");
  21. if (!tryLock())die("Already running.\n");
  22. # remove the lock on exit (Control+C doesn't count as 'exit'?)
  23. register_shutdown_function('unlink', LOCK_FILE);
  24.  
  25.  
  26.  
  27. $pid = getmypid();
  28. if(!cli_set_process_title(PROC_NAME)){
  29.     echo "ERROR set_process_title::$pid...\n";
  30.     exit(1);
  31. }
  32.  
  33. require_once __DIR__ . '/vendor/autoload.php';
  34.  
  35. use PhpAmqpLib\Connection\AMQPStreamConnection;
  36. use PhpAmqpLib\Message\AMQPMessage;
  37.  
  38.  
  39.  
  40. use Psr\Http\Message\ResponseInterface;
  41. use Psr\Http\Message\ServerRequestInterface;
  42. use \Aws\S3\S3Client;
  43. use \Aws\Exception\AwsException;
  44. use \Aws\S3\Exception\S3Exception;
  45.  
  46.  
  47.  
  48. //Подключение Madeline с гитхаба
  49. if (!file_exists(__DIR__ . '/madeline.php')) {
  50.     copy('https://phar.madelineproto.xyz/madeline.php', __DIR__ . '/madeline.php');
  51. }
  52. include __DIR__ . '/madeline.php';
  53.  
  54. $settings['app_info'] = [
  55.     'api_id' => '1111*',
  56.     'api_hash' => '0v**',
  57. ];
  58.  
  59. $MadelineProto = new \danog\MadelineProto\API('session.madeline', $settings);
  60. $MadelineProto->start();
  61. $me = $MadelineProto->get_self();
  62.  
  63. #######
  64. $pdo = new PDO("mysql:host=localhost;port=3307;dbname=bottelegram;charset=utf8",
  65.         'bottelegram1','%');
  66. $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  67. $pdo->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
  68.  
  69. $token='bot-telegram:3c9e5145';
  70. $client_jid='bot-telegram@localhost';
  71.  
  72. $bot=new CmmntBot\Bot($token);
  73.  
  74. ###########
  75. $queue_name='%:bottelegram';
  76.  
  77. $connection = new AMQPStreamConnection('%', 5672, 'u', 'p');
  78.  
  79. $channel = $connection->channel();
  80.  
  81. $channel->queue_declare($queue_name, false, true, false, false);
  82.  
  83. echo " [*] Waiting for messages. To exit press CTRL+C\n";
  84.  
  85. $channel->basic_qos(null,1,null);
  86.                
  87.  
  88. $callback = function (AMQPMessage $msg) use($bot,$client_jid,$pdo,$MadelineProto){
  89.   echo ' [x] Received ';
  90.    $json=json_decode( $msg->body ,true);
  91.    
  92.   // $pdo
  93.     $sender=$json['sender'];
  94.     $to=$json['jid_group'];
  95.     //$sign=md5("BOT{$client_jid}{$to}");;
  96.     $id=(int)$json['id'];
  97.      
  98.     $telegram_channel=$json['telegram_channel'];
  99.    
  100.     echo $id.':'.$telegram_channel.PHP_EOL;
  101.        
  102.     $telegram_channel=substr($telegram_channel,1);
  103.    
  104.  
  105.    
  106.     #get offset id
  107.         $sql='SELECT offset_id from `log` where id='.$id.' limit 1';
  108.         $q = $pdo->prepare($sql);
  109.         $q->execute();
  110.         $res = $q->fetchAll(PDO::FETCH_ASSOC);
  111.         $min_id=$res[0]['offset_id']??0;
  112.         $offset_id_new=$min_id;
  113.    
  114.      
  115. echo PHP_EOL;  
  116.  
  117. $q = new SplStack();
  118. $channel = $telegram_channel;
  119. $offset_id = 0;
  120. $limit = 90;
  121. echo '->'.$min_id.PHP_EOL;
  122.  
  123. //max id определить
  124. $messages_Messages = $MadelineProto->messages->getHistory(['peer' => $channel, 'offset_id' => 0, 'offset_date' => 0, 'add_offset' => 0, 'limit' => 1, 'max_id' => 0, 'min_id' => 0, 'hash' => 0 ]);
  125. foreach ($messages_Messages['messages'] as $message) {
  126.      //echo "id: " . $message['id'].':'.@$message['message'];
  127.      $end_offset=$message['id'];
  128.      break;
  129. }
  130. echo  'end_offset:'.$end_offset .PHP_EOL;
  131.  
  132.  
  133. if($offset_id_new!=$end_offset){
  134.     $offset_id_new+=$limit;
  135.     //echo $offset_id_new.PHP_EOL;
  136.     //echo $end_offset.PHP_EOL;
  137.     /*if($end_offset<$offset_id_new){
  138.         $offset_id_new=$end_offset;
  139.         $limit=$end_offset-$min_id;
  140.     }*/
  141.     echo 'for:'.$offset_id_new.':'.$end_offset.PHP_EOL;
  142.     $for=array();
  143.    
  144.     $offset_id1=$end_offset;
  145.    
  146.     for($i=$offset_id_new;$i<=$end_offset;$i+=$limit){
  147.         $offset_id1=$i+1;
  148.         $for[]=['offset_id'=>$i+1,'limit'=>$limit+1, 'max_id' => $i+1, 'min_id' => $i-$limit];
  149.        
  150.     }
  151.    
  152.     if($offset_id1<$end_offset){
  153.         $for[]=['offset_id'=>$end_offset+1,'limit'=>$end_offset-$offset_id1+1, 'max_id' => $end_offset+1, 'min_id' => $offset_id1-1];
  154.  
  155.     }
  156.     if(empty($for)){
  157.         $for[]=['offset_id'=>$end_offset+1,'limit'=>$end_offset+1, 'max_id' => $end_offset+1, 'min_id' =>0];
  158.     }
  159.     /*$t1=$for[116];
  160.      $for=[];
  161.      $for[]=$t1;*/
  162.  
  163.      print_r($for);
  164.     // exit;
  165.      
  166.     foreach($for as $v){
  167.         $st=new SplStack();
  168.         $messages_Messages = $MadelineProto->messages->getHistory(['peer' => $channel, 'offset_id' => $v['offset_id'], 'offset_date' => 0, 'add_offset' => 0, 'limit' => $v['limit'], 'max_id' => $v['max_id'], 'min_id' =>  $v['min_id'], 'hash' => 0 ]);
  169.        
  170.         echo '::offset_id:'.($v['offset_id']).':min_id:'.$v['min_id'].':'.$end_offset.':' .PHP_EOL;
  171.        
  172.         if(!empty($messages_Messages['messages'])) {
  173.         foreach ($messages_Messages['messages'] as $message) {
  174.              if(isset($message['message'])){
  175.                 // echo "id: " . $message['id'];//.':'.$file_url['url'] . "\n";
  176.               //  echo @$message['message'];
  177.                  //echo "\n";
  178.                  #################
  179.                  /*if(!empty($message['media']) && ($message['media']['_']=='messageMediaPhoto'||$message['media']['_']=='messageMediaDocument')){
  180.                                  try{
  181.                                  $info =$MadelineProto->getDownloadInfo($message['media']);
  182.                                  echo $message['id'].':'.$info['name'].$info['ext'].PHP_EOL;
  183.                                     $tmpfile = tempnam("/tmp", "tl");
  184.                                     if($info['size']>0){
  185.                                         $MadelineProto->downloadToFile($message['media'], $tmpfile);
  186.                                         $file_url=uploadS3($sender,$info['name'].$info['ext'],$tmpfile);
  187.                                         unlink($tmpfile);
  188.                                     }
  189.                                     }catch(Exception $e){
  190.                                         //$info =$MadelineProto->getDownloadInfo($message['media']);
  191.                                         //   [_] => messageMediaWebPage
  192.                                     }
  193.                              }*/
  194.                  ####################
  195.                  $aa= [
  196.                     'id'=>$message['id'],
  197.                     'msg'=>@$message['message'],
  198.                     ];
  199.                     if(!empty($file_url['url'])){
  200.                         $aa['media_url']=$file_url['url'];
  201.                     }
  202.                    
  203.                 $st->push($aa);
  204.                 }
  205.         }
  206.         }
  207.         $st->rewind();
  208.         while($st->valid()){
  209.             $c = $st->current();
  210.             //print_r($c);
  211.             if(!empty($c['media_url'])){
  212.              echo 'send:'.$c['id'].':'.$c['media_url'] .PHP_EOL;
  213.             }else{
  214.                  echo 'send:'.$c['id'] .PHP_EOL;
  215.             }
  216.            
  217.              $e=array();
  218.              $e["body"]=$c['msg'].' https://t.me/'.$telegram_channel.'/'.$c['id'];
  219.             // $e["image"]
  220.             try{
  221.                 sendMessage($to,$e );
  222.             }catch(Exception $e){}
  223.            
  224.             $st->next();
  225.         }
  226.         //break;
  227.     }
  228.     echo '$i:'.$i.' $end_offset'.$end_offset.PHP_EOL;
  229.     $sql=sprintf(
  230.         "INSERT INTO `log` (`id` ,`telegram_channel`,`offset_id`)VALUES(%d,%s,%d) ON DUPLICATE KEY UPDATE offset_id=VALUES(offset_id),tdate=NOW()",
  231.              $id,
  232.              $pdo->quote($telegram_channel),
  233.              $end_offset
  234.          );  
  235.         if($pdo->exec($sql)){}
  236.        
  237. }
  238. echo 'end'.$offset_id_new.PHP_EOL;
  239.  
  240.  
  241.  
  242.  
  243. /*try{
  244.  
  245. }catch(Exception $e){}*/
  246.  
  247.    echo "\n";
  248.    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  249.    
  250. };
  251.  
  252. $channel->basic_consume($queue_name, '',
  253. false,
  254. false, //подтвердить              
  255. false,
  256. false, $callback);
  257.  
  258. while ($channel->is_consuming()) {
  259. //while(count($channel->callbacks)) {
  260.     $channel->wait();
  261. }
  262.  
  263.  
  264.  
  265.  
  266. $channel->close();
  267. $connection->close();
  268.  
  269.  
  270.  
  271. #######################################
  272.  
  273. function tryLock()
  274. {
  275.     # If lock file exists, check if stale.  If exists and is not stale, return TRUE
  276.    # Else, create lock file and return FALSE.
  277.  
  278.     if (@symlink("/proc/" . getmypid(), LOCK_FILE) !== FALSE) # the @ in front of 'symlink' is to suppress the NOTICE you get if the LOCK_FILE exists
  279.        return true;
  280.  
  281.     # link already exists
  282.    # check if it's stale
  283.    if (is_link(LOCK_FILE) && !is_dir(LOCK_FILE))
  284.     {
  285.         unlink(LOCK_FILE);
  286.         # try to lock again
  287.        return tryLock();
  288.     }
  289.  
  290.     return false;
  291. }
  292.  
  293.  
  294.  
  295.  
  296.  
  297.  
  298.  
  299.  
  300.  
  301.  
  302.  
  303. function uploadS3($sender,$FileName,$uploadedFile){
  304.         //$uploadedFile = $uploadedFiles[$var];
  305.             $s3 = new S3Client(array(
  306.                 'region' => "eu-west-1",
  307.                 'version' => 'latest',
  308.                 'endpoint' => "http://**:8000",
  309.                 'credentials' => array(
  310.                     'key' =>"*",
  311.                     'secret' => "*",
  312.                 ),
  313.                 'use_path_style_endpoint' => true
  314.                 //,'debug'=>true
  315.             ));
  316.                 $md5=md5_file($uploadedFile);
  317.                 $Key ="{$md5[0]}/{$md5[1]}/{$md5[2]}/{$md5[3]}/". substr($md5,4).'/'.$FileName;
  318.                 if($s3->doesObjectExist("app",$Key))return ['exist'=>true,'url'=>$s3->getObjectUrl("app", $Key)];
  319.                
  320.                 $result = $s3->putObject(array(
  321.                     'Bucket' => "bot",
  322.                     'Key' => $Key,
  323.                     'SourceFile' => $uploadedFile,
  324.                     'ACL' => 'public-read',
  325.                 ));
  326.                 if($result instanceof \Aws\Result){
  327.                     return ['url'=>$result->get('ObjectURL')];
  328.                 }else throw new \Exception('S3 not upload',504);
  329.     }
  330.    
  331.    
  332.    
  333.    
  334.    
  335.    
  336. function sendMessage($jid,$param){
  337. try{
  338.     $id=explode('@',$jid)[0];
  339. $context = stream_context_create(array(
  340.   'http' => array(
  341.     'method' => 'POST',
  342.     'header' => "Content-Type: application/json\r\n"
  343.         . 'Authorization: Basic ' . base64_encode("user@localhost:pass"),
  344.     'content' =>  json_encode($param)
  345.     ),
  346.      "ssl"=>array(
  347.         "verify_peer"=>false,
  348.         "verify_peer_name"=>false,
  349.     ),
  350.   )
  351. );
  352. $j= file_get_contents("https://*:8089/api/rooms/$id/messages", false, $context);
  353. $j=json_decode($j,true);
  354. return $j['id'];
  355. } catch (Exception $e) {
  356.     return false;
  357. }
  358. }
  359.    
  360.    
  361.    
Add Comment
Please, Sign In to add comment