[Closed] Unable to connect to Astra camera with openCV

Hi,

I am trying to get the camera image of the rosbot 2 pro via opencv to use in an machine learning project. But I am not getting an camera image when passing the camera path to open cv. I got the camera path (at least I think it is the correct path) by using a script.

For testing I used this publisher:

# Basic ROS 2 program to publish real-time streaming 
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# - https://automaticaddison.com
   
# Import the necessary libraries
import rclpy # Python Client Library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
import cv2 # OpenCV library
  
class ImagePublisher(Node):
  """
  Create an ImagePublisher class, which is a subclass of the Node class.
  """
  def __init__(self):
    """
    Class constructor to set up the node
    """
    # Initiate the Node class's constructor and give it a name
    super().__init__('image_publisher')
       
    # Create the publisher. This publisher will publish an Image
    # to the video_frames topic. The queue size is 10 messages.
    self.publisher_ = self.create_publisher(Image, 'video_frames', 10)
       
    # We will publish a message every 0.1 seconds
    timer_period = 0.1  # seconds
       
    # Create the timer
    self.timer = self.create_timer(timer_period, self.timer_callback)
          
    # Create a VideoCapture object
    # The argument '0' gets the default webcam.
    self.cap = cv2.VideoCapture(´"/dev/bus/usb/001/004")
          
    # Used to convert between ROS and OpenCV images
    self.br = CvBridge()
    
  def timer_callback(self):
    """
    Callback function.
    This function gets called every 0.1 seconds.
    """
    # Capture frame-by-frame
    # This method returns True/False as well
    # as the video frame.
    ret, frame = self.cap.read()
           
    if ret == True:
      # Publish the image.
      # The 'cv2_to_imgmsg' method converts an OpenCV
      # image to a ROS 2 image message
      self.publisher_.publish(self.br.cv2_to_imgmsg(frame))
  
    # Display the message on the console
    self.get_logger().info('Publishing video frame')
   
def main(args=None):
   
  # Initialize the rclpy library
  rclpy.init(args=args)
   
  # Create the node
  image_publisher = ImagePublisher()
   
  # Spin the node so the callback function is called.
  rclpy.spin(image_publisher)
   
  # Destroy the node explicitly
  # (optional - otherwise it will be done automatically
  # when the garbage collector destroys the node object)
  image_publisher.destroy_node()
   
  # Shutdown the ROS client library for Python
  rclpy.shutdown()
   
if __name__ == '__main__':
  main()

from Getting Started With OpenCV in ROS 2 Galactic (Python) – Automatic Addison

Bus 001 Device 003: ID 0424:4603 Microchip Technology, Inc. (formerly SMSC)  - /dev/bus/usb/001/003
Bus 001 Device 004: ID 2bc5:0401   - /dev/bus/usb/001/004
Bus 001 Device 006: ID 0424:2530 Microchip Technology, Inc. (formerly SMSC) Lenovo USB Optical Mouse - /dev/bus/usb/001/006
Bus 001 Device 005: ID 148f:7601 Ralink Technology, Corp. MT7601U Wireless Adapter - /dev/bus/usb/001/005
Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub - /dev/bus/usb/001/001
Bus 001 Device 002: ID 10c4:ea60 Silicon Labs CP210x UART Bridge - /dev/bus/usb/001/002
Bus 001 Device 007: ID 17ef:608d Lenovo Lenovo USB Optical Mouse - /dev/bus/usb/001/007
Bus 001 Device 007: ID 17ef:608d Lenovo Lenovo USB Optical Mouse - /dev/input/mouse0
Bus 001 Device 007: ID 17ef:608d Lenovo Lenovo USB Optical Mouse - /dev/input/event8
Bus 002 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub - /dev/bus/usb/002/001

From this I surmised the camera is on /dev/bus/usb/001/004

But openCV does not capture an image. Is the path wrong or am I doing something else wrong?

Kind regards,
Daniel

Hi @danielKleissl!
I will try to reproduce your issue and find out the solution!

Best regards,
JD

Welcome back @danielKleissl !
I reproduced your issue and I’m working on getting image from Astra in opencv.
Currently, I suggest using our Docker container featuring the Astra camera. By doing so, you can create an image subscriber for seamless functionality.

You can use this compose.yaml file:

services:
  astra:
    image: husarion/astra:humble-1.0.2-20230427
    network_mode: host
    ipc: host
    environment:
      - RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
    devices:
      - /dev/bus/usb/
    command: ros2 launch astra_camera astra_mini.launch.py

and run Astra container:

docker compose up

From the same tutorial you can use subscriber with changed QoS policy and topic name.

Here is example:

# Basic ROS 2 program to subscribe to real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# - https://automaticaddison.com

# Import the necessary libraries
import rclpy # Python library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
import cv2 # OpenCV library

class ImageSubscriber(Node):
  """
  Create an ImageSubscriber class, which is a subclass of the Node class.
  """
  def __init__(self):
    """
    Class constructor to set up the node
    """
    # Initiate the Node class's constructor and give it a name
    super().__init__('image_subscriber')

    # Create the subscriber. This subscriber will receive an Image
    # from the /camera/color/image_raw topic.
    # Astra changes the policy of publisher.
    qos_profile = QoSProfile(
        reliability=QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT,
        history=QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_LAST,
        depth=1
    )

    self.subscription = self.create_subscription(
      Image,
      '/camera/color/image_raw',
      self.listener_callback,
      qos_profile=qos_profile)
    self.subscription # prevent unused variable warning

    # Used to convert between ROS and OpenCV images
    self.br = CvBridge()

  def listener_callback(self, data):
    """
    Callback function.
    """
    # Display the message on the console
    self.get_logger().info('Receiving video frame')

    # Convert ROS Image message to OpenCV image
    current_frame = self.br.imgmsg_to_cv2(data)

    # Display image
    cv2.imshow("camera", current_frame)

    cv2.waitKey(1)

def main(args=None):

  # Initialize the rclpy library
  rclpy.init(args=args)

  # Create the node
  image_subscriber = ImageSubscriber()

  # Spin the node so the callback function is called.
  rclpy.spin(image_subscriber)

  # Destroy the node explicitly
  # (optional - otherwise it will be done automatically
  # when the garbage collector destroys the node object)
  image_subscriber.destroy_node()

  # Shutdown the ROS client library for Python
  rclpy.shutdown()

if __name__ == '__main__':
  main()

Please inform me if it works for you. I will continue working on obtaining the Astra image directly from OpenCV.
Best regards,
JD

Thank you @JakubDelicat

I am currently writing my bachelor’s thesis, so I am unsure when I will be able to test that.

It might take a few weeks until I can play around with the RosBot again. I will get back to you after I try your suggestion. Still, at first glance, it seems sensible and something I also thought of: starting the cam with the docker container and modifying my object detector to subscribe to that topic instead of directly ingesting it from the cam. So at least as a workaround to get a proof of concept going, this should work.

Regards,
Daniel

I will stay in touch and best of luck with your bachelor’s thesis as well. Feel free to reach out if you need any further assistance.
JD

1 Like

Hi @danielKleissl!
If you are still working on your project look at our new tutorial with ROSbots and OpenCV:

Best regards,
JD

1 Like