Π Π°Π·ΡΠ°Π±ΠΎΡΠΊΠ° Π½Π° Python
Π΄ΠΈΡΡΠ°Π½ΡΠΈΠΎΠ½Π½ΠΎ
Π΄ΠΎΠ³ΠΎΠ²ΠΎΡΠ½Π°Ρ
Data Science. ΠΠΎΡΠ°Π±ΠΎΡΠΊΠ° ΡΡΡΠ΅ΡΡΠ²ΡΡΡΠ΅Π³ΠΎ ΠΏΡΠΎΠ΄ΡΠΊΡΠ°, Π½Π°ΡΡΡΠΎΠΉΠΊΠ°. ΠΡΠΎ ΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΠ°, ΠΊΠΎΡΠΎΡΠ°Ρ Π΄ΠΎΠ»ΠΆΠ½Π° ΠΏΠΎΠ»ΡΡΠ°ΡΡ Ρ ΠΊΠ°ΠΌΠ΅ΡΡ(dev/video0) Π²ΠΈΠ΄Π΅ΠΎ Π² ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠΌ ΡΠΎΡΠΌΠ°ΡΠ΅(ΡΠ΅ΠΉΡΠ°Ρ ΡΡΠΎ h264) ΠΈ ΡΡΠ°Π½ΡΠ»ΠΈΡΠΎΠ²Π°ΡΡ ΠΏΠΎ rtsp Π½Π° ΠΌΠΎΠΉ ip Π² Π»ΠΎΠΊΠ°Π»ΡΠ½ΠΎΠΉ ΡΠ΅ΡΠΈ(Π±Π΅Π· Π΄ΠΎΡΡΡΠΏΠ° Π² ΠΈΠ½ΡΠ΅ΡΠ½Π΅Ρ). ΠΠ°ΡΠ° Π·Π°Π΄Π°ΡΠ° - ΠΏΠ΅ΡΠ΅Π΄Π΅Π»Π°ΡΡ ΡΠ°Π±ΠΎΡΡ Ρ ΡΠΎΡΠΌΠ°ΡΠΎΠΌ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠ³ΠΎ Π²ΠΈΠ΄Π΅ΠΎ, Π±ΡΠ»ΠΎ Π²ΠΈΠ΄Π΅ΠΎ .h264, Π° ΡΠ΅ΠΏΠ΅ΡΡ Π½ΡΠΆΠ½ΠΎ ΠΏΠΎΡΠΎΠΊΠΎΠ²ΠΎΠ΅, mpeg Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ. import gi gi.require_version('Gst', '1.0') gi.require_version('GstRtspServer', '1.0') from gi.repository import GLib, Gst, GstRtspServer from picamera2 import Picamera2 from picamera2.encoders import H264Encoder from picamera2.outputs import FfmpegOutput import socket # Π€ΡΠ½ΠΊΡΠΈΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ IP Π°Π΄ΡΠ΅ΡΠ° def get_local_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('[Π’Π΅Π»Π΅ΡΠΎΠ½ ΡΠΊΡΡΡ]', 1)) ip = s.getsockname()[0] except Exception: ip = '127.0.0.1' finally: s.close() return ip class CameraRtspFactory(GstRtspServer.RTSPMediaFactory): def init(self, pipeline): super(CameraRtspFactory, self).init() self.pipeline = pipeline def do_create_element(self, url): return self.pipeline def main(): # ΠΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΡ ΠΊΠ°ΠΌΠ΅ΡΡ picam2 = Picamera2() video_config = picam2.create_video_configuration(main={"size": (1920, 1080)}) picam2.configure(video_config) encoder = H264Encoder(bitrate=17000000) # ΠΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΡ RTSP ΡΠ΅ΡΠ²Π΅ΡΠ° Gst.init(None) server = GstRtspServer.RTSPServer() server.set_service('8554') # ΠΠ°ΡΡΡΠΎΠΉΠΊΠ° Π²ΡΡ
ΠΎΠ΄Π° ΡΠ΅ΡΠ΅Π· GStreamer pipeline_str = ( 'appsrc ! videoconvert ! video/x-raw,format=I420 ! ' 'x264enc bitrate=17000 speed-preset=ultrafast tune=zerolatency ! ' 'rtph264pay name=pay0 pt=96 config-interval=1' ) gst_pipeline = Gst.parse_launch(pipeline_str) # ΠΠ°ΡΡΡΠΎΠΉΠΊΠ° Π²ΡΡ
ΠΎΠ΄Π° Π΄Π»Ρ ΠΊΠ°ΠΌΠ΅ΡΡ ffmpeg_output = FfmpegOutput(gst_pipeline.get_by_name("appsrc")) picam2.start_recording(encoder, ffmpeg_output) # ΠΠ°ΡΡΡΠΎΠΉΠΊΠ° RTSP factory = CameraRtspFactory(gst_pipeline) factory.set_shared(True) mount_points = server.get_mount_points() mount_points.add_factory("/test", factory) server.attach(None) ip = get_local_ip() print(f"Streaming RTSP at rtsp://{ip}:8554/test") loop = GLib.MainLoop() loop.run() if name == "main": main().
2024-10-19
ΠΡΠΊΠ»ΠΈΠΊΠ½ΡΡΡΡΡ