Processando Eventos de Rede Social usando EDA, Go, RabbitMQ e AT Protocol - Familiarizando com AT Protocol.
Buenas para você que está lendo este artigo, o qual é o terceiro da série sobre Arquitetura Orientada a Eventos com Go, RabbitMQ, AT Protocol e a rede social Bluesky. Quem diria que chegaríamos até aqui, hein?! Uma vitória para nós: eu, que estou conseguindo escrever, e você, que está interessado em ler! Então, vamos lá?
Mas antes, se você não lembra muito sobre o RabbitMQ ou sobre o contexto geral que dei para essa série, você pode começar por aqui para se contextualizar e por aqui para entender como usar o RabbitMQ com Go. Já neste artigo, vou focar em como se familiarizar com o AT Protocol em Go.
Por que usar o AT Protocol?
Ah, essa pergunta é boa, caso você esteja se fazendo ela! O AT Protocol é o protocolo de comunicação da rede federada Bluesky, onde poderemos ter outras redes sociais se comunicando entre si via esse protocolo. Eu comecei a prestar atenção nesse protocolo justamente quando o X (antigo Twitter) foi bloqueado no Brasil, e eu decidi migrar para o Bluesky. Mesmo com a volta do X (antigo Twitter), continuo naquela rede por achar bem interessante a forma como ela está sendo criada. E, claro, também pelas pessoas que sigo por lá.
Outro ponto aqui é que, por ser um protocolo open source (de código aberto) e gratuito, dá para fazer muita coisa bacana, inclusive o projeto que vamos realizar nesta série, após essas introduções que estou fazendo. Bacana, né? Além de aprender um pouco sobre Go, você, que está lendo, vai ver um projeto open source nascer, já que o que criarmos aqui ficará aberto, e a ideia será usá-lo como aprendizado.
Tá, mas o que seria esse AT Protocol?
O AT Protocol significa Authenticated Transfer Protocol. Deram esse nome devido ao fato de que todos os dados do usuário são assinados pelos próprios usuários que os criaram. Por exemplo: quando um usuário cria um post na rede social Bluesky, esse dado já possui uma assinatura que valida quem o criou (entre outras informações). Isso torna possível transmitir esses dados por vários serviços, provando que são reais sem precisar consultar o servidor de origem. Bacana, né?! Isso abre muitas possibilidades para que diferentes redes sociais conversem entre si e compartilhem conteúdo.
Imagine se o YouTube, Instagram e X (antigo Twitter) conversassem entre si usando esse protocolo. Você precisaria de apenas uma conta, em qualquer uma dessas redes, para interagir com os conteúdos de todas as outras. Isso é muito poderoso por si só, mas vamos ver até onde o pessoal do Bluesky vai levar o AT Protocol.
Caso queira saber mais sobre o AT Protocol, pode acessar aqui o site deles, que tem muita coisa explicada por lá!
Vamos começar a usar o AT Protocol com Go?
Se você pensou nisso, saiba que agora vamos começar um projetinho bacana (que me tomou bastante tempo para deixá-lo redondo e funcional). Vamos criar um bot que vai curtir e republicar todos os posts que tiverem um determinado texto, por exemplo, uma #hashtag, em uma de suas replies. Ou seja, você comenta uma #hashtag no post de um amigo, e o bot vai lá, curte e compartilha o post do seu amigo.
Bacana, né?! Espero que sim, pois demorei bastante para fazer. Primeiro porque tentei usar o Indigo, implementação de client em Go que o próprio time do Bluesky mantém, mas achei muito complexa e acho que ela mais confundiria do que ajudaria. Então, criei um pacote simples, baseado no Indigo e em outro projeto que encontrei, um bot que republica vagas de Go do William. Juntando tudo isso e a documentação, consegui fazer algo bem bacana!
Antes de mais nada, não vou detalhar como criar um novo projeto em Go, vou deixar isso como desafio para você. Outro ponto é que modifiquei a forma como usava logs. Recebi um feedback para usar o slog nativo do Go, que adiciona tags aos logs e ajuda a manter um padrão mais acolhido pela comunidade.
Agora sim, nossa organização de pastas e arquivos vai ser a seguinte:
├ goskyrepost
│ .env
│ .gitignore
│ go.mod
│ go.sum
│ LICENSE
│
├───cmd
│ └───api
│ main.go
│
├───internal
│ ├───handler
│ │ handler.go
│ │
│ └───repost
│ create_record.go
│ repost.go
│
└───pkg
└───atprotocol
authenticate.go
constant.go
did.go
firehose.go
record.go
repo_commit_event.go
Na pasta cmd/api
, vamos ter o arquivo main
do nosso bot. Ele é bem simples: vai importar o nosso pacote atprotocol
e chamar a função Firehose
, que irá escutar a firehose do Bluesky, passando uma função que irá processar os eventos que chegarem. Veja abaixo o seu conteúdo:
package main
import (
"log/slog"
"github.com/augustoasilva/goskyrepost/internal/handler"
"github.com/augustoasilva/goskyrepost/pkg/atprotocol"
)
func main() {
slog.Info("iniciando o servidor do bot de repost")
if firehoseErr := atprotocol.Firehose(handler.EventFn); firehoseErr != nil {
slog.Error("erro ao escutar a firehose do bluesky", "error", firehoseErr)
panic(firehoseErr)
}
}
Opa, mas pera lá, o que é essa tal de firehose que comentei? Você pode estar se perguntando. Bem, firehose é um termo dado a um serviço que disponibiliza um stream de dados em tempo real. No caso do Bluesky, eles têm um WebSocket onde disponibilizam diversos eventos, ou, como eles chamam esses eventos, diversos repo commits. Mas o que seriam esses repo commits? Você pode se perguntar de novo, então eu explicarei de novo: repo, ou data repository (repositório de dados), é como eles chamam os conjuntos de dados que representam um usuário, e os commits são as ações de criação, atualização ou deleção desses repos. Não me aprofundarei mais aqui, pois poderia ficar extenso, mas caso queira saber mais, pode consultar o glossário do AT Protocol.
Agora, antes de irmos para o pacote internal
, vamos olhar o pacote pkg
, que, em geral, é onde coloco pacotes de Go que não estão necessariamente ligados ao que a aplicação faz. Nesse caso, é só porque não gostei de usar o Indigo, conforme comentei antes, então criei meu próprio pacote. O primeiro arquivo com o qual vamos lidar será o did.go.
package atprotocol
type DIDVerificationMethod struct {
ID string `json:"id"`
Type string `json:"type"`
Controller string `json:"controller"`
PublicKeyMultibase string `json:"publicKeyMultibase"`
}
type DIDService struct {
ID string `json:"id"`
Type string `json:"type"`
ServiceEndpoint string `json:"serviceEndpoint"`
}
type DIDDoc struct {
Context []string `json:"@context"`
ID string `json:"id"`
AlsoKnownAs []string `json:"alsoKnownAs"`
VerificationMethod []DIDVerificationMethod `json:"verificationMethod"`
Service []DIDService `json:"service"`
}
Neste arquivo, temos as estruturas de resposta do endpoint de autenticação, que veremos mais à frente. Esse DIDDoc
é o objeto que contém as informações sobre o usuário autenticado via API do Bluesky. Caso queira saber mais sobre isso, basta acessar a documentação. De novo, não posso me aprofundar demais porque, como você já sabe, isso aqui ficaria muito longo. Seguindo para o próximo arquivo, veremos o record.go:
package atprotocol
type Reply struct {
Parent Parent `json:"parent"`
Root Root `json:"root"`
}
type Parent struct {
Cid string `json:"cid"`
Uri string `json:"uri"`
}
type Root struct {
Cid string `json:"cid"`
Uri string `json:"uri"`
}
type PostRecord struct {
Type string `json:"$type"`
Text string `json:"text"`
Reply *Reply `json:"reply"`
}
Nesse caso, record
é todo registro de ação que acontece no AT Protocol. Logo, existe um record
para post, outro para like, outro para quote, outro para reply… Conseguiu pegar, né?! Bem simples! Agora, vamos seguir para o próximo arquivo, que é o repo_commit_event.go:
package atprotocol
type RepoCommitEvent struct {
Repo string `cbor:"repo"`
Rev string `cbor:"rev"`
Seq int64 `cbor:"seq"`
Since string `cbor:"since"`
Time string `cbor:"time"`
TooBig bool `cbor:"tooBig"`
Prev interface{} `cbor:"prev"`
Rebase bool `cbor:"rebase"`
Blocks []byte `cbor:"blocks"`
Ops []RepoOperation `cbor:"ops"`
}
type RepoOperation struct {
Action string `cbor:"action"`
Path string `cbor:"path"`
Reply *Reply `cbor:"reply"`
Text []byte `cbor:"text"`
CID interface{} `cbor:"cid"`
}
Nada demais até aqui, essa estrutura representa o repo commit que comentei antes, que chega via firehose. Mas, se você tem um bom olho, vai perceber que não usei JSON para representar o tipo, e sim CBOR. Já tinha visto antes? Pois bem, o CBOR (Concise Binary Object Representation) é um formato binário de representação de dados. Ele cumpre o mesmo papel do JSON, porém, por ser em binário, acaba sendo mais compacto e se torna mais rápido e eficiente para transmitir. Logo, faz sentido ser usado em um serviço de alta volumetria de dados e alta eficiência, né?! Pois bem, seguindo para o próximo arquivo, teremos o firehose.go:
package atprotocol
import (
"bytes"
"io"
"log/slog"
"github.com/fxamacker/cbor/v2"
"github.com/gorilla/websocket"
)
func Firehose(handleEvent func(evt RepoCommitEvent) error) error {
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
slog.Error("falha ao conectar no websocket", "error", err)
return err
}
defer conn.Close()
slog.Info("conectado ao websocket", "url", wsURL)
for {
_, message, errMessage := conn.ReadMessage()
if errMessage != nil {
slog.Error("error ao ler a mensagem do websocket", "error", errMessage)
return errMessage
}
decoder := cbor.NewDecoder(bytes.NewReader(message))
for {
var evt RepoCommitEvent
decodeErr := decoder.Decode(&evt)
if decodeErr != nil {
if decodeErr == io.EOF {
break
}
slog.Error("error ao decodificar a mensagem de commit do repo", "error", decodeErr)
return decodeErr
}
if handleErr := handleEvent(evt); handleErr != nil {
panic(handleErr)
}
}
}
}
Olha as coisas ficando interessantes e complicadas ao mesmo tempo! Aqui temos algumas novidades: estamos passando uma função como parâmetro. Lembra da função que passamos no main.go
? Ela vai ser usada aqui! Primeiro, vamos abrir uma conexão de WebSocket com a firehose. Depois, com essa conexão aberta, começamos um loop infinito para processar continuamente as mensagens que chegam via WebSocket. Claro, essa não é a melhor forma de lidar com isso, mas deixei assim porque fica mais fácil de entender. Esse laço, que só termina com um erro ou um break
, é bem parecido com o while
de outras linguagens, já que Go não tem while
. Agora, vamos para o authenticate.go:
package atprotocol
import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
)
type SessionRequest struct {
Identifier string `json:"identifier"`
Password string `json:"password"`
}
func CreateSession() (*DIDResponse, error) {
request := SessionRequest{
Identifier: os.Getenv("BLUESKY_IDENTIFIER"),
Password: os.Getenv("BLUESKY_PASSWORD"),
}
requestBody, _ := json.Marshal(request)
url := fmt.Sprintf("%s/com.atproto.server.createSession", BskyXrpcURL)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
slog.Error("status code inesperado", "status_code", resp.StatusCode)
return nil, fmt.Errorf("status code inesperado: %d", resp.StatusCode)
}
var didResponse DIDResponse
if jsonDecoderErr := json.NewDecoder(resp.Body).Decode(&didResponse); jsonDecoderErr != nil {
slog.Error(
"error ao decodificar a resposta do servidor",
"error", jsonDecoderErr,
)
return nil, jsonDecoderErr
}
return &didResponse, nil
}
Bem, esse arquivo é mais direto. Apesar de parecer assustador para quem tem pouca experiência, ele é bem simples: monta o body da requisição de autenticação, executa-a, obtendo o resultado e devolve esse resultado aqui, caso não ocorra um erro. E, por fim, do pacote atprotocol
, vamos ver o constant.go:
package atprotocol
const (
BskyXrpcURL = "<https://bsky.social/xrpc>"
wsURL = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
)
Que somente contém duas URLs de acesso: a primeira é o Entryway do Bluesky e a outra é do WebSocket da firehose, também do Bluesky. Agora, você pode estar se perguntando o que é o Entryway. Bem, vou deixar isso como desafio para você usar o glossário e a documentação que já deixei por aqui! Agora, vamos ver o pacote internal/repost
, que contém a lógica para repostar e curtir (eu peguei essa ideia de curtir do projeto do William que comentei acima e achei bacana para mostrar como criar dois records diferentes). Vamos começar com o repost.go:
package repost
import (
"log/slog"
"github.com/augustoasilva/goskyrepost/pkg/atprotocol"
)
func Repost(p *atprotocol.PostRecord) error {
token, err := atprotocol.CreateSession()
if err != nil {
slog.Error("Error getting token", "error", err)
return err
}
recordParams := createRecordParams{
DIDResponse: token,
Resource: "app.bsky.feed.repost",
URI: p.Reply.Root.URI,
CID: p.Reply.Root.CID,
}
err = createRecord(recordParams)
if err != nil {
return err
}
recordParams.Resource = "app.bsky.feed.like"
err = createRecord(recordParams)
if err != nil {
return err
}
return nil
}
Esse arquivo é bem direto também. Essa função vai autenticar o usuário que configuramos nas nossas variáveis de ambiente. Após a autenticação, criamos um objeto com os parâmetros para criar um record no Bluesky. Conseguem perceber que, para criar um repost e um like, usamos sempre as mesmas informações? Interessante, né? Esses dois tipos de records compartilham o mesmo padrão. Agora, vamos ver o arquivo create_record.go, que faz a requisição:
package repost
import (
"bytes"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/augustoasilva/goskyrepost/pkg/atprotocol"
)
type createRecordParams struct {
DIDResponse *atprotocol.DIDResponse
Resource string
URI string
CID string
}
func createRecord(r createRecordParams) error {
body := atprotocol.RequestRecordBody{
LexiconTypeID: r.Resource,
Collection: r.Resource,
Repo: r.DIDResponse.DID,
Record: atprotocol.RequestRecord{
RecordSubject: atprotocol.RecordSubject{
URI: r.URI,
CID: r.CID,
},
CreatedAt: time.Now(),
},
}
jsonBody, _ := json.Marshal(body)
url := fmt.Sprintf("%s/com.atproto.repo.createRecord", atprotocol.BskyXrpcURL)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))
if err != nil {
slog.Error("erro ao criar objeto de request para criar um record", "error", err, "r.Resource", r.Resource)
return nil
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.DIDResponse.AccessJwt))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
slog.Error("erro ao fazer a requisição para criar um record", "error", err, "r.Resource", r.Resource)
return nil
}
if resp.StatusCode != http.StatusOK {
slog.Error("status code inesperado ao tentar criar um record", "status", resp, "r.Resource", r.Resource)
return nil
}
slog.Info("record criado com sucesso", "resource", r.Resource)
return nil
}
Já nesse arquivo, nós montamos o body da requisição (um detalhe aqui é que o mesmo body vai ser usado para criar o repost e o like, mudando apenas o resource). Para o post que vamos fazer, criamos o objeto de requisição passando a URL e o body. Em seguida, adicionamos dois headers essenciais, onde identificamos o content-type da requisição e o token de autorização, e então fazemos a requisição. Simples e direto! Calma que estamos terminando; sei que está longo, mas falta apenas o pacote handler
, com o arquivo handler.go:
package handler
import (
"bytes"
"errors"
"fmt"
"io"
"log/slog"
"strings"
"github.com/augustoasilva/goskyrepost/internal/repost"
"github.com/augustoasilva/goskyrepost/pkg/atprotocol"
"github.com/fxamacker/cbor/v2"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
)
var terms = []string{"#repostbot"}
var EventFn = func(evt atprotocol.RepoCommitEvent) error {
for _, op := range evt.Ops {
if op.Action == "create" {
if len(evt.Blocks) > 0 {
err := handleCARBlocks(evt.Blocks, op)
if err != nil {
slog.Error("erro ao processar os blocos do CAR", "error", err)
return err
}
}
}
}
return nil
}
func handleCARBlocks(blocks []byte, op atprotocol.RepoOperation) error {
if len(blocks) == 0 {
return errors.New("não existem blocos para processar")
}
reader, err := carv2.NewBlockReader(bytes.NewReader(blocks))
if err != nil {
slog.Error("erro ao criar reader para ler os blocos do CAR", "error", err)
return err
}
for {
block, readErr := reader.Next()
if readErr == io.EOF {
break
}
if readErr != nil {
slog.Error("erro ao ler um bloco do CAR", "error", readErr)
break
}
c, cidErr := getCidFromOp(op)
if cidErr != nil {
slog.Error("erro ao pegar o CID da operação do evento", "error", cidErr)
continue
}
if block.Cid().Equals(*c) {
var post atprotocol.PostRecord
if unmarshalErr := cbor.Unmarshal(block.RawData(), &post); unmarshalErr != nil {
slog.Error("erro ao decodificar o bloco CAR usando CBOR", "error", unmarshalErr)
continue
}
if post.Text == "" || post.Reply == nil {
continue
}
if filterTerms(post.Text) {
_ = repost.Repost(&post)
}
}
}
return nil
}
func getCidFromOp(op atprotocol.RepoOperation) (*cid.Cid, error) {
if opTag, ok := op.CID.(cbor.Tag); ok {
if cidBytes, ok := opTag.Content.([]byte); ok {
return decodeCID(cidBytes)
}
}
return nil, errors.New("nenhum CID encontrado na operação")
}
func decodeCID(cidBytes []byte) (*cid.Cid, error) {
c, err := cid.Decode(string(cidBytes))
if err != nil {
return nil, fmt.Errorf("erro ao decodificar o CID: %w", err)
}
return &c, nil
}
func filterTerms(text string) bool {
for _, term := range terms {
if strings.Contains(strings.ToLower(text), strings.ToLower(term)) {
return true
}
}
return false
}
Esse arquivo é bem grande e contém pequenos pedaços de código, mas vou explicar o que ele faz. Ele cria uma variável que vai guardar a função que vamos usar no arquivo main.go
, passando para a firehose. Essa função irá receber o evento da firehose e ler os blocos que ela contém. Entretanto, esses blocos de informação são blocos de CAR. Mas o que é CAR? Bem, se você se perguntou o que é CAR, digo-lhe que é "carro" em inglês! Brincadeiras à parte, CAR, nesse contexto, é um formato de arquivo que armazena dados de forma eficiente e segura ao usar endereçamento de conteúdo. De novo, não vou me aprofundar no termo CAR, pois o artigo já está longo, mas aqui você pode acessar a especificação do CARv2 que estou usando.
Ao ler cada bloco do CAR, vamos atrás do bloco que contém a informação relevante ao commit. E como fazemos isso? Bem, cada commit (o evento que recebemos da firehose) tem uma lista de operações, e, dessa lista, vamos ler cada uma para dar um match no CID do bloco de informação com o CID que contém a operação. Assim, conseguimos as informações daquela operação. Foca que o CID vai endereçar as informações da operação via CID, sendo o CID o identificador de conteúdo (Content Identifier). Lembra que estamos filtrando somente operações de criação (op.Action == "create"
)? Logo, precisamos dessas informações.
Depois de toda essa parte complexa, chegamos no que queremos: validamos se aquele record possui um texto (para representar um post, quote, reply, etc.) e se tem um reply, pois é via reply que vamos pegar o post que vamos republicar. Lembra? Caso essas condições sejam satisfeitas, nós chamamos a função de repost!
Agora chegou o momento: execute seu projeto e, depois, faça um reply em algum post usando o termo que você configurou no handler.go, e você verá o código em ação! Se tudo der certo, você verá o seguinte no seu console:
2024/10/14 19:59:22 INFO iniciando o servidor do bot de repost
2024/10/14 19:59:23 INFO conectado ao websocket url=wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
2024/10/14 19:59:52 INFO record criado com sucesso resource=app.bsky.feed.repost
2024/10/14 19:59:53 INFO record criado com sucesso resource=app.bsky.feed.like
Então, você verá que a conta que você configurou no bot irá curtir e republicar conforme planejado. Parabéns! Agora você tem um bot funcional do Bluesky. Caso queira ver, o código está disponível aqui no GitHub: https://github.com/augustoasilva/goskyrepost/tree/feature/add-base-bot-functionalities
E agora?
Agora, brinque um pouco com o código e com as documentações; divirta-se com o resultado! O próximo artigo da série talvez demore um pouco, pois o projeto que vamos fazer ainda estou projetando, mas não fique triste, pois na próxima semana teremos artigo novo!
Então, é isso para este artigo. Nos vemos nos próximos artigos! E, se você está gostando, inscreva-se na newsletter do site para ficar por dentro das novidades e poder interagir com os artigos!