基于Broadway的HTML5视频监控
简介
Broadway是一个基于JavaScript的H.264解码器,支持Baseline Profile,我们在HTML5视频监控技术预研一文中介绍过这个库。如果你的监控摄像头支持Baseline的H.264码流,利用Broadway可以实现不需要重新编码的视频监控,这样服务器的负载可以大大减轻。
本文不进行理论知识的讨论,仅仅给出一个简单的实现。此实现由三个部分组成:
- 基于live555的C++程序,用来从视频源取RTP流,解析出NALU然后通过WebSocket推送给WebSocket服务器
 - 基于Spring Boot的Java WebSocket服务器,接收C++程序推送来的NALU并广播给客户端
 - 基于Broadway的HTML5视频监控客户端,为了简化开发,我们使用了Broadway的一个封装http-live-player
 
代码托管于GitHub:https://github.com/gmemcc/h5vs.git
C++部分
这部分主要是一个RTSP客户端,功能上面已经介绍过,此客户端依赖于我以前一篇文章中的live555 RTSP客户端封装。
WebSocket客户端
		| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43  | 
						// // Created by alex on 10/9/17. // #ifndef LIVE5555_WEBSOCKETCLIENT_H #define LIVE5555_WEBSOCKETCLIENT_H #include <pthread.h> #include <websocketpp/config/asio_no_tls_client.hpp> #include <websocketpp/client.hpp> typedef websocketpp::client<websocketpp::config::asio_client> WebSocketppClient; typedef websocketpp::connection_hdl WebSocketppConnHdl; class WebSocketClient { private:     char *url;     pthread_t wsThread;     WebSocketppClient *wsppClient;     WebSocketppConnHdl wsppConnHdl; public:     WebSocketClient( char *url );     char *getUrl() const;     virtual void connect();     virtual void sendBytes( unsigned char *buf, unsigned size );     virtual void sendText( char *text );     virtual ~WebSocketClient();     pthread_t getWsThread() const;     WebSocketppClient *getWsppClient();     void setWsppConnHdl( WebSocketppConnHdl wsppConnHdl ); }; #endif //LIVE5555_WEBSOCKETCLIENT_H  | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63  | 
						// // Created by alex on 10/9/17. // #include "WebSocketClient.h" using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; using websocketpp::lib::bind; #include "spdlog/spdlog.h" static auto LOGGER = spdlog::stdout_color_st( "WebSocketClient" ); WebSocketClient::WebSocketClient( char *url ) : url( url ), wsppClient( new WebSocketppClient()) { } WebSocketClient::~WebSocketClient() {     delete wsppClient; } static void *wsRoutine( void *arg ) {     WebSocketClient *client = (WebSocketClient *) arg;     WebSocketppClient *wsppClient = client->getWsppClient();     wsppClient->clear_access_channels( websocketpp::log::alevel::frame_header );     wsppClient->clear_access_channels( websocketpp::log::alevel::frame_payload );     wsppClient->init_asio();     websocketpp::lib::error_code ec;     WebSocketppClient::connection_ptr con = wsppClient->get_connection( std::string( client->getUrl()), ec );     wsppClient->connect( con );     client->setWsppConnHdl( con->get_handle());     wsppClient->run(); } void WebSocketClient::connect() {     pthread_create( &wsThread, NULL, wsRoutine, (void *) this ); } void WebSocketClient::sendBytes( unsigned char *buf, unsigned size ) {     wsppClient->send( wsppConnHdl, buf, size, websocketpp::frame::opcode::BINARY ); } void WebSocketClient::sendText( char *text ) {     wsppClient->send( wsppConnHdl, text, strlen( text ), websocketpp::frame::opcode::TEXT ); } char *WebSocketClient::getUrl() const {     return url; } pthread_t WebSocketClient::getWsThread() const {     return wsThread; } WebSocketppClient *WebSocketClient::getWsppClient() {     return wsppClient; }; void WebSocketClient::setWsppConnHdl( WebSocketppConnHdl wsppConnHdl ) {     this->wsppConnHdl = wsppConnHdl; }  | 
					
主程序
		| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100  | 
						#include <iostream> #include "live5555/client.h" #include "spdlog/spdlog.h" #include "WebSocketClient.h" static auto LOGGER = spdlog::stdout_color_st( "wspush" ); class VideoSink : public SinkBase { private: #ifdef _SAVE_H264_SEQ     FILE *os = fopen( "./rtsp.h264", "w" ); #endif     WebSocketClient *wsClient;     bool firstFrameWritten;     const char *sPropParameterSetsStr;     unsigned char const start_code[4] = { 0x00, 0x00, 0x00, 0x01 }; public:     VideoSink( UsageEnvironment &env, unsigned int recvBufSize, WebSocketClient *wsClient ) : SinkBase( env, recvBufSize ), wsClient( wsClient ) {         // 缓冲区前面留出起始码4字节         recvBuf += sizeof( start_code );     }     virtual ~VideoSink() {     }     virtual void onMediaSubsessionOpened( MediaSubsession *subSession ) {         sPropParameterSetsStr = subSession->fmtp_spropparametersets();     }     void afterGettingFrame( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime ) override {         size_t scLen = sizeof( start_code );         if ( !firstFrameWritten ) {             // 填写起始码             memcpy( recvBuf - scLen, start_code, scLen );             // 防止RTSP源不送SPS/PPS             unsigned numSPropRecords;             SPropRecord *sPropRecords = parseSPropParameterSets( sPropParameterSetsStr, numSPropRecords );             for ( unsigned i = 0; i < numSPropRecords; ++i ) {                 unsigned int propLen = sPropRecords[ i ].sPropLength;                 size_t bufLen = propLen + scLen;                 unsigned char buf[bufLen];                 memcpy( buf, start_code, scLen );                 memcpy( buf + scLen, sPropRecords[ i ].sPropBytes, propLen );                 wsClient->sendBytes( buf, bufLen ); #ifdef _SAVE_H264_SEQ                 fwrite( buf, sizeof( unsigned char ), bufLen, os ); #endif             }             firstFrameWritten = true;         } #ifdef _SAVE_H264_SEQ         fwrite( recvBuf - scLen, sizeof( unsigned char ), frameSize + scLen, os ); #endif   
      unsigned naluHead = recvBuf[ 0 ];         unsigned nri = naluHead >> 5;         unsigned f = nri >> 2;         unsigned type = naluHead & 0b00011111;         wsClient->sendBytes( recvBuf - scLen, frameSize + scLen );         LOGGER->trace( "NALU info: nri {} type {}", nri, type );         SinkBase::afterGettingFrame( frameSize, numTruncatedBytes, presentationTime );     } }; class H264RTSPClient : public RTSPClientBase { private:     VideoSink *videoSink; public:     H264RTSPClient( UsageEnvironment &env, const char *rtspURL, VideoSink *videoSink ) :         RTSPClientBase( env, rtspURL ), videoSink( videoSink ) {} protected:     // 测试用的摄像头(RTSP源)仅仅有一个子会话,因此这里简化了实现:     bool acceptSubSession( const char *mediumName, const char *codec ) override {         return true;     }     MediaSink *createSink( const char *mediumName, const char *codec, MediaSubsession *subSession ) override {         videoSink->onMediaSubsessionOpened( subSession );         return videoSink;     } }; int main() {     spdlog::set_pattern( "%Y-%m-%d %H:%M:%S.%e [%l] [%n] %v" );     spdlog::set_level( spdlog::level::trace );     WebSocketClient *wsClient;     wsClient = new WebSocketClient( "ws://192.168.0.89:9090/h264src" );     wsClient->connect();     sleep( 3 ); // 等待WebSocket连接建立     wsClient->sendText( "ch1" );     TaskScheduler *scheduler = BasicTaskScheduler::createNew();     BasicUsageEnvironment *env = BasicUsageEnvironment::createNew( *scheduler );     VideoSink *sink = new VideoSink( *env, 1024 * 1024, wsClient );     H264RTSPClient *client = new H264RTSPClient( *env, "rtsp://admin:kingsmart123@192.168.0.196:554/ch1/sub/av_stream", sink );     client->start();     return 0; }  | 
					
Java部分
这部分实现了NALU转发功能,基于Spring Boot实现。
主程序
		| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65  | 
						package cc.gmem.study.kurento; import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.*; import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; import sun.security.acl.PrincipalImpl; import java.security.Principal; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @SpringBootApplication @EnableWebSocket @EnableWebSocketMessageBroker @EnableScheduling public class VideoSurveillanceApp extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {     private static final Logger LOGGER = LoggerFactory.getLogger( VideoSurveillanceApp.class );     @Bean     public ServletServerContainerFactoryBean createWebSocketContainer() {         ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();         // WebSocket消息缓冲区大小,如果客户端发来的消息较大,需要按需调整         container.setMaxTextMessageBufferSize( 1024 * 1024 );         container.setMaxBinaryMessageBufferSize( 1024 * 1024 );         return container;     }     @Override     public void 
registerWebSocketHandlers( WebSocketHandlerRegistry registry ) {         registry.addHandler( h264FrameSinkHandler(), "/h264sink" );         registry.addHandler( h264FrameSrcHandler(), "/h264src" );     }     @Bean     public WebSocketHandler h264FrameSrcHandler() {         return new H264FrameSrcHandler();     }     @Bean     public WebSocketHandler h264FrameSinkHandler() {         return new H264FrameSinkHandler();     }     public static void main( String[] args ) {         new SpringApplication( VideoSurveillanceApp.class ).run( args );     } }  | 
					
H264FrameSrcHandler
此Bean接受C++程序的NALU推送:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50  | 
						package cc.gmem.study.kurento; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import javax.inject.Inject; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class H264FrameSrcHandler extends AbstractWebSocketHandler {     private static final Logger LOGGER = LoggerFactory.getLogger( H264FrameSrcHandler.class );     private Map<String, String> sessionIdToChannel = new ConcurrentHashMap<>();     @Inject     private H264FrameSinkHandler sinkHandler;     public void afterConnectionEstablished( WebSocketSession session ) throws Exception {         LOGGER.debug( "{} connected.", session.getRemoteAddress() );     }     @Override     protected void handleBinaryMessage( WebSocketSession session, BinaryMessage message ) throws Exception {         ByteBuffer payload = message.getPayload();         StringBuilder hex = new StringBuilder();         byte[] pa = payload.array();         int len = 16;         if ( pa.length < 16 ) len = pa.length;         for ( byte i = 0; i < len; i++ ) {             hex.append( String.format( "%02x ",Byte.toUnsignedInt( pa[i] )  ) );         }         LOGGER.debug( "Received binary message {} bytes: {}...", payload.array().length, hex );         String chnl = sessionIdToChannel.get( session.getId() );         if ( chnl != null ) sinkHandler.broadcast( chnl, payload );     }     @Override     protected void handleTextMessage( WebSocketSession session, TextMessage message ) throws Exception {         String payload = message.getPayload();         sessionIdToChannel.put( session.getId(), payload );         LOGGER.debug( "Received text message: {}", payload );     } }  | 
					
H264FrameSinkHandler
此Bean向Web客户端广播NALU:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96  | 
						package cc.gmem.study.kurento; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.mutable.MutableInt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import javax.inject.Inject; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class H264FrameSinkHandler extends TextWebSocketHandler {     private static final Logger LOGGER = LoggerFactory.getLogger( H264FrameSinkHandler.class );     public static final String ACTION_INIT = "init";     private static final String ACTION_INIT_RESP = "initresp";     public static final String ACTION_PLAY = "play";     public static final String ACTION_STOP = "stop";     public static final String KEY_ACTION = "action";     @Inject     private ObjectMapper om;     private Map<String, List<WebSocketSession>> chnlToSessions = new ConcurrentHashMap<>();     @Override     protected void handleTextMessage( WebSocketSession session, TextMessage message ) throws Exception {         String client = session.getId() + '@' + session.getRemoteAddress();         Map req = om.readValue( message.getPayload(), Map.class );         Map resp = new LinkedHashMap();         Object action = req.get( KEY_ACTION );         if ( ACTION_INIT.equals( action ) ) {             String channel = (String) req.get( "channel" );             LOGGER.debug( "{} request to subscribe channel {}", client, channel );             addPushTarget( channel, session );             resp.put( KEY_ACTION, ACTION_INIT_RESP );             resp.put( "width", 352 );             resp.put( "height", 288 );             session.sendMessage( new TextMessage( om.writeValueAsString( resp ) ) );         } else if ( 
ACTION_PLAY.equals( action ) ) {             LOGGER.debug( "{} request to receive nalu push", session.getRemoteAddress(), client );             session.getAttributes().put( ACTION_PLAY, true );         }     }     private synchronized void addPushTarget( String channel, WebSocketSession session ) {         List<WebSocketSession> sessions = chnlToSessions.get( channel );         if ( sessions == null ) {             sessions = new ArrayList<>();             chnlToSessions.put( channel, sessions );         }         sessions.add( session );     }     public synchronized void broadcast( String chnl, ByteBuffer payload ) {         List<WebSocketSession> sessions = chnlToSessions.get( chnl );         if ( sessions == null ) return;         sessions.forEach( sess -> {             try {                 if ( sess.isOpen() && Boolean.TRUE.equals( sess.getAttributes().get( ACTION_PLAY ) ) ) {                     sess.sendMessage( new BinaryMessage( payload ) );                 }             } catch ( Exception e ) {                 LOGGER.error( e.getMessage(), e );             }         } );     }     @Scheduled( fixedRate = 10000 )     public synchronized void cleanup() {         final MutableInt counter = new MutableInt( 0 );         chnlToSessions.values().forEach( sessions -> {             Iterator<WebSocketSession> it = sessions.listIterator();             while ( it.hasNext() ) {                 if ( !it.next().isOpen() ) {                     it.remove();                     counter.increment();                 }             }         } );         if ( counter.intValue() > 0 ) LOGGER.debug( "Remove {} invalid websocket session.", counter );     } }  | 
					
Web部分
我们对http-live-player进行了简单的修改,主要是修改其通信方式以配合上述WebSocket服务器。核心代码没有变动,因此这里不张贴其代码。
客户端代码
		| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27  | 
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Broadway Video Surveillance</title>
    <!-- http-live-player bundles Broadway's JavaScript H.264 decoder with a WebSocket transport -->
    <script src="js/broadway/http-live-player.js"></script>
    <link rel="stylesheet" href="style.css"></head>
<body>
<div class="nvbar">
    <div class="title">基于Broadway+WebSocket的视频监控示例</div>
    <div class="subtitle">http://192.168.0.89:9090/broadway.html</div>
</div>
<div class="videos-wrapper">
    <div id="videos"></div>
</div>
<script type="text/javascript">
    // Build a 3x3 grid of players: each one renders into its own canvas
    // (WebGL mode) and subscribes to channel 'ch1' through the /h264sink
    // WebSocket endpoint served by the Spring Boot broadcaster.
    var videos = document.getElementById( 'videos' );
    for ( var i = 0; i < 9; i++ ) {
        var canvas = document.createElement( "canvas" );
        videos.appendChild( canvas );
        var player = new WSAvcPlayer( canvas, "webgl", 'ch1', true );
        player.connect( "ws://" + document.location.host + "/h264sink" );
    }
</script>
</body>
</html>
					
效果截图
下面的截图是开了九画面的视频监控,使用的是子码流,在测试机器上CPU压力不大。

注意:如果Broadway来不及解码,http-live-player会把缓冲区中的所有NALU全部丢弃,这可能导致暂时的花屏。选择适当的帧率、码率、画幅可以尽量避免这种情况的发生。
            
Leave a Reply