#!/bin/env python
"""Collection of small utilities."""
__version__ = "0.0.1a"
__date__ = "08/20/09"
__author__ = "rbistolfi"
__credits__ = "M0E-lnx, Uelsk8s, Kidd"
import os
import re
import shutil
from vinstall.backend import sp
from vinstall.core import log
from threading import Lock
import subprocess
# Module-level logger used by the helpers below.
LOG = log.get_logger("vinstall_utils")
def _run_program(argv, root='/', stdin=None, env_prune=None,
                 stderr_to_stdout=False, binary_output=False):
    """Run an external program, log its output and return the result.

    DO NOT USE THIS FUNCTION DIRECTLY. Use the run_program wrappers
    instead.

    Arguments:
        argv: the command and its arguments, as a list of strings.
        root: directory used as working directory for the program; when it
            is not "/" the child process also chroots into it before
            executing. Default is "/".
        stdin: passed unchanged to subprocess.Popen as stdin.
        env_prune: names of environment variables to drop from the copy of
            the environment handed to the program.
        stderr_to_stdout: if True, stderr is redirected into stdout.
        binary_output: if True, stdout is returned undecoded; otherwise it
            is decoded as UTF-8.

    Returns:
        A (returncode, output) tuple.

    Raises:
        OSError: if the program could not be started. The error is logged
            and re-raised.
    """
    if env_prune is None:
        env_prune = []

    def chroot():
        # Executed in the child process right before the program starts.
        if root and root != '/':
            os.chroot(root)

    # NOTE(review): this lock is created on every call, so it cannot
    # serialize anything between threads. A module-level lock would be
    # needed for that; left unchanged here to avoid altering how concurrent
    # callers behave.
    program_log_lock = Lock()
    with program_log_lock:
        LOG.info("Running ... %s", " ".join(argv))

        env = os.environ.copy()
        # Force the C locale for the child process.
        env["LC_ALL"] = "C"
        for var in env_prune:
            env.pop(var, None)

        if stderr_to_stdout:
            stderr_dir = subprocess.STDOUT
        else:
            stderr_dir = subprocess.PIPE

        # Only starting/communicating with the process is guarded: those
        # are the calls expected to raise OSError.
        try:
            proc = subprocess.Popen(argv,
                                    stdin=stdin,
                                    stdout=subprocess.PIPE,
                                    stderr=stderr_dir,
                                    close_fds=True,
                                    preexec_fn=chroot,
                                    cwd=root,
                                    env=env)
            out, err = proc.communicate()
        except OSError as e:
            LOG.error("Error while running %s: %s", argv[0], e.strerror)
            raise  #XX: We need to define an exception for this!!!!

        if not binary_output:
            out = out.decode('utf-8')

        if out:
            if not stderr_to_stdout:
                LOG.debug("stdout:")
            for line in out.splitlines():
                LOG.info("%s", line)

        if not stderr_to_stdout and err:
            LOG.info("stderr:")
            # stderr is only ever logged, so decode it leniently instead of
            # logging raw bytes or failing on an undecodable byte.
            for line in err.decode('utf-8', 'replace').splitlines():
                LOG.info("%s", line)

        LOG.debug("Return code: %d", proc.returncode)

    return (proc.returncode, out)
def run_program(*args, **kwargs):
    """Run a program via _run_program and return only its return code.

    All arguments are forwarded unchanged to _run_program.
    """
    returncode, _ = _run_program(*args, **kwargs)
    return returncode
def capture_output(*args, **kwargs):
    """Run a program via _run_program and return only its output.

    All arguments are forwarded unchanged to _run_program.
    """
    _, output = _run_program(*args, **kwargs)
    return output
def capture_output_binary(*args, **kwargs):
    """Run a program via _run_program and return only its undecoded output.

    Arguments are forwarded to _run_program with binary_output forced to
    True.
    """
    options = dict(kwargs, binary_output=True)
    _, output = _run_program(*args, **options)
    return output
def run_program_and_capture_output(*args, **kwargs):
    """Run a program via _run_program and return its full result.

    All arguments are forwarded unchanged; the (returncode, output) tuple
    produced by _run_program is returned as is.
    """
    result = _run_program(*args, **kwargs)
    return result
def run_program_and_capture_output_binary(*args, **kwargs):
    """Run a program via _run_program, keeping its output undecoded.

    Arguments are forwarded to _run_program with binary_output forced to
    True; its (returncode, output) tuple is returned as is.
    """
    kwargs.update(binary_output=True)
    result = _run_program(*args, **kwargs)
    return result
def get_mem_size(meminfo="/proc/meminfo"):
    """Get the amount of RAM available in the system, in MB.

    Arguments:
        meminfo: path of the file to parse. Default is "/proc/meminfo".

    Returns:
        The size found in the "MemTotal:" entry divided by 1024, as a
        float.

    Raises:
        ValueError: if the file has no "MemTotal:" entry or its size is not
            an integer.
    """
    with open(meminfo) as fo:
        for line in fo:
            fields = line.split()
            # Look the entry up by label rather than trusting it to be the
            # first line, and tolerate a missing unit column.
            if len(fields) >= 2 and fields[0] == "MemTotal:":
                return int(fields[1]) / 1024.0
    raise ValueError("no MemTotal entry found in %s" % meminfo)
def activate_swap(path=None):
    """Activate swap space with swapon.

    Arguments:
        path: the swap device or file to activate. If None (default),
            "swapon -a" is run instead.
    """
    if path is None:
        # Activate all swap space
        argv = ['swapon', '-a']
    else:
        # Build the argument list directly so a path containing whitespace
        # stays a single argument.
        argv = ['swapon', '%s' % (path,)]
    sp.call(argv)
def mount(src, mountpoint, filesystem='auto'):
    """Mount a filesystem.

    Arguments:
        src: the filesystem or device to be mounted.
        mountpoint: the directory to mount it on.
        filesystem: filesystem type passed to the mount command, for example:
            "ext3". Default is auto.

    Returns:
        The mountpoint, unchanged.
    """
    # Build the argument list directly so paths containing whitespace stay
    # single arguments.
    argv = ['mount', '-t']
    argv.extend('%s' % (arg,) for arg in (filesystem, src, mountpoint))
    sp.call(argv)
    return mountpoint
def mountiso(src, mountpoint):
    """Mount a filesystem with the "-o loop" option.

    Arguments:
        src: the image file or device to be mounted.
        mountpoint: the directory to mount it on.

    Returns:
        The mountpoint, unchanged.
    """
    # Build the argument list directly so paths containing whitespace stay
    # single arguments.
    argv = ['mount', '-o', 'loop', '%s' % (src,), '%s' % (mountpoint,)]
    sp.call(argv)
    return mountpoint
def bind_mount(src, mountpoint):
    """Mount a filesystem with --bind.

    Arguments:
        src: the directory to be bind-mounted.
        mountpoint: the directory to mount it on.

    Returns:
        The mountpoint, unchanged.
    """
    # Build the argument list directly so paths containing whitespace stay
    # single arguments.
    argv = ['mount', '--bind', '%s' % (src,), '%s' % (mountpoint,)]
    sp.call(argv)
    return mountpoint
def umount(mounted):
    """Unmount a filesystem.

    Arguments:
        mounted: a mounted filesystem or device.

    Returns:
        Whatever sp.call returns for the umount command.

    >>> umount('/mnt/loop')
    0
    """
    # Build the argument list directly so a path containing whitespace
    # stays a single argument.
    return sp.call(['umount', '%s' % (mounted,)])
def exec_chroot(root, command):
    """Run the given command under a new root dir, using chroot.

    Arguments:
        root: the directory to chroot to.
        command: the command to be executed under the new root, as a
            string; it is split on whitespace into arguments.

    Returns:
        Whatever sp.call returns for the chroot command.

    >>> exec_chroot('/', 'pwd')
    0
    """
    # The root is kept as a single argument even if it contains whitespace;
    # only the command string is split.
    argv = ['chroot', '%s' % (root,)] + ('%s' % (command,)).split()
    # Hand the result back, as the example above documents.
    return sp.call(argv)
def is_mounted(device, mounts_file="/proc/mounts"):
    """Return True if device is mounted.

    Arguments:
        device: the device name, compared against the first field of each
            line of the mounts file.
        mounts_file: path of the file to scan. Default is "/proc/mounts".
    """
    with open(mounts_file) as mounts:
        for line in mounts:
            fields = line.split()
            # Blank lines have no fields at all; skip them.
            if fields and fields[0] == device:
                return True
    return False
def get_mounted(mountpoint, mtab="/etc/mtab"):
    """Return the device for a mountpoint.

    Arguments:
        mountpoint: the mountpoint, compared against the second field of
            each line of the mtab file.
        mtab: path of the file to scan. Default is "/etc/mtab".

    Returns:
        The first field of the first matching line, or None if no line
        matches.
    """
    with open(mtab) as table:
        for line in table:
            fields = line.split()
            # Lines with fewer than two fields cannot name a mountpoint.
            if len(fields) >= 2 and fields[1] == mountpoint:
                return fields[0]
    return None
def get_existing_fs(partition_path):
    """Return the filesystem type that blkid reports for a partition.

    Arguments:
        partition_path: path of the partition, passed to the blkid command.

    Returns:
        The value of the TYPE tag in the blkid output, for example "ext4".

    Raises:
        IndexError: if the blkid output has no TYPE tag.
    """
    #XX: Consider using pyblkid for God's sake!
    # The path is passed as a single argument even if it contains
    # whitespace.
    data = capture_output(['blkid', '%s' % (partition_path,)])
    # Should give us a line like:
    # u'/dev/sda1: UUID="blah-blah-blah-uuid" TYPE="ext4" PARTUUID="0004f5e6-01"\n'
    # TYPE must start a token, so tags merely ending in "TYPE" do not match.
    match = re.search(r'(?:^|\s)TYPE="?([^"\s]*)"?', data)
    if match is None:
        # Same exception type the previous list-indexing code raised, but
        # with a message that says what went wrong.
        raise IndexError("blkid reported no TYPE for %s" % (partition_path,))
    return match.group(1).strip()
def supported_filesystems():
    """Yield the names of the filesystems supported by the system.

    This is a generator. A name is yielded when it is matched in
    /proc/filesystems and a file named "mkfs.<name>" exists in one of the
    directories listed in the PATH environment variable. Each name is
    yielded at most once, even if the mkfs program is found in several
    directories.
    """
    with open('/proc/filesystems') as fo:
        data = fo.read()
    kernel_fs = re.findall(r'^\s+(\w+)', data, re.M)
    # Fall back to the default search path when PATH is not set.
    path = os.environ.get('PATH', os.defpath).split(':')
    seen = set()
    for p in path:
        for k in kernel_fs:
            if k in seen:
                continue
            if os.path.isfile(os.path.join(p, 'mkfs.' + k)):
                seen.add(k)
                yield k
def depmod():
    """Invoke "depmod -a" through sp.call and return its result."""
    argv = ["depmod", "-a"]
    return sp.call(argv)
def mkinitrd(root, fs, disable_bootsplash=False):
    """Create an initrd.

    Arguments:
        root: the root device, passed to mkinitrd with -r.
        fs: the filesystem of the root device, passed to mkinitrd with -f.
            If None, it is looked up with get_existing_fs(root).
        disable_bootsplash: if true, the plymouth files are removed from
            /boot/initrd-tree after the first run and mkinitrd is run again
            on that tree. Default is False.
    """
    # included_modules="i810:i915:intel:nouveau:radeon:vmwgfx"
    #XX: Do not include any video modules if we dont use a bootsplash
    included_modules = "ext4"
    initrd_tree = "/boot/initrd-tree"

    LOG.debug("Creating initrd for root device %s with fs %s", root, fs)
    if fs is None:
        LOG.debug("WARNING: Attempting to determine filesystem for OS root %s",
                  root)
        fs = get_existing_fs(root)
        LOG.debug("WARNING: Filesystem for OS root determined as %s", fs)

    # Arguments shared by both mkinitrd runs; built as a list so values
    # containing whitespace stay single arguments.
    common = ["-r", "%s" % (root,), "-f", "%s" % (fs,), "-m", included_modules]

    command = ["mkinitrd", "-c", "-u"] + common
    LOG.debug("Running %s", " ".join(command))
    sp.call(command)

    if disable_bootsplash:
        # Remove the plymouth binary from the initrd-tree dir. Anything
        # already absent is skipped so the initrd still gets rebuilt.
        plymouth_bin = os.path.join(initrd_tree, "bin", "plymouth")
        if os.path.lexists(plymouth_bin):
            os.remove(plymouth_bin)
        # Remove the libs that support the binary we just removed
        lib_dir = os.path.join(initrd_tree, "lib")
        if os.path.isdir(lib_dir):
            for f in os.listdir(lib_dir):
                if f.startswith("libply"):
                    os.remove(os.path.join(lib_dir, f))
        for subdir in ("usr/share/plymouth", "etc/plymouth"):
            plymouth_dir = os.path.join(initrd_tree, subdir)
            if os.path.isdir(plymouth_dir):
                shutil.rmtree(plymouth_dir)
        # Now run the command again without clearing the current dir
        sp.call(["mkinitrd", "-s", initrd_tree] + common)
class Chroot(object):
    """Execute a block in a chrooted context.

    >>> with Chroot("/mnt"):
    ...     dostuff()

    If the chdir kwarg is True (default), os.chdir to "/" right after
    changing the root.

    If the new root cannot be entered, the error is logged and re-raised,
    so the block is never executed outside the requested root.
    """

    def __init__(self, new_root, chdir=True):
        self.new_root = new_root
        self.chdir = chdir
        # Set by __enter__: descriptor of the real "/" and the previous
        # working directory, both needed to get back out.
        self.real_root = None
        self.old_cwd = None

    def __enter__(self):
        """Save the current root for restoring later and enter the new one.
        """
        LOG.debug("Entering chroot")
        chrooted = False
        try:
            self.real_root = os.open("/", os.O_RDONLY)
            self.old_cwd = os.getcwd()
            os.chroot(self.new_root)
            chrooted = True
            if self.chdir:
                os.chdir("/")
        except Exception:
            LOG.error("Error occurred while entering chroot %s", self.new_root)
            # __exit__ is not called when __enter__ raises, so undo
            # whatever was done so far before propagating the error.
            if chrooted:
                self._restore_root()
            elif self.real_root is not None:
                os.close(self.real_root)
                self.real_root = None
            raise
        return self

    def __exit__(self, *_):
        """Go back to the previous root.
        """
        self._restore_root()
        LOG.debug("Leaving chroot")

    def _restore_root(self):
        """Return to the saved root and working directory."""
        try:
            os.fchdir(self.real_root)
            os.chroot(".")
        finally:
            # Never leak the descriptor, even if leaving the chroot fails.
            os.close(self.real_root)
            self.real_root = None
        os.chdir(self.old_cwd)