如何将bag文件批量转成mp4

目录

简介

这篇博客涉及的脚本用来将bag文件批量转化为mp4文件

dockerfile

FROM osrf/ros:kinetic-desktop-full
RUN sudo apt update && \\
    sudo apt install -y ffmpeg

Build docker image

docker build -t ros/kinetic:latest .

Build docker container

这里链接的数据卷,要换成你自己的本地路径。容器路径也可以自定义。

docker run -it -v /C/yp/project/realsense2video:/test ros/kinetic:latest

Run script

cd ${your project root}
python main.py --data_root /test/data --out_root /test/save --fps 25
param data_root:包含bag的文件夹
param out_root:输出目录
param fps:每秒的帧数

Source code

bag2video.py

# -*- coding: utf-8 -*-
 
#!/usr/bin/env python2
 
import roslib
#roslib.load_manifest(\'rosbag\')
import rospy
import rosbag
import sys, getopt
import os
from sensor_msgs.msg import CompressedImage  #压缩图片
from sensor_msgs.msg import Image
import cv2
 
import numpy as np
 
import shlex, subprocess  #读取命令行参数
#subprocess 是一个 python 标准类库,用于创建进程运行系统命令,并且可以连接进程的输入输出和
#错误管道,获取它们的返回,使用起来要优于 os.system,在这里我们使用这个库运行 hive 语句并获取返回结果。
 
#shlex 是一个 python 标准类库,使用这个类我们可以轻松的做出对 linux shell 的词法分析,在
#这里我们将格式化好的 hive 连接语句用 shlex 切分,配合 subprocess.run 使用。
MJPEG_VIDEO = 1
RAWIMAGE_VIDEO = 2
VIDEO_CONVERTER_TO_USE = \"ffmpeg\" # or you may want to use \"avconv\" #视频转换器
 
def print_help():
    print(\'rosbag2video.py [--fps 25] [--rate 1] [-o outputfile] [-v] [-s] [-t topic] bagfile1 [bagfile2] ...\')
    print()
    print(\'Converts image sequence(s) in ros bag file(s) to video file(s) with fixed frame rate using\',VIDEO_CONVERTER_TO_USE)
    print(VIDEO_CONVERTER_TO_USE,\'needs to be installed!\')
    print()
    print(\'--fps   Sets FPS value that is passed to\',VIDEO_CONVERTER_TO_USE)
    print(\'        Default is 25.\')
    print(\'-h      Displays this help.\')
    print(\'--ofile (-o) sets output file name.\')
    print(\'        If no output file name (-o) is given the filename \\\'<prefix><topic>.mp4\' is used and default output codec is h264.\')
    print(\'        Multiple image topics are supported only when -o option is _not_ used.\')
    print(\'        \',VIDEO_CONVERTER_TO_USE,\' will guess the format according to given extension.\')
    print(\'        Compressed and raw image messages are supported with mono8 and bgr8/rgb8/bggr8/rggb8 formats.\')
    print(\'--rate  (-r) You may slow down or speed up the video.\')
    print(\'        Default is 1.0, that keeps the original speed.\')
    print(\'-s      Shows each and every image extracted from the rosbag file (cv_bride is needed).\')
    print(\'--topic (-t) Only the images from topic \"topic\" are used for the video output.\')
    print(\'-v      Verbose messages are displayed.\')
    print(\'--prefix (-p) set a output file name prefix othervise \\\'bagfile1\' is used (if -o is not set).\')
    print(\'--start Optional start time in seconds.\')
    print(\'--end   Optional end time in seconds.\')
class RosVideoWriter():
    def __init__(self, fps=25.0, rate=1.0, topic=\"\", output_filename =\"\", display= False, verbose = False, start = rospy.Time(0), end = rospy.Time(sys.maxsize)):
        self.opt_topic = topic
        self.opt_out_file = output_filename
        self.opt_verbose = verbose
        self.opt_display_images = display
        self.opt_start = start
        self.opt_end = end
        self.rate = rate
        self.fps = fps
        self.opt_prefix= None
        self.t_first={}
        self.t_file={}
        self.t_video={}
        self.p_avconv = {}
        
    #语法分析Args
    def parseArgs(self, args):
        opts, opt_files = getopt.getopt(args,\"hsvr:o:t:p:\",[\"fps=\",\"rate=\",\"ofile=\",\"topic=\",\"start=\",\"end=\",\"prefix=\"])
        #getopt()
        for opt, arg in opts:
            if opt == \'-h\':
                print_help()
                sys.exit(0)
            elif opt == \'-s\':
                self.opt_display_images = True
            elif opt == \'-v\':
                self.opt_verbose = True
            elif opt in (\"--fps\"):
                self.fps = float(arg)
            elif opt in (\"-r\", \"--rate\"):
                self.rate = float(arg)
            elif opt in (\"-o\", \"--ofile\"):
                self.opt_out_file = arg
            elif opt in (\"-t\", \"--topic\"):
                self.opt_topic = arg
            elif opt in (\"-p\", \"--prefix\"):
                self.opt_prefix = arg
            elif opt in (\"--start\"):
                self.opt_start = rospy.Time(int(arg))
                if(self.opt_verbose):
                    print(\"starting at\",self.opt_start.to_sec())
            elif opt in (\"--end\"):
                self.opt_end = rospy.Time(int(arg))
                if(self.opt_verbose):
                    print(\"ending at\",self.opt_end.to_sec())
            else:
                print(\"opt:\", opt,\'arg:\', arg)
 
        if (self.fps<=0):
            print(\"invalid fps\", self.fps)
            self.fps = 1
 
        if (self.rate<=0):
            print(\"invalid rate\", self.rate)
            self.rate = 1
 
        if(self.opt_verbose):
            print(\"using \",self.fps,\" FPS\")
        return opt_files
    # filter messages using type or only the opic we whant from the \'topic\' argument
    def filter_image_msgs(self, topic, datatype, md5sum, msg_def, header):
        if(datatype==\"sensor_msgs/CompressedImage\"):
            if (self.opt_topic != \"\" and self.opt_topic == topic) or self.opt_topic == \"\":
                print(\"############# COMPRESSED IMAGE  ######################\")
                print(topic,\' with datatype:\', str(datatype))
                print()
                return True;
 
        if(datatype==\"theora_image_transport/Packet\"):
            if (self.opt_topic != \"\" and self.opt_topic == topic) or self.opt_topic == \"\":
                print(topic,\' with datatype:\', str(datatype))
                print(\'!!! theora is not supported, sorry !!!\')
                return False;
 
        if(datatype==\"sensor_msgs/Image\"):
            if (self.opt_topic != \"\" and self.opt_topic == topic) or self.opt_topic == \"\":
                print(\"############# UNCOMPRESSED IMAGE ######################\")
                print(topic,\' with datatype:\', str(datatype))
                print()
                return True;
 
        return False;
    def write_output_video(self, msg, topic, t, video_fmt, pix_fmt = \"\"):
        # no data in this topic
        if len(msg.data) == 0 :
            return
        # initiate data for this topic
        if not topic in self.t_first :
            self.t_first[topic] = t # timestamp of first image for this topic
            self.t_video[topic] = 0
            self.t_file[topic] = 0
        # if multiple streams of images will start at different times the resulting video files will not be in sync
        # current offset time we are in the bag file
        self.t_file[topic] = (t-self.t_first[topic]).to_sec()
        # fill video file up with images until we reache the current offset from the beginning of the bag file
        while self.t_video[topic] < self.t_file[topic]/self.rate :
            if not topic in self.p_avconv:
                # we have to start a new process for this topic
                if self.opt_verbose :
                    print(\"Initializing pipe for topic\", topic, \"at time\", t.to_sec())
                if self.opt_out_file==\"\":
                    out_file = self.opt_prefix + str(topic).replace(\"/\", \"_\")+\".mp4\"
                else:
                    out_file = self.opt_out_file
 
                if self.opt_verbose :
                    print(\"Using output file \", out_file, \" for topic \", topic, \".\")
 
                if video_fmt == MJPEG_VIDEO :
                    cmd = [VIDEO_CONVERTER_TO_USE, \'-v\', \'1\', \'-stats\', \'-r\',str(self.fps),\'-c\',\'mjpeg\',\'-f\',\'mjpeg\',\'-i\',\'-\',\'-an\',out_file]
                    self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE)
                    if self.opt_verbose :
                        print(\"Using command line:\")
                        print(cmd)
                elif video_fmt == RAWIMAGE_VIDEO :
                    size = str(msg.width)+\"x\"+str(msg.height)
                    cmd = [VIDEO_CONVERTER_TO_USE, \'-v\', \'1\', \'-stats\',\'-r\',str(self.fps),\'-f\',\'rawvideo\',\'-s\',size,\'-pix_fmt\', pix_fmt,\'-i\',\'-\',\'-an\',out_file]
                    self.p_avconv[topic] = subprocess.Popen(cmd, stdin=subprocess.PIPE)
                    if self.opt_verbose :
                        print(\"Using command line:\")
                        print(cmd)
 
                else :
                    print(\"Script error, unknown value for argument video_fmt in function write_output_video.\")
                    exit(1)
            # send data to ffmpeg process pipe
            self.p_avconv[topic].stdin.write(msg.data)
            # next frame time
            self.t_video[topic] += 1.0/self.fps
 
    def addBag(self, filename):
        if self.opt_display_images:
            from cv_bridge import CvBridge, CvBridgeError
            bridge = CvBridge()
            cv_image = []
 
        if self.opt_verbose :
            print(\"Bagfile: {}\".format(filename))
 
        if not self.opt_prefix:
            # create the output in the same folder and name as the bag file minu \'.bag\'
            self.opt_prefix = bagfile[:-4]
 
        #Go through the bag file
        bag = rosbag.Bag(filename)
        if self.opt_verbose :
            print(\"Bag opened.\")
        # loop over all topics
        for topic, msg, t in bag.read_messages(connection_filter=self.filter_image_msgs, start_time=self.opt_start, end_time=self.opt_end):
            try:
                if msg.format.find(\"jpeg\")!=-1 :
                    if msg.format.find(\"8\")!=-1 and (msg.format.find(\"rgb\")!=-1 or msg.format.find(\"bgr\")!=-1 or msg.format.find(\"bgra\")!=-1 ):
                        if self.opt_display_images:
                            np_arr = np.fromstring(msg.data, np.uint8)
                            cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                        self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                    elif msg.format.find(\"mono8\")!=-1 :
                        if self.opt_display_images:
                            np_arr = np.fromstring(msg.data, np.uint8)
                            cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                        self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                    elif msg.format.find(\"16UC1\")!=-1 :
                        if self.opt_display_images:
                            np_arr = np.fromstring(msg.data, np.uint16)
                            cv_image = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
                        self.write_output_video( msg, topic, t, MJPEG_VIDEO )
                    else:
                        print(\'unsupported jpeg format:\', msg.format, \'.\', topic)
 
            # has no attribute \'format\'
            except AttributeError:
                try:
                        pix_fmt=None
                        if msg.encoding.find(\"mono8\")!=-1 or msg.encoding.find(\"8UC1\")!=-1:
                            pix_fmt = \"gray\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bgr8\")
 
                        elif msg.encoding.find(\"bgra\")!=-1 :
                            pix_fmt = \"bgra\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bgr8\")
 
                        elif msg.encoding.find(\"bgr8\")!=-1 :
                            pix_fmt = \"bgr24\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bgr8\")
                        elif msg.encoding.find(\"bggr8\")!=-1 :
                            pix_fmt = \"bayer_bggr8\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bayer_bggr8\")
                        elif msg.encoding.find(\"rggb8\")!=-1 :
                            pix_fmt = \"bayer_rggb8\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bayer_rggb8\")
                        elif msg.encoding.find(\"rgb8\")!=-1 :
                            pix_fmt = \"rgb24\"
                            if self.opt_display_images:
                                cv_image = bridge.imgmsg_to_cv2(msg, \"bgr8\")
                        elif msg.encoding.find(\"16UC1\")!=-1 :
                            pix_fmt = \"gray16le\"
                        else:
                            print(\'unsupported encoding:\', msg.encoding, topic)
                            #exit(1)
                        if pix_fmt:
                            self.write_output_video( msg, topic, t, RAWIMAGE_VIDEO, pix_fmt )
 
                except AttributeError:
                    # maybe theora packet
                    # theora not supported
                    if self.opt_verbose :
                        print(\"Could not handle this format. Maybe thoera packet? theora is not supported.\")
                    pass
            if self.opt_display_images:
                cv2.imshow(topic, cv_image)
                key=cv2.waitKey(1)
                if key==1048603:
                    exit(1)
        if self.p_avconv == {}:
            print(\"No image topics found in bag:\", filename)
        bag.close()
if __name__ == \'__main__\':
    #print()
    #print(\'rosbag2video, by Maximilian Laiacker 2020 and Abel Gabor 2019\')
    #print()
 
    if len(sys.argv) < 2:
        print(\'Please specify ros bag file(s)!\')
        print_help()
        sys.exit(1)
    else :
        videowriter = RosVideoWriter()
        try:
            opt_files = videowriter.parseArgs(sys.argv[1:])
        except getopt.GetoptError:
            print_help()
            sys.exit(2)
    # loop over all files
    for files in range(0,len(opt_files)):
        #First arg is the bag to look at
        bagfile = opt_files[files]
        videowriter.addBag(bagfile)
    print(\"finished\")

main.py

import glob
import sys
import glob
import os
import argparse

def mkdir(path):
    if not os.path.exists(path):
        os.mkdir(path)
if __name__ == \"__main__\":
    parser = argparse.ArgumentParser(
        description=\'\'\'This is a code for bag2video.\'\'\')
    parser.add_argument(\'--data_root\', type=str, default=r\'/test/data\', help=\'path to the root of data\')
    parser.add_argument(\'--out_root\', type=str, default=r\'/test/save\', help=\'path to the root of data\')
    parser.add_argument(\'--fps\', type=int, default=25, help=\'path to the root of data\')
    args = parser.parse_args()

    mkdir(args.out_root)
    bagList = glob.glob(os.path.join(args.data_root, \"*.bag\"))
    for bagPath in bagList:
        baseName = os.path.basename(bagPath).split(\".\")[0]
        outPath = os.path.join(args.out_root, baseName + \".mp4\")
        os.system(\"python bag2video.py -o {outName} --fps {fps} {bagPath}\".format(outName=outPath, fps=args.fps, bagPath=bagPath))
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容