Эволюция Docker. Часть 2.2

Kate

Administrator
Команда форума

Вступление​

Данная статья является третьей в цикле (1,2), посвященном изучению исходного кода Docker и прямым продолжением предыдущей статьи, в которой мы начали разбирать код первого публичного релиза Docker v0.1.0. В этой части будет рассмотрена реализация практически всех команд, а в конце, мы создадим образ и запустим докер контейнер на его основе. Для удобства я постарался разбить список команд на условные группы: работа с образами, работа с контейнерами, сетевой стек и т.д.

А теперь, как говорится, “without further ado”, приступим к изучению кода из файла commands.go начиная с команд для управления образами (images).

Управление образами​

Import​

Команда import позволяет импортировать образ файловой системы из tar архива, подаваемого на stdin, или же загрузить его по url:

CmdImport
func (srv *Server) CmdImport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "import", "[OPTIONS] URL|- [REPOSITORY [TAG]]", "Create a new filesystem image from the contents of a tarball")
var archive io.Reader
var resp *http.Response

if err := cmd.Parse(args); err != nil {
return nil
}
src := cmd.Arg(0)
if src == "" {
return errors.New("Not enough arguments")
} else if src == "-" {
archive = stdin
} else {
u, err := url.Parse(src)
if err != nil {
return err
}
if u.Scheme == "" {
u.Scheme = "http"
u.Host = src
u.Path = ""
}
fmt.Fprintf(stdout, "Downloading from %s\n", u.String())
// Download with curl (pretty progress bar)
// If curl is not available, fallback to http.Get()
resp, err = Download(u.String(), stdout)
if err != nil {
return err
}
archive = ProgressReader(resp.Body, int(resp.ContentLength), stdout)
}
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src)
if err != nil {
return err
}
// Optionally register the image at REPO/TAG
if repository := cmd.Arg(1); repository != "" {
tag := cmd.Arg(2) // Repository will handle an empty tag properly
if err := srv.runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
return err
}
}
fmt.Fprintln(stdout, img.Id)
return nil
}
После стандартного разбора аргументов функция определяет место откуда нужно импортировать образ: "-" означает stdin, в другом случае аргумент рассматривается, как url. Хелпер методы Download и ProgressReader для загрузки архива по http находятся в файле utils.go:

utils.go
// Request a given URL and return an io.Reader
func Download(url string, stderr io.Writer) (*http.Response, error) {
var resp *http.Response
var err error = nil
if resp, err = http.Get(url); err != nil {
return nil, err
}
if resp.StatusCode >= 400 {
return nil, errors.New("Got HTTP status code >= 400: " + resp.Status)
}
return resp, nil
}

type progressReader struct {
reader io.ReadCloser // Stream to read from
output io.Writer // Where to send progress bar to
read_total int // Expected stream length (bytes)
read_progress int // How much has been read so far (bytes)
last_update int // How many bytes read at least update
}

func (r *progressReader) Read(p []byte) (n int, err error) {
read, err := io.ReadCloser(r.reader).Read(p)
r.read_progress += read

// Only update progress for every 1% read
update_every := int(0.01 * float64(r.read_total))
if r.read_progress-r.last_update > update_every || r.read_progress == r.read_total {
fmt.Fprintf(r.output, "%d/%d (%.0f%%)\r",
r.read_progress,
r.read_total,
float64(r.read_progress)/float64(r.read_total)*100)
r.last_update = r.read_progress
}
// Send newline when complete
if err == io.EOF {
fmt.Fprintf(r.output, "\n")
}

return read, err
}
func (r *progressReader) Close() error {
return io.ReadCloser(r.reader).Close()
}
func ProgressReader(r io.ReadCloser, size int, output io.Writer) *progressReader {
return &progressReader{r, output, size, 0, 0}
}

Далее управление переходит в функцию graph.Create из файла graph.go:

graph.Create
func (graph *Graph) Create(layerData Archive, container *Container, comment string) (*Image, error) {
img := &Image{
Id: GenerateId(),
Comment: comment,
Created: time.Now(),
}
if container != nil {
img.Parent = container.Image
img.Container = container.Id
img.ContainerConfig = *container.Config
}
if err := graph.Register(layerData, img); err != nil {
return nil, err
}
return img, nil
}
Здесь генерируется уникальный идентификатор образа и инициализируется структура Image, которая далее вместе с данными архива передается в метод graph.Register. Если дополнительно передан и контейнер, то ссылка на его образ будет сохранена в поле img.Parent - это используется в команде Commit, создающей новый образ из текущего контейнера. Структура Image и функции для генерации Id на основе SHA256 находятся в файле image.go:

image.go
type Image struct {
Id string `json:"id"`
Parent string `json:"parent,omitempty"`
Comment string `json:"comment,omitempty"`
Created time.Time `json:"created"`
Container string `json:"container,omitempty"`
ContainerConfig Config `json:"container_config,omitempty"`
graph *Graph
}

func GenerateId() string {
// FIXME: don't seed every time
rand.Seed(time.Now().UTC().UnixNano())
randomBytes := bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int())))
id, _ := ComputeId(randomBytes) // can't fail
return id
}

// ComputeId reads from `content` until EOF, then returns a SHA of what it read, as a string.
func ComputeId(content io.Reader) (string, error) {
h := sha256.New()
if _, err := io.Copy(h, content); err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum(nil)[:8]), nil
}

Далее взглянем на метод graph.Register:

graph.Register
func (graph *Graph) Register(layerData Archive, img *Image) error {
if err := ValidateId(img.Id); err != nil {
return err
}
// (This is a convenience to save time. Race conditions are taken care of by os.Rename)
if graph.Exists(img.Id) {
return fmt.Errorf("Image %s already exists", img.Id)
}
tmp, err := graph.Mktemp(img.Id)
defer os.RemoveAll(tmp)
if err != nil {
return fmt.Errorf("Mktemp failed: %s", err)
}
if err := StoreImage(img, layerData, tmp); err != nil {
return err
}
// Commit
if err := os.Rename(tmp, graph.imageRoot(img.Id)); err != nil {
return err
}
img.graph = graph
return nil
}
После валидации id на наличие запрещенного символа ":" (так как он является разделителем для тега), создается временная папка для модификаций, а затем вызывается функция StoreImage, в которой происходит создание образа. По завершению временная папка переименовывается в img.Id:

StoreImage
func StoreImage(img *Image, layerData Archive, root string) error {
// Check that root doesn't already exist
if _, err := os.Stat(root); err == nil {
return fmt.Errorf("Image %s already exists", img.Id)
} else if !os.IsNotExist(err) {
return err
}
// Store the layer
layer := layerPath(root)
if err := os.MkdirAll(layer, 0700); err != nil {
return err
}
if err := Untar(layerData, layer); err != nil {
return err
}
// Store the json ball
jsonData, err := json.Marshal(img)
if err != nil {
return err
}
if err := ioutil.WriteFile(jsonPath(root), jsonData, 0600); err != nil {
return err
}
return nil
}

func layerPath(root string) string {
return path.Join(root, "layer")
}

func jsonPath(root string) string {
return path.Join(root, "json")
}
В StoreImage создается директория layer, в которую помещается распакованный при помощи функции Untar архив файловой системы, после чего структура Image экспортируется в json и сохраняется в соседний файл, как метаданные для образа. Функции Tar и Untar для работы с архивами находятся в файле archive.go и представляют собой лишь удобные обертки над утилитой bsdtar:

archive.go
type Archive io.Reader

type Compression uint32

const (
Uncompressed Compression = iota
Bzip2
Gzip
)

func (compression *Compression) Flag() string {
switch *compression {
case Bzip2:
return "j"
case Gzip:
return "z"
}
return ""
}

func Tar(path string, compression Compression) (io.Reader, error) {
cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-c"+compression.Flag(), ".")
return CmdStream(cmd)
}

func Untar(archive io.Reader, path string) error {
cmd := exec.Command("bsdtar", "-f", "-", "-C", path, "-x")
cmd.Stdin = archive
output, err := cmd.CombinedOutput()
if err != nil {
return errors.New(err.Error() + ": " + string(output))
}
return nil
}

func CmdStream(cmd *exec.Cmd) (io.Reader, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
pipeR, pipeW := io.Pipe()
go func() {
_, err := io.Copy(pipeW, stdout)
if err != nil {
pipeW.CloseWithError(err)
}
errText, e := ioutil.ReadAll(stderr)
if e != nil {
errText = []byte("(...couldn't fetch stderr: " + e.Error() + ")")
}
if err := cmd.Wait(); err != nil {
// FIXME: can this block if stderr outputs more than the size of StderrPipe()'s buffer?
pipeW.CloseWithError(errors.New(err.Error() + ": " + string(errText)))
} else {
pipeW.Close()
}
}()
if err := cmd.Start(); err != nil {
return nil, err
}
return pipeR, nil
}
Отметим, что функция import также может сохранять tag образа, принимая его опциональным параметром, но этот функционал мы рассмотрим позже, когда до него дойдет очередь.

Export​

Команда export возвращает экспортированный архив файловой системы контейнера:

CmdExport
func (srv *Server) CmdExport(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout,
"export", "CONTAINER",
"Export the contents of a filesystem as a tar archive")
if err := cmd.Parse(args); err != nil {
return nil
}
name := cmd.Arg(0)
if container := srv.runtime.Get(name); container != nil {
data, err := container.Export()
if err != nil {
return err
}
// Stream the entire contents of the container (basically a volatile snapshot)
if _, err := io.Copy(stdout, data); err != nil {
return err
}
return nil
}
return errors.New("No such container: " + name)
}

func (container *Container) Export() (Archive, error) {
if err := container.EnsureMounted(); err != nil {
return nil, err
}
return Tar(container.RootfsPath(), Uncompressed)
}

Функция по переданному имени получает контейнер и вызывает у него метод container.Export, который в свою очередь просто возвращает созданный архив, смонтированной директории Rootfs. Код функции Tar был приведен выше в файле archive.go.

Rmi​

Удаляет переданный список образов, вызывая метод graph.Delete:

graph.Delete
func (srv *Server) CmdRmi(stdin io.ReadCloser, stdout io.Writer, args ...string) (err error) {
cmd := rcli.Subcmd(stdout, "rmimage", "[OPTIONS] IMAGE", "Remove an image")
if cmd.Parse(args) != nil || cmd.NArg() < 1 {
cmd.Usage()
return nil
}
for _, name := range cmd.Args() {
if err := srv.runtime.graph.Delete(name); err != nil {
return err
}
}
return nil
}

func (graph *Graph) Delete(id string) error {
garbage, err := graph.Garbage()
if err != nil {
return err
}
return os.Rename(graph.imageRoot(id), garbage.imageRoot(id))
}

func (graph *Graph) Garbage() (*Graph, error) {
return NewGraph(path.Join(graph.Root, ":garbage:"))
}

В реальности, graph.Delete перемещает их в папку :garbage:, для возможности последующего восстановления, но данная функция здесь не используется.

Images​

Возвращает таблицу со списком имеющихся образов:

CmdImages
func (srv *Server) CmdImages(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "images", "[OPTIONS] [NAME]", "List images")
//limit := cmd.Int("l", 0, "Only show the N most recent versions of each image")
quiet := cmd.Bool("q", false, "only show numeric IDs")
fl_a := cmd.Bool("a", false, "show all images")
if err := cmd.Parse(args); err != nil {
return nil
}
if cmd.NArg() > 1 {
cmd.Usage()
return nil
}
var nameFilter string
if cmd.NArg() == 1 {
nameFilter = cmd.Arg(0)
}
w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
if !*quiet {
fmt.Fprintf(w, "REPOSITORY\tTAG\tID\tCREATED\tPARENT\n")
}
var allImages map[string]*Image
var err error
if *fl_a {
allImages, err = srv.runtime.graph.Map()
} else {
allImages, err = srv.runtime.graph.Heads()
}
if err != nil {
return err
}
for name, repository := range srv.runtime.repositories.Repositories {
if nameFilter != "" && name != nameFilter {
continue
}
for tag, id := range repository {
image, err := srv.runtime.graph.Get(id)
if err != nil {
log.Printf("Warning: couldn't load %s from %s/%s: %s", id, name, tag, err)
continue
}
delete(allImages, id)
if !*quiet {
for idx, field := range []string{
/* REPOSITORY */ name,
/* TAG */ tag,
/* ID */ id,
/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
} {
if idx == 0 {
w.Write([]byte(field))
} else {
w.Write([]byte("\t" + field))
}
}
w.Write([]byte{'\n'})
} else {
stdout.Write([]byte(image.Id + "\n"))
}
}
}
// Display images which aren't part of a
if nameFilter == "" {
for id, image := range allImages {
if !*quiet {
for idx, field := range []string{
/* REPOSITORY */ "",
/* TAG */ "",
/* ID */ id,
/* CREATED */ HumanDuration(time.Now().Sub(image.Created)) + " ago",
/* PARENT */ srv.runtime.repositories.ImageName(image.Parent),
} {
if idx == 0 {
w.Write([]byte(field))
} else {
w.Write([]byte("\t" + field))
}
}
w.Write([]byte{'\n'})
} else {
stdout.Write([]byte(image.Id + "\n"))
}
}
}
if !*quiet {
w.Flush()
}
return nil
}
В функции происходит простая итерация по полученным образам, фильтрация на основе имени репозитория и вывод полей в консольную таблицу. Map, Head и вспомогательные к ним функции, формирующие хеш таблицы allImages, находятся в файле graph.go:

graph.go
func (graph *Graph) Map() (map[string]*Image, error) {
// FIXME: this should replace All()
all, err := graph.All()
if err != nil {
return nil, err
}
images := make(map[string]*Image, len(all))
for _, image := range all {
images[image.Id] = image
}
return images, nil
}

func (graph *Graph) All() ([]*Image, error) {
var images []*Image
err := graph.WalkAll(func(image *Image) {
images = append(images, image)
})
return images, err
}

func (graph *Graph) WalkAll(handler func(*Image)) error {
files, err := ioutil.ReadDir(graph.Root)
if err != nil {
return err
}
for _, st := range files {
if img, err := graph.Get(st.Name()); err != nil {
// Skip image
continue
} else if handler != nil {
handler(img)
}
}
return nil
}

func (graph *Graph) ByParent() (map[string][]*Image, error) {
byParent := make(map[string][]*Image)
err := graph.WalkAll(func(image *Image) {
image, err := graph.Get(image.Parent)
if err != nil {
return
}
if children, exists := byParent[image.Parent]; exists {
byParent[image.Parent] = []*Image{image}
} else {
byParent[image.Parent] = append(children, image)
}
})
return byParent, err
}

func (graph *Graph) Heads() (map[string]*Image, error) {
heads := make(map[string]*Image)
byParent, err := graph.ByParent()
if err != nil {
return nil, err
}
err = graph.WalkAll(func(image *Image) {
// If it's not in the byParent lookup table, then
// it's not a parent -> so it's a head!
if _, exists := byParent[image.Id]; !exists {
heads[image.Id] = image
}
})
return heads, err
}

History​

Отображает историю образа:

CmdHistory
func (srv *Server) CmdHistory(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "history", "[OPTIONS] IMAGE", "Show the history of an image")
if cmd.Parse(args) != nil || cmd.NArg() != 1 {
cmd.Usage()
return nil
}
image, err := srv.runtime.repositories.LookupImage(cmd.Arg(0))
if err != nil {
return err
}
w := tabwriter.NewWriter(stdout, 20, 1, 3, ' ', 0)
defer w.Flush()
fmt.Fprintf(w, "ID\tCREATED\tCREATED BY\n")
return image.WalkHistory(func(img *Image) error {
fmt.Fprintf(w, "%s\t%s\t%s\n",
srv.runtime.repositories.ImageName(img.Id),
HumanDuration(time.Now().Sub(img.Created))+" ago",
strings.Join(img.ContainerConfig.Cmd, " "),
)
return nil
})
}

func (img *Image) WalkHistory(handler func(*Image) error) (err error) {
currentImg := img
for currentImg != nil {
if handler != nil {
if err := handler(currentImg); err != nil {
return err
}
}
currentImg, err = currentImg.GetParent()
if err != nil {
return fmt.Errorf("Error while getting parent image: %v", err)
}
}
return nil
}

func (img *Image) GetParent() (*Image, error) {
if img.Parent == "" {
return nil, nil
}
if img.graph == nil {
return nil, fmt.Errorf("Can't lookup parent of unregistered image")
}
return img.graph.Get(img.Parent)
}
После получения структуры образа по переданному имени вызывается метод image.WalkHistory, который по цепочке обходит родительские образы, используя сохраненные ссылки Image.Parent, и выводит информацию в виде таблицы.

Commit​

Создает новый образ на основе измененных данных файловой системы контейнера:

CmdCommit
func (srv *Server) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout,
"commit", "[OPTIONS] CONTAINER [REPOSITORY [TAG]]",
"Create a new image from a container's changes")
if err := cmd.Parse(args); err != nil {
return nil
}
containerName, repository, tag := cmd.Arg(0), cmd.Arg(1), cmd.Arg(2)
if containerName == "" {
cmd.Usage()
return nil
}
img, err := srv.runtime.Commit(containerName, repository, tag)
if err != nil {
return err
}
fmt.Fprintln(stdout, img.Id)
return nil
}

// Commit creates a new filesystem image from the current state of a container.
// The image can optionally be tagged into a repository
func (runtime *Runtime) Commit(id, repository, tag string) (*Image, error) {
container := runtime.Get(id)
if container == nil {
return nil, fmt.Errorf("No such container: %s", id)
}
// FIXME: freeze the container before copying it to avoid data corruption?
// FIXME: this shouldn't be in commands.
rwTar, err := container.ExportRw()
if err != nil {
return nil, err
}
// Create a new image from the container's base layers + a new layer from container changes
img, err := runtime.graph.Create(rwTar, container, "")
if err != nil {
return nil, err
}
// Register the image if needed
if repository != "" {
if err := runtime.repositories.Set(repository, tag, img.Id, true); err != nil {
return img, err
}
}
return img, nil
}
Метод Commit получает структуру container по переданному имени, вызывает метод container.ExportRw, который возвращает архив с директорией rw, после чего передает его в метод graph.Create, который мы уже разбирали выше. Если передано имя репозитория и tag, то дополнительно будет создан tag образа. Этот функционал будет разобран ниже в команде Tag.

Tag​

Функция создает tag образа в локальном репозитории. Используется при импортировании образа и коммите контейнера:

CmdTag
func (srv *Server) CmdTag(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
cmd := rcli.Subcmd(stdout, "tag", "[OPTIONS] IMAGE REPOSITORY [TAG]", "Tag an image into a repository")
force := cmd.Bool("f", false, "Force")
if err := cmd.Parse(args); err != nil {
return nil
}
if cmd.NArg() < 2 {
cmd.Usage()
return nil
}
return srv.runtime.repositories.Set(cmd.Arg(1), cmd.Arg(2), cmd.Arg(0), *force)
}
После обработки параметров вызывается метод repositories.Set из файла tags.go. Ниже я приведу содержание файла tags.go, который отвечает за весь этот функционал. Общий принцип работы довольно простой - структура TagStore имеет хеш таблицу(map) для соответствия тегов и образов. При добавлении нового тега он проходит валидацию на запрещенные символы и записывается в хеш таблицу. Далее структура TagStore экспортируется в json и сохраняется в файле на диске. При запуске докер структура загружается из этого файла и в дальнейшем на ее основе осуществляется поиск и фильтрация образов по именам и тегам:

tags.go
const DEFAULT_TAG = "latest"

type TagStore struct {
path string
graph *Graph
Repositories map[string]Repository
}

type Repository map[string]string

func NewTagStore(path string, graph *Graph) (*TagStore, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
store := &TagStore{
path: abspath,
graph: graph,
Repositories: make(map[string]Repository),
}
// Load the json file if it exists, otherwise create it.
if err := store.Reload(); os.IsNotExist(err) {
if err := store.Save(); err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
return store, nil
}

func (store *TagStore) Save() error {
// Store the json ball
jsonData, err := json.Marshal(store)
if err != nil {
return err
}
if err := ioutil.WriteFile(store.path, jsonData, 0600); err != nil {
return err
}
return nil
}

func (store *TagStore) Reload() error {
jsonData, err := ioutil.ReadFile(store.path)
if err != nil {
return err
}
if err := json.Unmarshal(jsonData, store); err != nil {
return err
}
return nil
}

func (store *TagStore) LookupImage(name string) (*Image, error) {
img, err := store.graph.Get(name)
if err != nil {
// FIXME: standardize on returning nil when the image doesn't exist, and err for everything else
// (so we can pass all errors here)
repoAndTag := strings.SplitN(name, ":", 2)
if len(repoAndTag) == 1 {
repoAndTag = append(repoAndTag, DEFAULT_TAG)
}
if i, err := store.GetImage(repoAndTag[0], repoAndTag[1]); err != nil {
return nil, err
} else if i == nil {
return nil, fmt.Errorf("No such image: %s", name)
} else {
img = i
}
}
return img, nil
}

// Return a reverse-lookup table of all the names which refer to each image
// Eg. {"43b5f19b10584": {"base:latest", "base:v1"}}
func (store *TagStore) ById() map[string][]string {
byId := make(map[string][]string)
for repoName, repository := range store.Repositories {
for tag, id := range repository {
name := repoName + ":" + tag
if _, exists := byId[id]; !exists {
byId[id] = []string{name}
} else {
byId[id] = append(byId[id], name)
}
}
}
return byId
}

func (store *TagStore) ImageName(id string) string {
if names, exists := store.ById()[id]; exists && len(names) > 0 {
return names[0]
}
return id
}

func (store *TagStore) Set(repoName, tag, imageName string, force bool) error {
img, err := store.LookupImage(imageName)
if err != nil {
return err
}
if tag == "" {
tag = DEFAULT_TAG
}
if err := validateRepoName(repoName); err != nil {
return err
}
if err := validateTagName(tag); err != nil {
return err
}
if err := store.Reload(); err != nil {
return err
}
var repo Repository
if r, exists := store.Repositories[repoName]; exists {
repo = r
} else {
repo = make(map[string]string)
if old, exists := store.Repositories[repoName]; exists && !force {
return fmt.Errorf("Tag %s:%s is already set to %s", repoName, tag, old)
}
store.Repositories[repoName] = repo
}
repo[tag] = img.Id
return store.Save()
}

func (store *TagStore) Get(repoName string) (Repository, error) {
if err := store.Reload(); err != nil {
return nil, err
}
if r, exists := store.Repositories[repoName]; exists {
return r, nil
}
return nil, nil
}

func (store *TagStore) GetImage(repoName, tag string) (*Image, error) {
repo, err := store.Get(repoName)
if err != nil {
return nil, err
} else if repo == nil {
return nil, nil
}
if revision, exists := repo[tag]; exists {
return store.graph.Get(revision)
}
return nil, nil
}

// Validate the name of a repository
func validateRepoName(name string) error {
if name == "" {
return fmt.Errorf("Repository name can't be empty")
}
if strings.Contains(name, ":") {
return fmt.Errorf("Illegal repository name: %s", name)
}
return nil
}

// Validate the name of a tag
func validateTagName(name string) error {
if name == "" {
return fmt.Errorf("Tag name can't be empty")
}
if strings.Contains(name, "/") || strings.Contains(name, ":") {
return fmt.Errorf("Illegal tag name: %s", name)
}
return nil
}

Управление контейнерами​

Run​

Создает и запускает контейнер на основе заданного образа:

CmdRun
func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
config, err := ParseRun(args)
if err != nil {
return err
}
if config.Image == "" {
return fmt.Errorf("Image not specified")
}
if len(config.Cmd) == 0 {
return fmt.Errorf("Command not specified")
}
// Create new container
container, err := srv.runtime.Create(config)
if err != nil {
return errors.New("Error creating container: " + err.Error())
}
if config.OpenStdin {
cmd_stdin, err := container.StdinPipe()
if err != nil {
return err
}
if !config.Detach {
Go(func() error {
_, err := io.Copy(cmd_stdin, stdin)
cmd_stdin.Close()
return err
})
}
}
// Run the container
if !config.Detach {
cmd_stderr, err := container.StderrPipe()
if err != nil {
return err
}
cmd_stdout, err := container.StdoutPipe()
if err != nil {
return err
}
if err := container.Start(); err != nil {
return err
}
sending_stdout := Go(func() error {
_, err := io.Copy(stdout, cmd_stdout)
return err
})
sending_stderr := Go(func() error {
_, err := io.Copy(stdout, cmd_stderr)
return err
})
err_sending_stdout := <-sending_stdout
err_sending_stderr := <-sending_stderr
if err_sending_stdout != nil {
return err_sending_stdout
}
if err_sending_stderr != nil {
return err_sending_stderr
}
container.Wait()
} else {
if err := container.Start(); err != nil {
return err
}
fmt.Fprintln(stdout, container.Id)
}
return nil
}
В начале функция ParseRun производит разбор параметров и инициализацию структуры Config:

ParseRun
func ParseRun(args []string) (*Config, error) {
cmd := flag.NewFlagSet("", flag.ContinueOnError)
cmd.SetOutput(ioutil.Discard)
fl_user := cmd.String("u", "", "Username or UID")
fl_detach := cmd.Bool("d", false, "Detached mode: leave the container running in the background")
fl_stdin := cmd.Bool("i", false, "Keep stdin open even if not attached")
fl_tty := cmd.Bool("t", false, "Allocate a pseudo-tty")
fl_memory := cmd.Int64("m", 0, "Memory limit (in bytes)")
var fl_ports ports

cmd.Var(&fl_ports, "p", "Map a network port to the container")
var fl_env ListOpts
cmd.Var(&fl_env, "e", "Set environment variables")
if err := cmd.Parse(args); err != nil {
return nil, err
}
config := &Config{
Ports: fl_ports,
User: *fl_user,
Tty: *fl_tty,
OpenStdin: *fl_stdin,
Memory: *fl_memory,
Detach: *fl_detach,
Env: fl_env,
Cmd: cmd.Args()[1:],
Image: cmd.Arg(0),
}
return config, nil
}
На основании Config функция runtime.Create создает и возвращает новый контейнер, после чего при помощи пайпов и каналов перенаправляет потоки stdin, stdout, stderr, а затем запускает созданный контейнер методом container.Start.

runtime.Create
func (runtime *Runtime) Create(config *Config) (*Container, error) {
// Lookup image
img, err := runtime.repositories.LookupImage(config.Image)
if err != nil {
return nil, err
}
container := &Container{
// FIXME: we should generate the ID here instead of receiving it as an argument
Id: GenerateId(),
Created: time.Now(),
Path: config.Cmd[0],
Args: config.Cmd[1:], //FIXME: de-duplicate from config
Config: config,
Image: img.Id, // Always use the resolved image id
NetworkSettings: &NetworkSettings{},
// FIXME: do we need to store this in the container?
SysInitPath: sysInitPath,
}
container.root = runtime.containerRoot(container.Id)
// Step 1: create the container directory.
// This doubles as a barrier to avoid race conditions.
if err := os.Mkdir(container.root, 0700); err != nil {
return nil, err
}
// Step 2: save the container json
if err := container.ToDisk(); err != nil {
return nil, err
}
// Step 3: register the container
if err := runtime.Register(container); err != nil {
return nil, err
}
return container, nil
}
Здесь происходит инициализация структуры Container, создание рабочей директории контейнера и экспорт структуры в json формате. Завершает это вызов метода runtime.Register, код которого мы разбирали в прошлой статье. Функция GenerateId была рассмотрена ранее в разделе по команде import. Теперь перейдем к методу container.Start:

container.Start
func (container *Container) Start() error {
if err := container.EnsureMounted(); err != nil {
return err
}
if err := container.allocateNetwork(); err != nil {
return err
}
if err := container.generateLXCConfig(); err != nil {
return err
}
params := []string{
"-n", container.Id,
"-f", container.lxcConfigPath(),
"--",
"/sbin/init",
}

// Networking
params = append(params, "-g", container.network.Gateway.String())

// User
if container.Config.User != "" {
params = append(params, "-u", container.Config.User)
}

// Program
params = append(params, "--", container.Path)
params = append(params, container.Args...)

container.cmd = exec.Command("/usr/bin/lxc-start", params...)

// Setup environment
container.cmd.Env = append(
[]string{
"HOME=/",
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
},
container.Config.Env...,
)

var err error
if container.Config.Tty {
err = container.startPty()
} else {
err = container.start()
}
if err != nil {
return err
}
// FIXME: save state on disk *first*, then converge
// this way disk state is used as a journal, eg. we can restore after crash etc.
container.State.setRunning(container.cmd.Process.Pid)
container.ToDisk()
go container.monitor()
return nil
}
Основная логика его работы практически не изменилась со времен первого коммита, который мы разбирали в первой статье. Изменения коснулись части отвечающей за монтирование файловой системы, а также был добавлен код для сетевого стека и новый метод запуска процесса в контейнере. Разберем все по порядку, начиная с монтирования файловой системы:

EnsureMounted
func (container *Container) EnsureMounted() error {
if mounted, err := container.Mounted(); err != nil {
return err
} else if mounted {
return nil
}
return container.Mount()
}

func (container *Container) Mounted() (bool, error) {
return Mounted(container.RootfsPath())
}

func Mounted(mountpoint string) (bool, error) {
mntpoint, err := os.Stat(mountpoint)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
parent, err := os.Stat(filepath.Join(mountpoint, ".."))
if err != nil {
return false, err
}
mntpointSt := mntpoint.Sys().(*syscall.Stat_t)
parentSt := parent.Sys().(*syscall.Stat_t)
return mntpointSt.Dev != parentSt.Dev, nil
}
Метод container.EnsureMounted проверяет была ли смонтирована файловая система, в противном случае - выполняет монтирование вызовом container.Mount():

container.Mount
func (container *Container) Mount() error {
image, err := container.GetImage()
if err != nil {
return err
}
return image.Mount(container.RootfsPath(), container.rwPath())
}

func (image *Image) Mount(root, rw string) error {
if mounted, err := Mounted(root); err != nil {
return err
} else if mounted {
return fmt.Errorf("%s is already mounted", root)
}
layers, err := image.layers()
if err != nil {
return err
}
// Create the target directories if they don't exist
if err := os.Mkdir(root, 0755); err != nil && !os.IsExist(err) {
return err
}
if err := os.Mkdir(rw, 0755); err != nil && !os.IsExist(err) {
return err
}
// FIXME: @creack shouldn't we do this after going over changes?
if err := MountAUFS(layers, rw, root); err != nil {
return err
}
// FIXME: Create tests for deletion
// FIXME: move this part to change.go
// Retrieve the changeset from the parent and apply it to the container
// - Retrieve the changes
changes, err := Changes(layers, layers[0])
if err != nil {
return err
}
// Iterate on changes
for _, c := range changes {
// If there is a delete
if c.Kind == ChangeDelete {
// Make sure the directory exists
file_path, file_name := path.Dir(c.Path), path.Base(c.Path)
if err := os.MkdirAll(path.Join(rw, file_path), 0755); err != nil {
return err
}
// And create the whiteout (we just need to create empty file, discard the return)
if _, err := os.Create(path.Join(path.Join(rw, file_path),
".wh."+path.Base(file_name))); err != nil {
return err
}
}
}
return nil
}
Подготовка параметров для монтирования производится функцией MountAUFS, логика которой аналогична первой версии, только теперь вместо утилиты mount монтирование производится системным вызовом:

MountAUFS
func MountAUFS(ro []string, rw string, target string) error {
// FIXME: Now mount the layers
rwBranch := fmt.Sprintf("%v=rw", rw)
roBranches := ""
for _, layer := range ro {
roBranches += fmt.Sprintf("%v=ro:", layer)
}
branches := fmt.Sprintf("br:%v:%v", rwBranch, roBranches)
return mount("none", target, "aufs", 0, branches)
}

func mount(source string, target string, fstype string, flags uintptr, data string) (err error) {
return syscall.Mount(source, target, fstype, flags, data)
}
После монтирования функция получает изменения файловой системы с помощью команды Changes и производит создание файлов c .wh. префиксом в папке rw, если они были удалены в верхнем слое. Алгоритм работы этой функции будет рассмотрен в разделе команды Diff, вычисляющей изменения в файловой системе. Теперь перейдем к инициализации сети вызовом container.allocateNetwork:

container.allocateNetwork
func (container *Container) allocateNetwork() error {
iface, err := container.runtime.networkManager.Allocate()
if err != nil {
return err
}
container.NetworkSettings.PortMapping = make(map[string]string)
for _, port := range container.Config.Ports {
if extPort, err := iface.AllocatePort(port); err != nil {
iface.Release()
return err
} else {
container.NetworkSettings.PortMapping[strconv.Itoa(port)] = strconv.Itoa(extPort)
}
}
container.network = iface
container.NetworkSettings.IpAddress = iface.IPNet.IP.String()
container.NetworkSettings.IpPrefixLen, _ = iface.IPNet.Mask.Size()
container.NetworkSettings.Gateway = iface.Gateway.String()
return nil
}
В нем происходит настройка сетевого интерфейса, присвоение ip адреса, маски и шлюза, а также проброс портов. Я подробно разберу весь этот функционал ниже, в отдельной части по работе с сетевым стеком. Далее идет вызов метода generateLXCConfig. Он был подробно разобран в первой статье и остался практически без изменений. Стоит лишь отметить, что теперь в lxc_template.go добавлены настройки сети, монтирование /etc/resolv.conf для работы dns и, главное, монтирование исполняемого файла docker в точку /sbin/init, так как теперь выполнение процесса будет начинаться с него. Я уже обращал на это внимание в части 2.1. Ниже приведены изменения в lxc_template:

lxc_template.go
# network configuration
lxc.network.type = veth
lxc.network.flags = up
lxc.network.link = lxcbr0
lxc.network.name = eth0
lxc.network.mtu = 1500
lxc.network.ipv4 = {{.NetworkSettings.IpAddress}}/{{.NetworkSettings.IpPrefixLen}}

# Inject docker-init
lxc.mount.entry = {{.SysInitPath}} {{$ROOTFS}}/sbin/init none bind,ro 0 0

# In order to get a working DNS environment, mount bind (ro) the host's /etc/resolv.conf into the container
lxc.mount.entry = /etc/resolv.conf {{$ROOTFS}}/etc/resolv.conf none bind,ro 0 0
Теперь осталось разобрать файл sysinit.go, с которого стартует созданный lxc контейнер:

sysinit.go
// Setup networking
func setupNetworking(gw string) {
if gw == "" {
return
}
cmd := exec.Command("/sbin/route", "add", "default", "gw", gw)
if err := cmd.Run(); err != nil {
log.Fatalf("Unable to set up networking: %v", err)
}
}

// Takes care of dropping privileges to the desired user
func changeUser(u string) {
if u == "" {
return
}
userent, err := user.LookupId(u)
if err != nil {
userent, err = user.Lookup(u)
}
if err != nil {
log.Fatalf("Unable to find user %v: %v", u, err)
}

uid, err := strconv.Atoi(userent.Uid)
if err != nil {
log.Fatalf("Invalid uid: %v", userent.Uid)
}
gid, err := strconv.Atoi(userent.Gid)
if err != nil {
log.Fatalf("Invalid gid: %v", userent.Gid)
}

if err := syscall.Setgid(gid); err != nil {
log.Fatalf("setgid failed: %v", err)
}
if err := syscall.Setuid(uid); err != nil {
log.Fatalf("setuid failed: %v", err)
}
}

func executeProgram(name string, args []string) {
path, err := exec.LookPath(name)
if err != nil {
log.Printf("Unable to locate %v", name)
os.Exit(127)
}

if err := syscall.Exec(path, args, os.Environ()); err != nil {
panic(err)
}
}

// Sys Init code
// This code is run INSIDE the container and is responsible for setting
// up the environment before running the actual process
func SysInit() {
if len(os.Args) <= 1 {
fmt.Println("You should not invoke docker-init manually")
os.Exit(1)
}
var u = flag.String("u", "", "username or uid")
var gw = flag.String("g", "", "gateway address")

flag.Parse()

setupNetworking(*gw)
changeUser(*u)
executeProgram(flag.Arg(0), flag.Args())
}
Как видим, в SysInit происходит настройка окружения перед запуском процесса. Добавление default gateway в таблицу маршрутизации, настройка пользователя и группы, под которыми будет выполняться процесс, и собственно запуск процесса стандартным методом Exec.

После запуска процесса в методе Start идет перенаправление стандартных потоков и запуск горутины container.monitor, работу которой мы разбирали в первой статье. Можно лишь добавить, что теперь в ней происходит освобождение назначенного ip адреса, проброшенных портов и размонтирование файловой системы.

container.monitor
func (container *Container) monitor() {
// Wait for the program to exit
container.cmd.Wait()
exitCode := container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()

// Cleanup
if err := container.releaseNetwork(); err != nil {
log.Printf("%v: Failed to release network: %v", container.Id, err)
}
container.stdout.Close()
container.stderr.Close()
if err := container.Unmount(); err != nil {
log.Printf("%v: Failed to umount filesystem: %v", container.Id, err)
}

// Re-create a brand new stdin pipe once the container exited
if container.Config.OpenStdin {
container.stdin, container.stdinPipe = io.Pipe()
}

// Report status back
container.State.setStopped(exitCode)
container.ToDisk()
}

func (container *Container) releaseNetwork() error {
err := container.network.Release()
container.network = nil
container.NetworkSettings = &NetworkSettings{}
return err
}

func (container *Container) Unmount() error {
return Unmount(container.RootfsPath())
}

func Unmount(target string) error {
if err := syscall.Unmount(target, 0); err != nil {
return err
}
// Even though we just unmounted the filesystem, AUFS will prevent deleting the mntpoint
// for some time. We'll just keep retrying until it succeeds.
for retries := 0; retries < 1000; retries++ {
err := os.Remove(target)
if err == nil {
// rm mntpoint succeeded
return nil
}
if os.IsNotExist(err) {
// mntpoint doesn't exist anymore. Success.
return nil
}
// fmt.Printf("(%v) Remove %v returned: %v\n", retries, target, err)
time.Sleep(10 * time.Millisecond)
}
return fmt.Errorf("Umount: Failed to umount %v", target)
}

Неожиданное завершение​

Ввиду того, что редактор хабра начал сильно тормозить, а местами и полностью зависать (видимо сказывается большое количество снипетов кода), я вынужден завершить эту часть, но продолжу в следующей, прямо с этого места.

 
Сверху