By Alex
/ in
Broadway是一个基于JavaScript的H.264解码器,支持Baseline Profile,我们在HTML5视频监控技术预研一文中介绍过这个库。如果你的监控摄像头支持Baseline的H.264码流,利用Broadway可以实现不需要重新编码的视频监控,这样服务器的负载可以大大减轻。
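本文不进行理论知识的讨论,仅仅给出一个简单的实现。此实现由三个部分组成:(1)C++编写的RTSP客户端,从摄像头拉取H.264码流,并把NALU通过WebSocket推送给服务器;(2)基于Spring Boot的WebSocket服务器,负责把NALU转发给各个浏览器;(3)基于Broadway(http-live-player)的Web客户端,负责解码并显示画面。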
代码托管于GitHub:https://github.com/gmemcc/h5vs.git
这部分主要是一个RTSP客户端,功能上面已经介绍过,此客户端依赖于我以前一篇文章中的live555 RTSP客户端封装。
// // Created by alex on 10/9/17. // #ifndef LIVE5555_WEBSOCKETCLIENT_H #define LIVE5555_WEBSOCKETCLIENT_H #include <pthread.h> #include <websocketpp/config/asio_no_tls_client.hpp> #include <websocketpp/client.hpp> typedef websocketpp::client<websocketpp::config::asio_client> WebSocketppClient; typedef websocketpp::connection_hdl WebSocketppConnHdl; class WebSocketClient { private: char *url; pthread_t wsThread; WebSocketppClient *wsppClient; WebSocketppConnHdl wsppConnHdl; public: WebSocketClient( char *url ); char *getUrl() const; virtual void connect(); virtual void sendBytes( unsigned char *buf, unsigned size ); virtual void sendText( char *text ); virtual ~WebSocketClient(); pthread_t getWsThread() const; WebSocketppClient *getWsppClient(); void setWsppConnHdl( WebSocketppConnHdl wsppConnHdl ); }; #endif //LIVE5555_WEBSOCKETCLIENT_H
// // Created by alex on 10/9/17. // #include "WebSocketClient.h" using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; using websocketpp::lib::bind; #include "spdlog/spdlog.h" static auto LOGGER = spdlog::stdout_color_st( "WebSocketClient" ); WebSocketClient::WebSocketClient( char *url ) : url( url ), wsppClient( new WebSocketppClient()) { } WebSocketClient::~WebSocketClient() { delete wsppClient; } static void *wsRoutine( void *arg ) { WebSocketClient *client = (WebSocketClient *) arg; WebSocketppClient *wsppClient = client->getWsppClient(); wsppClient->clear_access_channels( websocketpp::log::alevel::frame_header ); wsppClient->clear_access_channels( websocketpp::log::alevel::frame_payload ); wsppClient->init_asio(); websocketpp::lib::error_code ec; WebSocketppClient::connection_ptr con = wsppClient->get_connection( std::string( client->getUrl()), ec ); wsppClient->connect( con ); client->setWsppConnHdl( con->get_handle()); wsppClient->run(); } void WebSocketClient::connect() { pthread_create( &wsThread, NULL, wsRoutine, (void *) this ); } void WebSocketClient::sendBytes( unsigned char *buf, unsigned size ) { wsppClient->send( wsppConnHdl, buf, size, websocketpp::frame::opcode::BINARY ); } void WebSocketClient::sendText( char *text ) { wsppClient->send( wsppConnHdl, text, strlen( text ), websocketpp::frame::opcode::TEXT ); } char *WebSocketClient::getUrl() const { return url; } pthread_t WebSocketClient::getWsThread() const { return wsThread; } WebSocketppClient *WebSocketClient::getWsppClient() { return wsppClient; }; void WebSocketClient::setWsppConnHdl( WebSocketppConnHdl wsppConnHdl ) { this->wsppConnHdl = wsppConnHdl; }
#include <iostream> #include "live5555/client.h" #include "spdlog/spdlog.h" #include "WebSocketClient.h" static auto LOGGER = spdlog::stdout_color_st( "wspush" ); class VideoSink : public SinkBase { private: #ifdef _SAVE_H264_SEQ FILE *os = fopen( "./rtsp.h264", "w" ); #endif WebSocketClient *wsClient; bool firstFrameWritten; const char *sPropParameterSetsStr; unsigned char const start_code[4] = { 0x00, 0x00, 0x00, 0x01 }; public: VideoSink( UsageEnvironment &env, unsigned int recvBufSize, WebSocketClient *wsClient ) : SinkBase( env, recvBufSize ), wsClient( wsClient ) { // 缓冲区前面留出起始码4字节 recvBuf += sizeof( start_code ); } virtual ~VideoSink() { } virtual void onMediaSubsessionOpened( MediaSubsession *subSession ) { sPropParameterSetsStr = subSession->fmtp_spropparametersets(); } void afterGettingFrame( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime ) override { size_t scLen = sizeof( start_code ); if ( !firstFrameWritten ) { // 填写起始码 memcpy( recvBuf - scLen, start_code, scLen ); // 防止RTSP源不送SPS/PPS unsigned numSPropRecords; SPropRecord *sPropRecords = parseSPropParameterSets( sPropParameterSetsStr, numSPropRecords ); for ( unsigned i = 0; i < numSPropRecords; ++i ) { unsigned int propLen = sPropRecords[ i ].sPropLength; size_t bufLen = propLen + scLen; unsigned char buf[bufLen]; memcpy( buf, start_code, scLen ); memcpy( buf + scLen, sPropRecords[ i ].sPropBytes, propLen ); wsClient->sendBytes( buf, bufLen ); #ifdef _SAVE_H264_SEQ fwrite( buf, sizeof( unsigned char ), bufLen, os ); #endif } firstFrameWritten = true; } #ifdef _SAVE_H264_SEQ fwrite( recvBuf - scLen, sizeof( unsigned char ), frameSize + scLen, os ); #endif unsigned naluHead = recvBuf[ 0 ]; unsigned nri = naluHead >> 5; unsigned f = nri >> 2; unsigned type = naluHead & 0b00011111; wsClient->sendBytes( recvBuf - scLen, frameSize + scLen ); LOGGER->trace( "NALU info: nri {} type {}", nri, type ); SinkBase::afterGettingFrame( frameSize, numTruncatedBytes, presentationTime 
); } }; class H264RTSPClient : public RTSPClientBase { private: VideoSink *videoSink; public: H264RTSPClient( UsageEnvironment &env, const char *rtspURL, VideoSink *videoSink ) : RTSPClientBase( env, rtspURL ), videoSink( videoSink ) {} protected: // 测试用的摄像头(RTSP源)仅仅有一个子会话,因此这里简化了实现: bool acceptSubSession( const char *mediumName, const char *codec ) override { return true; } MediaSink *createSink( const char *mediumName, const char *codec, MediaSubsession *subSession ) override { videoSink->onMediaSubsessionOpened( subSession ); return videoSink; } }; int main() { spdlog::set_pattern( "%Y-%m-%d %H:%M:%S.%e [%l] [%n] %v" ); spdlog::set_level( spdlog::level::trace ); WebSocketClient *wsClient; wsClient = new WebSocketClient( "ws://192.168.0.89:9090/h264src" ); wsClient->connect(); sleep( 3 ); // 等待WebSocket连接建立 wsClient->sendText( "ch1" ); TaskScheduler *scheduler = BasicTaskScheduler::createNew(); BasicUsageEnvironment *env = BasicUsageEnvironment::createNew( *scheduler ); VideoSink *sink = new VideoSink( *env, 1024 * 1024, wsClient ); H264RTSPClient *client = new H264RTSPClient( *env, "rtsp://admin:kingsmart123@192.168.0.196:554/ch1/sub/av_stream", sink ); client->start(); return 0; }
这部分实现了NALU转发功能,基于Spring Boot实现。
package cc.gmem.study.kurento; import org.kurento.client.KurentoClient; import org.kurento.client.KurentoClientBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.*; import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean; import sun.security.acl.PrincipalImpl; import java.security.Principal; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @SpringBootApplication @EnableWebSocket @EnableWebSocketMessageBroker @EnableScheduling public class VideoSurveillanceApp extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer { private static final Logger LOGGER = LoggerFactory.getLogger( VideoSurveillanceApp.class ); @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); // WebSocket消息缓冲区大小,如果客户端发来的消息较大,需要按需调整 container.setMaxTextMessageBufferSize( 1024 * 1024 ); container.setMaxBinaryMessageBufferSize( 1024 * 1024 ); return container; } @Override public void registerWebSocketHandlers( WebSocketHandlerRegistry registry ) { 
registry.addHandler( h264FrameSinkHandler(), "/h264sink" ); registry.addHandler( h264FrameSrcHandler(), "/h264src" ); } @Bean public WebSocketHandler h264FrameSrcHandler() { return new H264FrameSrcHandler(); } @Bean public WebSocketHandler h264FrameSinkHandler() { return new H264FrameSinkHandler(); } public static void main( String[] args ) { new SpringApplication( VideoSurveillanceApp.class ).run( args ); } }
此Bean接受C++程序的NALU推送:
package cc.gmem.study.kurento;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket handler for the pushing side.
 *
 * Protocol as implemented here: a text message from a session names the
 * channel that session feeds; every later binary message from the same
 * session is handed to the sink handler for broadcast on that channel.
 */
public class H264FrameSrcHandler extends AbstractWebSocketHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger( H264FrameSrcHandler.class );

    /** Number of leading payload bytes shown in the debug log. */
    private static final int HEX_PREVIEW_BYTES = 16;

    /** Channel announced by each pusher session, keyed by session id. */
    private Map<String, String> sessionIdToChannel = new ConcurrentHashMap<>();

    @Inject
    private H264FrameSinkHandler sinkHandler;

    @Override
    public void afterConnectionEstablished( WebSocketSession session ) throws Exception {
        LOGGER.debug( "{} connected.", session.getRemoteAddress() );
    }

    /**
     * Forgets the channel mapping of a closed session so that the map does
     * not grow with every reconnect of a pusher.
     */
    @Override
    public void afterConnectionClosed( WebSocketSession session, CloseStatus status ) throws Exception {
        sessionIdToChannel.remove( session.getId() );
        LOGGER.debug( "{} disconnected: {}", session.getRemoteAddress(), status );
    }

    /**
     * Relays one binary message to the subscribers of the sender's channel.
     * Messages from sessions that have not announced a channel are dropped.
     */
    @Override
    protected void handleBinaryMessage( WebSocketSession session, BinaryMessage message ) throws Exception {
        ByteBuffer payload = message.getPayload();
        // Formatting the preview is wasted work on every frame unless debug logging is on.
        if ( LOGGER.isDebugEnabled() ) {
            LOGGER.debug( "Received binary message {} bytes: {}...", payload.remaining(), hexPreview( payload ) );
        }
        String chnl = sessionIdToChannel.get( session.getId() );
        if ( chnl != null ) sinkHandler.broadcast( chnl, payload );
    }

    /**
     * Records the text payload as the channel name for this session.
     */
    @Override
    protected void handleTextMessage( WebSocketSession session, TextMessage message ) throws Exception {
        String payload = message.getPayload();
        sessionIdToChannel.put( session.getId(), payload );
        LOGGER.debug( "Received text message: {}", payload );
    }

    /**
     * Formats the first bytes of the readable region as space-separated hex.
     * Uses absolute reads so that the buffer's position is left untouched, and
     * works for buffers without an accessible backing array.
     */
    private static String hexPreview( ByteBuffer payload ) {
        int len = Math.min( HEX_PREVIEW_BYTES, payload.remaining() );
        StringBuilder hex = new StringBuilder( len * 3 );
        for ( int i = 0; i < len; i++ ) {
            hex.append( String.format( "%02x ", Byte.toUnsignedInt( payload.get( payload.position() + i ) ) ) );
        }
        return hex.toString();
    }
}
此Bean向Web客户端广播NALU:
package cc.gmem.study.kurento;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.mutable.MutableInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket handler for the viewing side.
 *
 * Clients talk JSON over text messages: {"action":"init","channel":...}
 * subscribes the session to a channel, {"action":"play"} enables the binary
 * NALU push and {"action":"stop"} disables it again.
 */
public class H264FrameSinkHandler extends TextWebSocketHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger( H264FrameSinkHandler.class );

    public static final String ACTION_INIT = "init";

    private static final String ACTION_INIT_RESP = "initresp";

    public static final String ACTION_PLAY = "play";

    public static final String ACTION_STOP = "stop";

    public static final String KEY_ACTION = "action";

    @Inject
    private ObjectMapper om;

    /** Subscribed sessions per channel; the lists are guarded by this object's monitor. */
    private Map<String, List<WebSocketSession>> chnlToSessions = new ConcurrentHashMap<>();

    /**
     * Dispatches one JSON control message. Unknown actions are ignored.
     */
    @Override
    protected void handleTextMessage( WebSocketSession session, TextMessage message ) throws Exception {
        String client = session.getId() + '@' + session.getRemoteAddress();
        Map<?, ?> req = om.readValue( message.getPayload(), Map.class );
        if ( req == null ) return;
        Object action = req.get( KEY_ACTION );
        if ( ACTION_INIT.equals( action ) ) {
            Object channel = req.get( "channel" );
            if ( !( channel instanceof String ) ) {
                // A missing channel would otherwise fail inside ConcurrentHashMap, which rejects null keys.
                LOGGER.warn( "{} sent an init request without a valid channel", client );
                return;
            }
            LOGGER.debug( "{} request to subscribe channel {}", client, channel );
            addPushTarget( (String) channel, session );
            Map<String, Object> resp = new LinkedHashMap<>();
            resp.put( KEY_ACTION, ACTION_INIT_RESP );
            // Fixed picture size reported to every client, whatever the channel.
            resp.put( "width", 352 );
            resp.put( "height", 288 );
            session.sendMessage( new TextMessage( om.writeValueAsString( resp ) ) );
        } else if ( ACTION_PLAY.equals( action ) ) {
            LOGGER.debug( "{} request to receive nalu push", client );
            session.getAttributes().put( ACTION_PLAY, true );
        } else if ( ACTION_STOP.equals( action ) ) {
            LOGGER.debug( "{} request to stop nalu push", client );
            session.getAttributes().remove( ACTION_PLAY );
        }
    }

    /**
     * Subscribes a session to a channel. A session that repeats its init
     * request is registered only once, so it never receives frames twice.
     */
    private synchronized void addPushTarget( String channel, WebSocketSession session ) {
        List<WebSocketSession> sessions = chnlToSessions.computeIfAbsent( channel, key -> new ArrayList<>() );
        if ( !sessions.contains( session ) ) {
            sessions.add( session );
        }
    }

    /**
     * Sends the payload to every open session of the channel that has asked to play.
     * A failure on one session is logged and does not stop the others.
     *
     * @param chnl    channel name
     * @param payload bytes to send; its position and limit are not modified
     */
    public synchronized void broadcast( String chnl, ByteBuffer payload ) {
        List<WebSocketSession> sessions = chnlToSessions.get( chnl );
        if ( sessions == null ) return;
        for ( WebSocketSession sess : sessions ) {
            try {
                if ( sess.isOpen() && Boolean.TRUE.equals( sess.getAttributes().get( ACTION_PLAY ) ) ) {
                    // Each send gets its own view, so a transport that advances the
                    // buffer position cannot leave later sessions with an empty payload.
                    sess.sendMessage( new BinaryMessage( payload.duplicate() ) );
                }
            } catch ( Exception e ) {
                LOGGER.error( e.getMessage(), e );
            }
        }
    }

    /**
     * Every 10 seconds, drops sessions that are no longer open.
     */
    @Scheduled( fixedRate = 10000 )
    public synchronized void cleanup() {
        int removed = 0;
        for ( List<WebSocketSession> sessions : chnlToSessions.values() ) {
            int before = sessions.size();
            sessions.removeIf( sess -> !sess.isOpen() );
            removed += before - sessions.size();
        }
        if ( removed > 0 ) LOGGER.debug( "Remove {} invalid websocket session.", removed );
    }
}
我们对http-live-player进行了简单的修改,主要是修改其通信方式以配合上述WebSocket服务器。核心代码没有变动,因此这里不张贴其代码。
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Broadway Video Surveillance</title>
    <script src="js/broadway/http-live-player.js"></script>
    <link rel="stylesheet" href="style.css">
</head>
<body>
<div class="nvbar">
    <div class="title">基于Broadway+WebSocket的视频监控示例</div>
    <div class="subtitle">http://192.168.0.89:9090/broadway.html</div>
</div>
<div class="videos-wrapper">
    <div id="videos"></div>
</div>
<script type="text/javascript">
    // Builds a 3x3 wall: nine canvases, each with its own player and its own
    // WebSocket connection to the sink endpoint on the host serving this page.
    var videos = document.getElementById( 'videos' );

    // Appends one canvas to the wall and connects a player for channel 'ch1' to it.
    function addPlayer() {
        var canvas = document.createElement( "canvas" );
        videos.appendChild( canvas );
        var player = new WSAvcPlayer( canvas, "webgl", 'ch1', true );
        player.connect( "ws://" + document.location.host + "/h264sink" );
    }

    for ( var i = 0; i < 9; i++ ) {
        addPlayer();
    }
</script>
</body>
</html>
下面的截图是开了九画面的视频监控,使用的是子码流,在测试机器上CPU压力不大。
注意:如果Broadway来不及解码,http-live-player会把缓冲区中的所有NALU全部丢弃,这可能导致暂时的花屏。选择适当的帧率、码率、画幅可以尽量避免这种情况的发生。
Leave a Reply