Elixir : Upload zip to S3



Today we'll see how to upload a bunch of files stored in a zip file, using Elixir, Phoenix, and a little bit of Erlang too.

First of all let's see the process: the user sends us the zip file through a multipart form, and we receive and store it. Then we need to unzip the file (in our case we chose to do it in /tmp). Finally we use the Elixir Arc library to store each entry in S3. Actually we're using Arc Ecto to do it, as we want to store the file reference in our database to access it later.
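As a rough sketch of the receiving side, a controller action can simply hand the uploaded archive over to the import function we'll write below. This is only an illustration: the MediaController module, the MyApp.Medias context and the "archive" form field name are all hypothetical.

```elixir
defmodule MyAppWeb.MediaController do
  use MyAppWeb, :controller

  # "archive" is the (hypothetical) name of the file input in the form.
  def create(conn, %{"archive" => %Plug.Upload{} = zipfile}) do
    # Assumes the import/1 function shown later in this post lives in a
    # MyApp.Medias context module.
    case MyApp.Medias.import(zipfile) do
      {:ok, _inserted} ->
        send_resp(conn, :created, "")

      {:error, _failed_step, _failed_value, _changes_so_far} ->
        send_resp(conn, :unprocessable_entity, "")
    end
  end
end
```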

We want to make it transactional: we don't want just part of the zip stored in our database, but all of its entries. We'll introduce Ecto.Multi, which allows us to do this.
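If you haven't used it before, Ecto.Multi lets you name each operation and run them all atomically. Here is a minimal sketch; the two changesets are hypothetical placeholders for real ones.

```elixir
alias Ecto.Multi

# first_changeset / second_changeset stand in for any real changesets.
multi =
  Multi.new()
  |> Multi.insert(:first, first_changeset)
  |> Multi.insert(:second, second_changeset)

# Repo.transaction/1 runs everything in one database transaction:
#   {:ok, %{first: ..., second: ...}} on success, or
#   {:error, failed_name, failed_value, changes_so_far} with a full rollback.
MyApp.Repo.transaction(multi)
```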

Let's first have a look at our Media model, which will store the media reference in PostgreSQL.

defmodule MyApp.Media do
  use Ecto.Schema
  use Arc.Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, Ecto.UUID, autogenerate: true}
  @derive {Phoenix.Param, key: :id}

  schema "model_medias" do
    field(:name, MyApp.Uploader.Media.Type)

    timestamps()
  end

  @doc false
  def changeset(media, attrs) do
    media
    |> cast(attrs, [:name])
    |> cast_attachments(attrs, [:name])
    |> validate_required([:name])
  end
end

At the beginning we've defined a UUID primary key, but that is not mandatory, and a name field whose type is MyApp.Uploader.Media.Type. In the database this field is just a simple string containing the file name, stored alongside the timestamps. Now let's have a look at our media uploader module.

defmodule MyApp.Uploader.Media do
  use Arc.Definition
  use Arc.Ecto.Definition

  # To add a thumbnail version:
  @versions [:original, :thumb]

  # Whitelist file extensions:
  def validate({file, _}) do
    ext = file.file_name |> Path.extname() |> String.downcase()
    Enum.member?(~w(.jpg .jpeg .gif .png .pdf), ext)
  end

  # Define a thumbnail transformation:
  def transform(:thumb, {_file, _scope}) do
    {:convert, "-strip -thumbnail 250x250^ -format png", :png}
  end

  # Override the persisted filenames:
  def filename(version, {file, _scope}) do
    file_name = Path.basename(file.file_name, Path.extname(file.file_name))
    "#{version}_#{file_name}"
  end

  def filename(version, _) do
    version
  end

  # Override the storage directory:
  def storage_dir(_version, {_file, _scope}) do
    "uploads/medias/"
  end
end

You can see that this is an Arc definition; to grab all the details I strongly recommend having a look at the official documentation. This module defines that we want to store the files in the "uploads/medias/" folder and create a 250x250 thumbnail on upload.

Now that we have our model, we need to load the zip file and unzip it; to do that we'll use the Erlang :zip library. You first need to open the archive and extract it (you can do it on disk or in memory), then access each file to store it, and finally close the handle by calling zip_close. Here is how you do it:

def import(%Plug.Upload{} = zipfile) do
  path = to_charlist(zipfile.path)
  path_name = to_charlist("/tmp")

  with {:ok, handle} <- :zip.zip_open(path, [{:cwd, path_name}]),
       {:ok, file_names} <- :zip.zip_get(handle) do
    try do
      # remove files beginning with '.'
      filter_hidden_files(file_names)
      # transform each file to a %Plug.Upload{}
      |> to_plug_upload()
      # create a multi with all the inserts
      |> to_multi(Multi.new())
      # run the transaction
      |> Repo.transaction()
    after
      :zip.zip_close(handle)
    end
  end
end

As you can see, we chose to unzip into the /tmp folder. After getting the list of file paths, we remove the hidden ones (beginning with '.') before generating the transaction for data storage. I'll skip the filter_hidden_files function, which is trivial to write, and show you the to_plug_upload function, which is a little trickier.
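For completeness, one possible (assumed) implementation of filter_hidden_files could be as simple as:

```elixir
# Keep only entries whose base name does not start with a dot.
# :zip returns charlists, hence the to_string/1 conversion.
defp filter_hidden_files(file_names) do
  Enum.reject(file_names, fn file ->
    file |> to_string() |> Path.basename() |> String.starts_with?(".")
  end)
end
```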

defp to_plug_upload(_, uploads \\ [])

defp to_plug_upload([], uploads) do
  uploads
end

defp to_plug_upload([file | tail], uploads) do
  upload = %Plug.Upload{
    content_type: MIME.from_path(file),
    filename: Path.basename(file),
    path: to_string(file)
  }

  to_plug_upload(tail, uploads ++ [upload])
end


You see that the recursion is done over each file contained in the zip; we use it to generate a Plug.Upload struct, specifying the MIME type thanks to the MIME module, the filename, and the file path. Remember that to use the :zip library we had to transform our string into a charlist (because of Erlang compatibility); we need to do the same on the other side by changing our file path back to a string.
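The charlist round trip can be checked in iex (the results, shown as comments, depend on how your Elixir version inspects charlists):

```elixir
path = to_charlist("/tmp/photo.png")
# a charlist, i.e. a list of integers, inspected roughly as '/tmp/photo.png'

to_string(path)
# back to an Elixir binary: "/tmp/photo.png"
```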

Then there is the transaction creation through the to_multi implementation.


defp to_multi([], multi) do
  multi
end

defp to_multi([file | tail], multi) do
  attrs = %{name: file}
  m = %Media{} |> Media.changeset(attrs)
  to_multi(tail, multi |> Multi.insert(file.filename, m))
end

It's straightforward too: as you see, we recurse over the file list and add an insert to our transaction for each file.

I hope this quick example will make your work easier if you're coding in Elixir, or make you want to try it out.

Oh, and if you find the possible bug in this very basic implementation you can put it as a comment.

Elixir: Document your API


In order to have someone actually use your API, you need to provide some documentation and usage examples for it. Developer eXperience is also very important when making an API: building a back-end is difficult, but so is building a great front-end, which is why using your API should be a breeze.

I really like to have my documentation as near as possible to my code, but I want it to be decoupled too. There are different libraries offering such a feature: some use annotations, like ExDoc, others generate the documentation from the test cases, like Bureaucrat. Personally, for separation-of-concerns reasons, I prefer to add my documentation explicitly using PhoenixSwagger. This way I can choose not to provide external documentation for some endpoints, and have more test cases. It's a matter of taste.

For the sake of standardisation I prefer to use the OpenAPI Specification (aka Swagger) to document my API: it provides some great tooling and is widely supported. That is why I used the phoenix_swagger library and added the documentation in my controllers. You can have a look at the OpenAPI 2.0 specs to learn more about the supported formats, parameters, headers and authentication. In Phoenix we have a DSL allowing us to generate the swagger.json file based on our definitions.

To make it work, just follow the phoenix_swagger installation guide: you'll mainly need to provide your router and endpoint module names in config.exs, and a definition of swagger_info/0 in your router.
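The config part might look like the sketch below; :my_app and the module names are placeholders for your own.

```elixir
# config/config.exs — tells phoenix_swagger where to write the generated
# file and which router/endpoint to read the definitions from.
config :my_app, :phoenix_swagger,
  swagger_files: %{
    "priv/static/swagger.json" => [
      router: MyAppWeb.Router,
      endpoint: MyAppWeb.Endpoint
    ]
  }
```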


def swagger_info do
  %{
    info: %{
      version: "1.0",
      title: "MY API"
    },
    # In Swagger 2.0, consumes is a root-level field, not part of info:
    consumes: [
      "multipart/form-data",
      "application/json",
      "application/vnd.api+json"
    ],
    securityDefinitions: %{
      bearerAuth: %{
        type: "apiKey",
        name: "Authorization",
        in: "header"
      }
    },
    security: [
      %{bearerAuth: []}
    ]
  }
end


This is mine; you can see that you need to declare all of the content types your API supports. I've also specified that my API asks for a bearer authorization token.

I've also added a route to access the generated API documentation at /api/swagger. It will allow developers to easily access the latest available version of the documentation:


scope "/api/swagger" do
  pipe_through([:api_doc_auth])

  forward("/", PhoenixSwagger.Plug.SwaggerUI, otp_app: :MY_API, swagger_file: "swagger.json")
end


You can see that I have a pipeline applied to this route; this is because we want to add basic authentication to it.


pipeline :api_doc_auth do
  plug(BasicAuth, use_config: {:gi_api, :api_doc_auth})
end


We used the basic_auth plug to make this possible and easy; here is how you can configure it to use environment variables as the login / password for your API documentation route.


config :gi_api,
  api_doc_auth: [
    username: System.get_env("BASIC_AUTH_API_USERNAME"),
    password: System.get_env("BASIC_AUTH_API_PASSWORD"),
    realm: "API Doc Area"
  ]


Now that you've configured how your documentation will be available, you just need to write it :) This is done in your controllers: you'll need to call swagger_path/2 to define each endpoint; endpoints can then refer to more complex schemas, which need to be loaded by swagger_definitions/0.


swagger_path :index do
  PhoenixSwagger.Path.get("/api/datas")
  consumes("application/vnd.api+json")
  produces("application/vnd.api+json")

  operation_id("index")

  tag("Data")

  paging(size: "page[page_size]", number: "page[page]")

  description("List data")

  response(200, "Success", Schema.ref(:Datas))
  response(401, "Not Authenticated")
end


Here is my definition of the data schema as a JSON-API resource, in both single and paginated versions.


def swagger_definitions do
  %{
    DataResource:
      JsonApi.resource do
        description("A data.")

        relationship(:user)

        relationship(:media, type: :has_many)

        attributes do
          some_attribute(:string, "Data attribute")
        end
      end,
    Data: JsonApi.single(:DataResource),
    Datas: JsonApi.page(:DataResource)
  }
end


On our project we use JSON-API through JaSerializer, and pagination thanks to Scrivener. As it is a common stack, PhoenixSwagger provides helpers around JSON-API resources, which is very useful when you're using it as your data transfer format. You can define a single resource with JsonApi.single/1 or a paginated resource (driven by the paging parameters) with JsonApi.page/1.

Another example, with a POST request getting a file from a multipart/form-data form, could be:


swagger_path :create do
  PhoenixSwagger.Path.post("/api/datas/{data_id}/medias")
  consumes("multipart/form-data")
  produces("application/json")

  operation_id("create")

  tag("Medias")

  description("Create a media")

  parameters do
    data_id(:path, :string, "Data UUID", required: true)
    kind(:formData, :string, "Should be [image|document]", required: true)
    file(:formData, :file, "Attached media", required: true)
  end

  response(200, "Success", Schema.ref(:Media))
  response(404, "Not Found")
  response(422, "Unprocessable Entity")
  response(401, "Not Authenticated")
end


The :formData location tells Swagger that the API accepts a form-encoded field, and the type :file means we expect a binary field in a multipart/form-data request.

With all this you'll have a smooth API documentation available at /api/swagger, protected by a login / password, and declaring the JWT token authorization header.



You can go further by looking at the PhoenixSwagger documentation. It is not as up to date as I had hoped, but you can read the @doc annotations directly in the source code for more examples; I also had to dig into past issues for some of my needs. Besides, it also offers easier controller testing through schema validation, but that is another story :)

I hope that this will help you provide a great developer experience to your front-end developers, and that they will let you waste more time on 9gag now :)

If you like this Elixir / Phoenix blog post series, please share it or drop a comment.

Elixir: Live Notifications

We’ve seen at the beginning of our post series that we wanted to notify users in our application with some administrative messages in real time.

For that we chose to connect all our clients to a room where they will receive these messages through web socket and notify the end user using some front-end tricks.

On the Phoenix side this is pretty straightforward, as it natively provides live messaging through web sockets or long polling, depending on what you need, or, better said, on what the network lets you have :)

Web socket is a protocol allowing you to have a bidirectional live connection between your clients and your server. Phoenix manages all the keep-alive work and connection refreshes for you, and if there is a problem with the web socket connection it will try to fall back to long polling when possible. We'll see how to use it with Phoenix.

So first of all, you'll need to define an endpoint for your web socket; to do this, add the following to your endpoint.ex:

socket("/socket", MyAppWeb.UserSocket)

In your UserSocket module you'll need to specify the transports you want to support and the channels that will be available to the clients. We decided to call the publishing channel room:lobby, but you can give it any name you want. The socket also provides some hook functions for connecting or identifying the clients; we do not need that at the moment, so I'll just show you a basic user_socket.ex.

defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  ## Channels

  channel("room:lobby", MyAppWeb.RoomChannel)

  ## Transports

  transport(:websocket, Phoenix.Transports.WebSocket)
  transport(:longpoll, Phoenix.Transports.LongPoll)
end


As we can see, we defined the room and attached its logic to RoomChannel, which will manage message dispatching to our users. In our case we want a simple join to grant channel access, and a simple new_msg action so the user can receive messages.


defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end

  def handle_in("new_msg", %{"body" => body}, socket) do
    broadcast!(socket, "new_msg", %{body: body})

    {:noreply, socket}
  end
end

Then you'll need to join the channel on the client side. Phoenix offers a phoenix.js library that supports this pub/sub design out of the box, since the web socket itself is just the communication protocol and does not provide more complex patterns. You can add this to your socket.js file:

socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("room:lobby", {})
let messagesContainer = document.querySelector("#messages")

channel.on("new_msg", payload => {
    let messageItem = document.createElement("li")
    messageItem.innerText = `[${Date()}] ${payload.body}`
    messagesContainer.appendChild(messageItem)
})

channel.join()
    .receive("ok", resp => { console.log("Joined successfully", resp) })
    .receive("error", resp => { console.log("Unable to join", resp) })

You can then push messages to your clients by adding a form to your back office, allowing customer service to send messages easily; it will call the following code:


let channel = socket.channel("room:lobby", {})
let chatInput = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
    if (event.keyCode === 13) {
        channel.push("new_msg", { body: chatInput.value })
        chatInput.value = ""
    }
})

channel.join()
    .receive("ok", resp => { console.log("Joined successfully", resp) })
    .receive("error", resp => { console.log("Unable to join", resp) })


Be careful though: we do not have any authentication system yet, so anyone can use the channel to send messages to your customers.

You can play a little with your channel by using a command-line web socket client, following the Phoenix.Socket.Message format.


%Phoenix.Socket.Message{
  event: term(),
  payload: term(),
  ref: term(),
  topic: term()
}


The event contains the event name, like phx_join; the topic is your channel name; ref is an identifier for the request (I generally put "1"); and the payload is the content expected by the channel handler.
For your information, the web socket and long-poll URLs are separate, and you can connect using a web socket client:


wsta 'ws://localhost:4000/socket/websocket'

Or

curl 'http://localhost:4000/socket/longpoll'
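With the V1 serializer, the raw frame you would type into wsta to join the room might look like the following; treat the exact shape as an assumption and check the serializer of your Phoenix version:

```json
{"topic": "room:lobby", "event": "phx_join", "payload": {}, "ref": "1"}
```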

Now that we've got our web socket communication working, we need to make sure that only our customers can connect. To do that we'll use a library we've already seen, named Guardian. We'll provide the user token on the client side while connecting; this way our server will be able to verify the token and attach the user to the session. We'll also attach the user to a channel corresponding to their UUID, so we can send them targeted messages, like forcing a disconnect for example.

So first of all you need to update your client to inject the user token. I did it by modifying the end of the body in app.html.eex this way:


<script>window.token = "<%= assigns[:token] %>";</script>
<script src="<%= static_path(@conn, "/js/app.js") %>"></script>


Then you'll need to pass the token on channel connection by editing socket.js this way :


import {Socket} from "phoenix"

let socket = new Socket("/socket", {params: {token: window.token}})


Now that our client is passing the token to our web socket handler, we'll need to change the way the connection is handled in user_socket.ex by loading the user resource from the token:


# Don't forget to `require Logger` at the top of the module.
def connect(%{"token" => token}, socket) do
  case Account.Guardian.resource_from_token(token) do
    {:ok, %User{} = resource, _claims} ->
      {:ok, assign(socket, :current_user, resource.id)}

    {:error, reason} ->
      Logger.warn(fn ->
        "Websocket unauthorized: #{inspect(reason)}"
      end)

      :error
  end
end

# Reject clients that do not provide a token at all.
def connect(_params, _socket), do: :error


I'll let you refer to the post about account management to see how to define your Account.Guardian module so it loads the user from the database when decoding the token. We see that the function returns the resource and the claims; we copy the resource id to the socket assigns under the atom :current_user.

Then we use the id function to be able to manage all active sockets for a user. This way we can, for example, force a disconnect from all sockets for a specific user:


# Would allow you to broadcast a "disconnect" event and terminate
# all active sockets and channels for a given user:
#
#     GiApiWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
#
# Returning `nil` makes this socket anonymous.

def id(socket), do: "user_socket:#{socket.assigns.current_user}"


And there you are: a secured connection with an identifier letting you know which user is connected.

Hope you enjoyed the quick tutorial, now you can go further and implement some more complex channel handlers.
