鱼香ROS社区
    • 版块
    • 最新
    • 未解决
    • 已解决
    • 群组
    • 注册
    • 登录
    紧急通知:禁止一切关于政治&VPN翻墙等话题,发现相关帖子会立马删除封号
    提问前必看的发帖注意事项: 社区问答规则(小鱼个人)更新 | 高质量帖子发布指南

    ROS2GO初始化代码

    已定时 已固定 已锁定 已移动
    FishROS2GO
    fishros ros2go
    1
    1
    1.0k
    正在加载更多帖子
    • 从旧到新
    • 从新到旧
    • 最多赞同
    回复
    • 在新帖中回复
    登录后回复
    此主题已被删除。只有拥有主题管理权限的用户可以查看。
    • 小鱼小
      小鱼 技术大佬
      最后由 编辑

      绕了一大圈

      import parted
      import pexpect
      import time
      
      class RootShellInteractor:
          def __init__(self):
              self.shell_process = None
              self.root_prompt = '#'
      
          def connect(self):
              # 启动一个交互式Shell进程,使用 sudo 运行命令
              self.shell_process = pexpect.spawn('/bin/bash')
              self.shell_process.expect_exact(self.root_prompt)
      
          def execute_command(self, command,prompt='#'):
              print("send command",command)
              # 发送命令并等待输出
              self.shell_process.sendline(command)
              if len(prompt):
                  self.shell_process.expect_exact(prompt)
      
              # 获取命令输出
              time.sleep(0.2)
              output = self.shell_process.before.decode('utf-8')
              print(f"========\ncommand:{command}\nc+result:{output.strip()}")
              return output
      
          def disconnect(self):
              # 关闭交互式Shell进程
              self.shell_process.close()
      
      
      
      def update_disk_size(disk_name='/dev/sda',id=2,size='60GiB'):
          interactor = RootShellInteractor()
      
          try:
              interactor.connect()
      
              # 调用 execute_command 方法执行需要的命令
              command_output = interactor.execute_command(f"parted {disk_name} resizepart {id} {size}","No?")
              command_output = interactor.execute_command("yes","")
              command_output = interactor.execute_command(f"resize2fs {disk_name}{id}","#")
              command_output = interactor.execute_command(f"fdisk -l","#")
              # print("Command Output:\n", command_output)
      
          except Exception as e:
              print("Error:", e)
      
          finally:
              interactor.disconnect()
      
              
      def get_root_disk():
          with open('/proc/mounts', 'r') as f:
              mounts = f.readlines()
      
          for mount in mounts:
              fields = mount.strip().split()
              if len(fields) >= 2:
                  mount_point = fields[1]
                  if mount_point == '/':
                      device_path = fields[0]
                      return device_path
      
          return None
      
      def get_disk_info(device_path):
          try:
              disk = parted.getDevice(device_path)
              model = disk.model
              size = disk.getLength(unit='B')
              return model, size
          except Exception as e:
              print(f"Error getting disk info: {e}")
              return None, None
      
      def main():
          root_disk = get_root_disk()
      
          if root_disk:
              print(f"The root filesystem is mounted on: {root_disk}")
              model, size = get_disk_info(root_disk[:-1])  # Remove the partition number
              disk = root_disk[:-1]
              disk_id = root_disk[-1]
              size_gib = f"{int(size/1024/1024/1024)}GiB"
              if model and size:
                  print(f"Root filesystem is on disk: {model}-{root_disk}-{disk_id}")
                  print(f"Disk size: {size_gib}")
                  update_disk_size(disk,disk_id,size_gib)
              else:
                  print("Failed to retrieve disk information.")
          else:
              print("Failed to determine the root filesystem.")
      
      if __name__ == "__main__":
          main()
      
      

      新书配套视频:https://www.bilibili.com/video/BV1GW42197Ck/

      1 条回复 最后回复 回复 引用 0
      • 小伊小 小伊 在 中固定了该主题
      • 第一个帖子
        最后一个帖子
      皖ICP备16016415号-7
      Powered by NodeBB | 鱼香ROS