golang 实现struct复制


package components

import "fmt"
import "reflect"

//复制结构体中名称一致类型一致得字段
func CopyFields(des interface{}, src interface{}, fields ...string) (err error) {
at := reflect.TypeOf(des)
av := reflect.ValueOf(des)
bt := reflect.TypeOf(src)
bv := reflect.ValueOf(src)

// 简单判断下
if at.Kind() != reflect.Ptr {
err = fmt.Errorf("des must be a struct pointer")
return
}
av = reflect.ValueOf(av.Interface())

// 要复制哪些字段
_fields := make([]string, 0)
if len(fields) > 0 {
_fields = fields
} else {
for i := 0; i < bv.NumField(); i++ { _fields = append(_fields, bt.Field(i).Name) } } if len(_fields) == 0 { return } // 复制 for i := 0; i < len(_fields); i++ { name := _fields[i] f := av.Elem().FieldByName(name) bValue := bv.FieldByName(name) // a中有同名的字段并且类型一致才复制 if f.IsValid() && f.Kind() == bValue.Kind() && f.CanSet() { f.Set(bValue) } } return }

golang 自己实现Rpc调用客户端


package components

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/astaxie/beego/config"
"io"
"io/ioutil"
"net/http"
)

var (
defaultRpcConf config.Configer
)

type RpcResponse struct {
Id string `json:"id"`
Code string `json:"code"`
Message string `json:"message"`
Type string `json:"type"`
Payload interface{} `json:"payload"`
}

//发起远程调用
//TODO 鉴权可以在这里进行
func Request(url string, data io.Reader, payload* interface{}) (*RpcResponse, error) {
req,err := http.NewRequest("POST", url, data)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)

if err != nil {
return nil, err
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

str := string(body[:])
fmt.Print(str)

v := RpcResponse{Payload: payload}
json.Unmarshal(body, &v)

return &v,nil
}

func Rpc(name string, data interface{}, payload interface{}) (*RpcResponse, error) {
url := defaultRpcConf.String("urls::test")
requestJson, err := json.Marshal(data)
if err != nil {
return nil, err
}
p, e := Request(url, bytes.NewReader(requestJson), &payload)
if e != nil {
return nil, e
}
return p, nil
}

//rpc 简单调用方法
func RpcWithPayload(name string, data interface{}, payload interface{}) error {
p, e := Rpc(name, data, payload)

if e != nil {
return e
}

//检查rpc返回状态码
if p.Code != "0" {
return errors.New("error code: " + p.Code + " Message: " + p.Message)
}

return nil
}

func init() {
conf, err := config.NewConfig("ini", "../conf/rpc.conf")
if err != nil {
panic(err)
}
defaultRpcConf = conf
}

cronbot 安装使用

安装官方教程安装cronbot命令。

常见问题:

Q1:No module named ‘requests.packages.urllib3’
请使用 (pip install requests urllib3 –force –upgrade)升级urllib3版本既可解决。
Q2:ImportError: ‘pyOpenSSL’ module missing required functionality. Try upgrading to v0.14 or newer.
请使用 (pip uninstall pyOpenSSL; pip install pyOpenSSL==16.2.0) 安装16.2.0版本的pyOpenSSL才可以,最新的17.0还是会出现报错

批量操作服务器脚本

该脚本的前提是在 ~/.ssh/config 已经配置好的情况下使用

#!/bin/bash
#####################################################################
########    服务器批量操作脚本
########    Email: i@liufang.org.cn
#####################################################################
for host in `cat ~/.ssh/config |grep 'Host '|awk -F ' ' '{print $2}'`
do
	echo -e "\033[34m\r\nexec command:$1 at host $host \033[0m"
	ssh $host $@
done

spark简单实例

    private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
        // $example off:global_temp_view$
        Properties prop = new Properties();
        prop.setProperty("user","root");
        prop.setProperty("password","admin");


        //read data form mysql novel article
        Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/igogo", "novel_info", prop);
        df2.createGlobalTempView("novel");
        //Dataset<Row> sqlresult = spark.sql("SELECT to_char(create_time, 'yyyymmdd hh24') as d, count(*) FROM global_temp.novel group by d");



//        df2.sqlContext().udf().register("d", (String s) -> s.length(), DataType.INTEGER);
        spark.udf().register("d", (String s) -> { return s.length();}, DataTypes.IntegerType);
        Dataset<Row> sqlresult = spark.sql("SELECT id, d(title) as myfunc FROM global_temp.novel having myfunc>10");
        sqlresult.show();
        //config mysql
//        spark.newSession().sql("SELECT * FROM global_temp.people").write().mode(org.apache.spark.sql.SaveMode.Append)
//                .jdbc("jdbc:mysql://localhost:3306/test", "test", prop);
    }

maven 镜像配置

国内网络环境决定, 很多开发工作, 软件包管理软件需要配置镜像才能相对正常工作, maven也不例外。

添加配置文件 ~/.m2/settings.xml

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <!-- 阿里云仓库 -->
        <mirror>
            <id>alimaven</id>
            <mirrorOf>central</mirrorOf>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
        </mirror>

        <!-- 中央仓库1 -->
        <mirror>
            <id>repo1</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://repo1.maven.org/maven2/</url>
        </mirror>

        <!-- 中央仓库2 -->
        <mirror>
            <id>repo2</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://repo2.maven.org/maven2/</url>
        </mirror>
    </mirrors>
</settings>

在微服务架构中, 多API请求往往占据很长时间, 特别是对于那些没有相互依赖的api请求, 我们可以调整为并发请求,如此能大大节约api请求总时间。
下面的函数是利用curl并发请求改写的一个函数, 可以很好的解决多api请求的问题。

    function multi_curl($requests = [])
    {
        $response = [];
        $hander = [];
        $mh = curl_multi_init();

        foreach($requests as $id=>$item) {
            if(empty($item['url'])) {
                throw new \Exception('基本参数URL不能为空');
            }

            $ch = curl_init();
            curl_setopt($ch, CURLOPT_URL, $item['url']);
            curl_setopt($ch, CURLOPT_HEADER, 0);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER,1);


            curl_setopt($ch, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
            curl_setopt($ch, CURLOPT_HTTPHEADER, array('Expect: '));
            if(defined('CURLOPT_IPRESOLVE') && defined('CURL_IPRESOLVE_V4')){
                curl_setopt($ch, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
            }
            curl_setopt($ch, CURLOPT_TIMEOUT, isset($item['timeout']) ? $item['timeout'] : 3);
            curl_setopt($ch, CURLOPT_CUSTOMREQUEST, isset($item['method']) ? $item['method'] : 'GET');

            //设置请求参数
            if(isset($item['data'])) {
                curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($item['data']));
            }

            curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST);
            //定义api请求认证信息
            if(defined('API_USERNAME') && defined('API_PASSWORD')) {
                curl_setopt($ch, CURLOPT_USERPWD, API_USERNAME . ":" . API_PASSWORD);
            }

            curl_multi_add_handle($mh,$ch);
            $hander[$id] = $ch;
        }

        $running=null;
        do {
            usleep(1000);
            curl_multi_exec($mh,$running);
        } while ($running > 0);

        //get content and remote handle
        foreach($hander as $id=>$ch) {
            $response[$id] = curl_multi_getcontent($ch);
            //检查结果是否需要转换为json, 默认转换
            $toArray = isset($requests[$id]['to_array']) ? $requests[$id]['to_array'] == true : true;
            if($toArray) {
                //$response[$id] = json_decode(trim($data, chr(239) . chr(187) . chr(191)), true);
                $response[$id] = json_decode($response[$id], true);
            }
            curl_multi_remove_handle($mh, $ch);
        }
        curl_multi_close($mh);
        
        //检查是否进行数组转换, 默认转换为数组

        return $response;
    }

配置docker国内镜像加速

在国内如果没拉专线大部分都是很慢很慢, 无法正常使用docker, 但是我们可以通过镜像地址来减轻这种现象。

配置信息(/etc/default/docker):

# Docker Upstart and SysVinit configuration file

#
# THIS FILE DOES NOT APPLY TO SYSTEMD
#
#   Please see the documentation for "systemd drop-ins":
#   https://docs.docker.com/engine/admin/systemd/
#

# Customize location of Docker binary (especially for development testing).
#DOCKERD="/usr/local/bin/dockerd"

# Use DOCKER_OPTS to modify the daemon startup options.
#DOCKER_OPTS="--dns 8.8.8.8 --dns 8.8.4.4"
#阿里云镜像
DOCKER_OPTS="--registry-mirror=https://4xle30hc.mirror.aliyuncs.com"
#官方中国镜像
#DOCKER_OPTS="--registry-mirror=https://registry.docker-cn.com"
#163镜像
#DOCKER_OPTS="--registry-mirror=http://hub-mirror.c.163.com"

# If you need Docker to use an HTTP proxy, it can also be specified here.
#export http_proxy="http://127.0.0.1:3128/"

# This is also a handy place to tweak where Docker's temporary files go.
#export DOCKER_TMPDIR="/mnt/bigdrive/docker-tmp"

修改配置后请重启docker服务:

sudo service docker restart

分布式日志系统搭建

概述

应为工作的需要,需要部署一个日志管理系统,以解决多服务器日志查看的问题。
在网上查询了下, 现在常用的是基于filebeat,kafka,logstash,elasticsearch,kibana实现。

各个软件的角色
filebeat: 用于日志收集的客户端(日志数据直接送往elasticsearch应该是也可以的)。
kafka: 消息队列系统,filebeat采集后丢消息队列, 用来分发日志。
logstash:日志收集分析存储(可以对日志进行一些分析处理)。
elasticsearch: 这是一个分布式的搜索引擎, 在这里作为数据仓库, 用来存储,搜索服务。
kibana: 是一个为 Logstash 和 ElasticSearch 提供的日志分析的 Web 接口。可使用它对日志进行高效的搜索、可视化、分析等各种操作。

环境搭建

各个软件下载解压步骤略过, java环境搭建略过, 如该对这些有问题可以进行百度解决。

例如我是将所有的软件直接解压到 /opt/, 该目录一般也是用来装一些软件使用。

filebeat 安装

修改配置文件, 根据自己的需要修改配置信息, 最基本的配置文件修改收集的日志路径(软件根目录filebeat.yml文件)

 

 paths:
    - /var/log/*.log

配置收集的日志输出:

 

#================================ Kafka =====================================
output.kafka:
  # initial brokers for reading cluster metadata
  #hosts: ["localhost:9092", "localhost:9093", "localhost:9094"]
  hosts: ["localhost:9092"]
  # message topic selection + partitioning
  #topic: '%{[type]}'
  topic: 'c-test'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

输出源可选的有很多, 包括可以直接输出到我们后面说的那两个软件, elasticsearch,logstash 等。 这里我们使用的kafka(kafka这里1可以做一些缓冲, 其次方便后期做一些其他的扩展)

运行filebeat:

sudo ./filebeat -e -c filebeat.yml

需要注意的是 filebeat.yml 软件会提示需要root所有。

kafka安装

kafka是基于zookeeper的,我们可以先启动zookeepre

bin/zookeeper-server-start.sh config/zookeeper.properties

 

配置文件包括数据存储,端口等信息,默认无需修改即可运行, 需要注意的是,数据存储的目录默认是 /tmp下,如果重启服务器就会导致数据丢失的可能。zookkeeper也是可以集群的。

kafka配置信息修改, 这是一个分布式的软件, 配置信息(server.properties)修改需要注意的是

1 指向同一个zookeeper集群

2 broker.id=0 在集群中不要重复

创建主题(重要)

 

./bin/kafka-topics.sh –zookeeper 192.168.2.225:2183/config/mobile/mq –create –topic log –replication-factor 1 –partitions 1 

启动创建者(可以用来测试)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic log

 

启动消费者(可以用来测试)

 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log --from-beginning

 

安装logstash

创建配置文件 config/my.conf

 

input {
	kafka {
		#config kafka server info
		bootstrap_servers =&gt; "localhost:9092,localhost:9093,localhost:9094"
		topics =&gt; ["c-test"]
	}
}


output { 
	#stdout {}
	elasticsearch {
	}
filter {
    json {
        source =&gt; "message"
    }        
}

详细配置请参照官方文档:https://www.elastic.co/guide/en/logstash/current/installing-logstash.html#package-repositories

 

启动:

bin/logstash -f config/my.conf

 

该系统启动对elasticsearch有依赖。

安装Elasticsearch

修改配置文件(elasticsearch.yml)

 

# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:  集群名字,在一个集群中的机器请保持一致的配置
#
cluster.name: es
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node: 集群节点名, 如果在集群中请保持不同的名字
#
node.name: fang-pc


# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):  数据存储路径,根据需要调整
#
path.data: /home/fang/software/elasticsearch-5.4.1/data
#
# Path to log files:  日志路径,根据自己需求调整
#
path.logs: /home/fang/software/elasticsearch-5.4.1/logs

 

启动:

bin/elasticsearch

安装kibana

这个需要配置es服务器地址, 默认是本机。

启动:

bin/kibana

 

默认地址: http://localhost:5601

 

至此日志系统搭建完成, 更详细,更升入的需要查看官方文档学习了。

elasticsearch历史数据维护:

//删除所有的日志数据, 默认日志格式 logstash-2017.06.12  按日生成index的
curl -XDELETE 'http://host.IP.address:9200/logstash-*'
curl -XDELETE http://localhost:9200/.kibana

//如果没有其他数据可以直接
curl -XDELETE http://localhost:9200/*

hive分析nginx日志

#创建数据库表
CREATE TABLE logs(  
  host STRING,  
  tmp STRING,  
  users STRING,  
  time STRING,  
  request STRING,  
  status STRING,  
  size STRING,  
  referer STRING,  
  agent STRING)  
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'  
WITH SERDEPROPERTIES (  
  "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) \\[(.*)\\] \"(.*?)\" (-|[0-9]*) (-|[0-9]*) \"(.*?)\" \"(.*?)\"",  
  "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"  
)  
STORED AS TEXTFILE;  

#导入日志数据
LOAD DATA LOCAL INPATH '/home/fang/Downloads/access.log' OVERWRITE INTO TABLE logs;

#检查数据导入
select * from logs limit 100;